drbd: protect all idr accesses that might sleep with drbd_cfg_rwsem
[firefly-linux-kernel-4.4.55.git] drivers/block/drbd/drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(int vnr, void *p, void *data);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
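/* Opportunistic allocation flags: highmem pages are fine, do not wait for
 * reclaim and do not warn on failure; see the comment in __drbd_alloc_pages(). */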
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with singly linked page lists,
76  * page->private being our "next" pointer.
77  */
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
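/* Put every page of the chain back to the system.
 * Returns the number of pages freed. */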
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
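/* Prepend the chain chain_first..chain_last to *head.
 * Callers serialize on drbd_pp_lock (see page_chain_tail() above). */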
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
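/* Grab @number pages: first try the global drbd_pp_pool, then fall back to
 * alloc_page(GFP_TRY).  On partial failure the partial chain is given back to
 * the pool and NULL is returned; drbd_alloc_pages() will retry "soon". */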
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first unfinished one, we can
208            stop examining the list... */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
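/* Collect finished entries from net_ee under req_lock,
 * then free them outside of the lock. */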
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate @number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         DEFINE_WAIT(wait);
248
249         /* Yes, we may run up to @number over max_buffers. If we
250          * followed it strictly, the admin would get it wrong anyway. */
251         if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
252                 page = __drbd_alloc_pages(mdev, number);
253
254         while (page == NULL) {
255                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
256
257                 drbd_kick_lo_and_reclaim_net(mdev);
258
259                 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
260                         page = __drbd_alloc_pages(mdev, number);
261                         if (page)
262                                 break;
263                 }
264
265                 if (!retry)
266                         break;
267
268                 if (signal_pending(current)) {
269                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
270                         break;
271                 }
272
273                 schedule();
274         }
275         finish_wait(&drbd_pp_wait, &wait);
276
277         if (page)
278                 atomic_add(number, &mdev->pp_in_use);
279         return page;
280 }
281
282 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
283  * It is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
284  * Either links the page chain back to the global pool,
285  * or returns all pages to the system. */
286 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
287 {
288         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
289         int i;
290
291         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
292                 i = page_chain_free(page);
293         else {
294                 struct page *tmp;
295                 tmp = page_chain_tail(page, &i);
296                 spin_lock(&drbd_pp_lock);
297                 page_chain_add(&drbd_pp_pool, page, tmp);
298                 drbd_pp_vacant += i;
299                 spin_unlock(&drbd_pp_lock);
300         }
301         i = atomic_sub_return(i, a);
302         if (i < 0)
303                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
304                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
305         wake_up(&drbd_pp_wait);
306 }
307
308 /*
309 You need to hold the req_lock:
310  _drbd_wait_ee_list_empty()
311
312 You must not have the req_lock:
313  drbd_free_peer_req()
314  drbd_alloc_peer_req()
315  drbd_free_peer_reqs()
316  drbd_ee_fix_bhs()
317  drbd_finish_peer_reqs()
318  drbd_clear_done_ee()
319  drbd_wait_ee_list_empty()
320 */
321
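/* Allocate a peer request ("epoch entry") plus a page chain large enough to
 * hold @data_size bytes.  May sleep only if gfp_mask contains __GFP_WAIT. */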
322 struct drbd_peer_request *
323 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
324                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
325 {
326         struct drbd_peer_request *peer_req;
327         struct page *page;
328         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
329
330         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
331                 return NULL;
332
333         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
334         if (!peer_req) {
335                 if (!(gfp_mask & __GFP_NOWARN))
336                         dev_err(DEV, "%s: allocation failed\n", __func__);
337                 return NULL;
338         }
339
340         page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
341         if (!page)
342                 goto fail;
343
344         drbd_clear_interval(&peer_req->i);
345         peer_req->i.size = data_size;
346         peer_req->i.sector = sector;
347         peer_req->i.local = false;
348         peer_req->i.waiting = false;
349
350         peer_req->epoch = NULL;
351         peer_req->w.mdev = mdev;
352         peer_req->pages = page;
353         atomic_set(&peer_req->pending_bios, 0);
354         peer_req->flags = 0;
355         /*
356          * The block_id is opaque to the receiver.  It is not endianness
357          * converted, and sent back to the sender unchanged.
358          */
359         peer_req->block_id = id;
360
361         return peer_req;
362
363  fail:
364         mempool_free(peer_req, drbd_ee_mempool);
365         return NULL;
366 }
367
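/* Counterpart of drbd_alloc_peer_req(): free the digest (if any), the page
 * chain and the peer request itself.  @is_net selects which pp_in_use
 * counter the pages are accounted against. */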
368 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
369                        int is_net)
370 {
371         if (peer_req->flags & EE_HAS_DIGEST)
372                 kfree(peer_req->digest);
373         drbd_free_pages(mdev, peer_req->pages, is_net);
374         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
375         D_ASSERT(drbd_interval_empty(&peer_req->i));
376         mempool_free(peer_req, drbd_ee_mempool);
377 }
378
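/* Move all entries off @list under req_lock, then free them outside of the
 * lock.  Returns the number of peer requests freed. */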
379 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
380 {
381         LIST_HEAD(work_list);
382         struct drbd_peer_request *peer_req, *t;
383         int count = 0;
384         int is_net = list == &mdev->net_ee;
385
386         spin_lock_irq(&mdev->tconn->req_lock);
387         list_splice_init(list, &work_list);
388         spin_unlock_irq(&mdev->tconn->req_lock);
389
390         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
391                 __drbd_free_peer_req(mdev, peer_req, is_net);
392                 count++;
393         }
394         return count;
395 }
396
397 /*
398  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
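 * Reclaims finished net_ee entries and runs the completion callbacks
 * (w.cb) of everything on done_ee.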
399  */
400 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
401 {
402         LIST_HEAD(work_list);
403         LIST_HEAD(reclaimed);
404         struct drbd_peer_request *peer_req, *t;
405         int err = 0;
406
407         spin_lock_irq(&mdev->tconn->req_lock);
408         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
409         list_splice_init(&mdev->done_ee, &work_list);
410         spin_unlock_irq(&mdev->tconn->req_lock);
411
412         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
413                 drbd_free_net_peer_req(mdev, peer_req);
414
415         /* possible callbacks here:
416          * e_end_block, and e_end_resync_block, e_send_discard_write.
417          * all ignore the last argument.
418          */
419         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
420                 int err2;
421
422                 /* list_del not necessary, next/prev members not touched */
423                 err2 = peer_req->w.cb(&peer_req->w, !!err);
424                 if (!err)
425                         err = err2;
426                 drbd_free_peer_req(mdev, peer_req);
427         }
428         wake_up(&mdev->ee_wait);
429
430         return err;
431 }
432
433 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
434                                      struct list_head *head)
435 {
436         DEFINE_WAIT(wait);
437
438         /* avoids spin_lock/unlock
439          * and calling prepare_to_wait in the fast path */
440         while (!list_empty(head)) {
441                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
442                 spin_unlock_irq(&mdev->tconn->req_lock);
443                 io_schedule();
444                 finish_wait(&mdev->ee_wait, &wait);
445                 spin_lock_irq(&mdev->tconn->req_lock);
446         }
447 }
448
449 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
450                                     struct list_head *head)
451 {
452         spin_lock_irq(&mdev->tconn->req_lock);
453         _drbd_wait_ee_list_empty(mdev, head);
454         spin_unlock_irq(&mdev->tconn->req_lock);
455 }
456
457 /* see also kernel_accept, which is only present since 2.6.18.
458  * We also want to log exactly which part of it failed. */
459 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
460 {
461         struct sock *sk = sock->sk;
462         int err = 0;
463
464         *what = "listen";
465         err = sock->ops->listen(sock, 5);
466         if (err < 0)
467                 goto out;
468
469         *what = "sock_create_lite";
470         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
471                                newsock);
472         if (err < 0)
473                 goto out;
474
475         *what = "accept";
476         err = sock->ops->accept(sock, *newsock, 0);
477         if (err < 0) {
478                 sock_release(*newsock);
479                 *newsock = NULL;
480                 goto out;
481         }
482         (*newsock)->ops  = sock->ops;
483
484 out:
485         return err;
486 }
487
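/* One sock_recvmsg() call from @sock into a kernel buffer.  Returns the
 * number of bytes received or a negative error; no retries, no connection
 * state handling. */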
488 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
489 {
490         mm_segment_t oldfs;
491         struct kvec iov = {
492                 .iov_base = buf,
493                 .iov_len = size,
494         };
495         struct msghdr msg = {
496                 .msg_iovlen = 1,
497                 .msg_iov = (struct iovec *)&iov,
498                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
499         };
500         int rv;
501
502         oldfs = get_fs();
503         set_fs(KERNEL_DS);
504         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
505         set_fs(oldfs);
506
507         return rv;
508 }
509
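/* Receive exactly @size bytes on the data socket.  Anything short of that
 * (error, connection reset, shutdown, signal) puts the connection into
 * C_BROKEN_PIPE. */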
510 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
511 {
512         mm_segment_t oldfs;
513         struct kvec iov = {
514                 .iov_base = buf,
515                 .iov_len = size,
516         };
517         struct msghdr msg = {
518                 .msg_iovlen = 1,
519                 .msg_iov = (struct iovec *)&iov,
520                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
521         };
522         int rv;
523
524         oldfs = get_fs();
525         set_fs(KERNEL_DS);
526
527         for (;;) {
528                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
529                 if (rv == size)
530                         break;
531
532                 /* Note:
533                  * ECONNRESET   other side closed the connection
534                  * ERESTARTSYS  (on  sock) we got a signal
535                  */
536
537                 if (rv < 0) {
538                         if (rv == -ECONNRESET)
539                                 conn_info(tconn, "sock was reset by peer\n");
540                         else if (rv != -ERESTARTSYS)
541                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
542                         break;
543                 } else if (rv == 0) {
544                         conn_info(tconn, "sock was shut down by peer\n");
545                         break;
546                 } else  {
547                         /* signal came in, or peer/link went down,
548                          * after we read a partial message
549                          */
550                         /* D_ASSERT(signal_pending(current)); */
551                         break;
552                 }
553         }
554
555         set_fs(oldfs);
556
557         if (rv != size)
558                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
559
560         return rv;
561 }
562
563 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
564 {
565         int err;
566
567         err = drbd_recv(tconn, buf, size);
568         if (err != size) {
569                 if (err >= 0)
570                         err = -EIO;
571         } else
572                 err = 0;
573         return err;
574 }
575
576 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
577 {
578         int err;
579
580         err = drbd_recv_all(tconn, buf, size);
581         if (err && !signal_pending(current))
582                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
583         return err;
584 }
585
586 /* quoting tcp(7):
587  *   On individual connections, the socket buffer size must be set prior to the
588  *   listen(2) or connect(2) calls in order to have it take effect.
589  * This is our wrapper to do so.
590  */
591 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
592                 unsigned int rcv)
593 {
594         /* open coded SO_SNDBUF, SO_RCVBUF */
595         if (snd) {
596                 sock->sk->sk_sndbuf = snd;
597                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
598         }
599         if (rcv) {
600                 sock->sk->sk_rcvbuf = rcv;
601                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
602         }
603 }
604
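/* Actively establish one TCP connection to the peer, explicitly bound to the
 * configured local address.  Returns the connected socket, or NULL on error. */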
605 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
606 {
607         const char *what;
608         struct socket *sock;
609         struct sockaddr_in6 src_in6;
610         int err;
611         int disconnect_on_error = 1;
612
613         if (!get_net_conf(tconn))
614                 return NULL;
615
616         what = "sock_create_kern";
617         err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
618                 SOCK_STREAM, IPPROTO_TCP, &sock);
619         if (err < 0) {
620                 sock = NULL;
621                 goto out;
622         }
623
624         sock->sk->sk_rcvtimeo =
625         sock->sk->sk_sndtimeo =  tconn->net_conf->try_connect_int*HZ;
626         drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
627                         tconn->net_conf->rcvbuf_size);
628
629        /* explicitly bind to the configured IP as source IP
630         *  for the outgoing connections.
631         *  This is needed for multihomed hosts and to be
632         *  able to use lo: interfaces for drbd.
633         * Make sure to use 0 as port number, so linux selects
634         *  a free one dynamically.
635         */
636         memcpy(&src_in6, tconn->net_conf->my_addr,
637                min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
638         if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
639                 src_in6.sin6_port = 0;
640         else
641                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
642
643         what = "bind before connect";
644         err = sock->ops->bind(sock,
645                               (struct sockaddr *) &src_in6,
646                               tconn->net_conf->my_addr_len);
647         if (err < 0)
648                 goto out;
649
650         /* connect may fail, peer not yet available.
651          * stay C_WF_CONNECTION, don't go Disconnecting! */
652         disconnect_on_error = 0;
653         what = "connect";
654         err = sock->ops->connect(sock,
655                                  (struct sockaddr *)tconn->net_conf->peer_addr,
656                                  tconn->net_conf->peer_addr_len, 0);
657
658 out:
659         if (err < 0) {
660                 if (sock) {
661                         sock_release(sock);
662                         sock = NULL;
663                 }
664                 switch (-err) {
665                         /* timeout, busy, signal pending */
666                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
667                 case EINTR: case ERESTARTSYS:
668                         /* peer not (yet) available, network problem */
669                 case ECONNREFUSED: case ENETUNREACH:
670                 case EHOSTDOWN:    case EHOSTUNREACH:
671                         disconnect_on_error = 0;
672                         break;
673                 default:
674                         conn_err(tconn, "%s failed, err = %d\n", what, err);
675                 }
676                 if (disconnect_on_error)
677                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
678         }
679         put_net_conf(tconn);
680         return sock;
681 }
682
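/* Passively wait for the peer to connect to us: bind, listen and accept with
 * a jittered timeout.  Returns the accepted socket, or NULL on error. */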
683 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
684 {
685         int timeo, err;
686         struct socket *s_estab = NULL, *s_listen;
687         const char *what;
688
689         if (!get_net_conf(tconn))
690                 return NULL;
691
692         what = "sock_create_kern";
693         err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
694                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
695         if (err) {
696                 s_listen = NULL;
697                 goto out;
698         }
699
700         timeo = tconn->net_conf->try_connect_int * HZ;
701         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
702
703         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
704         s_listen->sk->sk_rcvtimeo = timeo;
705         s_listen->sk->sk_sndtimeo = timeo;
706         drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
707                         tconn->net_conf->rcvbuf_size);
708
709         what = "bind before listen";
710         err = s_listen->ops->bind(s_listen,
711                               (struct sockaddr *) tconn->net_conf->my_addr,
712                               tconn->net_conf->my_addr_len);
713         if (err < 0)
714                 goto out;
715
716         err = drbd_accept(&what, s_listen, &s_estab);
717
718 out:
719         if (s_listen)
720                 sock_release(s_listen);
721         if (err < 0) {
722                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
723                         conn_err(tconn, "%s failed, err = %d\n", what, err);
724                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
725                 }
726         }
727         put_net_conf(tconn);
728
729         return s_estab;
730 }
731
732 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
733
734 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
735                              enum drbd_packet cmd)
736 {
737         if (!conn_prepare_command(tconn, sock))
738                 return -EIO;
739         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
740 }
741
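/* Read and decode the first packet on a freshly established socket.
 * Returns the decoded command (expected: P_INITIAL_DATA or P_INITIAL_META)
 * or a negative error. */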
742 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
743 {
744         unsigned int header_size = drbd_header_size(tconn);
745         struct packet_info pi;
746         int err;
747
748         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
749         if (err != header_size) {
750                 if (err >= 0)
751                         err = -EIO;
752                 return err;
753         }
754         err = decode_header(tconn, tconn->data.rbuf, &pi);
755         if (err)
756                 return err;
757         return pi.cmd;
758 }
759
760 /**
761  * drbd_socket_okay() - Free the socket if its connection is not okay
762  * @sock:       pointer to the pointer to the socket.
763  */
764 static int drbd_socket_okay(struct socket **sock)
765 {
766         int rr;
767         char tb[4];
768
769         if (!*sock)
770                 return false;
771
772         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
773
774         if (rr > 0 || rr == -EAGAIN) {
775                 return true;
776         } else {
777                 sock_release(*sock);
778                 *sock = NULL;
779                 return false;
780         }
781 }
782 /* Gets called if a connection is established, or if a new minor gets created
783    in a connection */
784 int drbd_connected(int vnr, void *p, void *data)
785 {
786         struct drbd_conf *mdev = (struct drbd_conf *)p;
787         int err;
788
789         atomic_set(&mdev->packet_seq, 0);
790         mdev->peer_seq = 0;
791
792         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
793                 &mdev->tconn->cstate_mutex :
794                 &mdev->own_state_mutex;
795
796         err = drbd_send_sync_param(mdev);
797         if (!err)
798                 err = drbd_send_sizes(mdev, 0, 0);
799         if (!err)
800                 err = drbd_send_uuids(mdev);
801         if (!err)
802                 err = drbd_send_state(mdev);
803         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
804         clear_bit(RESIZE_PENDING, &mdev->flags);
805         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
806         return err;
807 }
808
809 /*
810  * return values:
811  *   1 yes, we have a valid connection
812  *   0 oops, did not work out, please try again
813  *  -1 peer talks different language,
814  *     no point in trying again, please go standalone.
815  *  -2 We do not have a network config...
816  */
817 static int drbd_connect(struct drbd_tconn *tconn)
818 {
819         struct socket *sock, *msock;
820         int try, h, ok;
821
822         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
823                 return -2;
824
825         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
826
827         /* Assume that the peer only understands protocol 80 until we know better.  */
828         tconn->agreed_pro_version = 80;
829
830         do {
831                 struct socket *s;
832
833                 for (try = 0;;) {
834                         /* 3 tries, this should take less than a second! */
835                         s = drbd_try_connect(tconn);
836                         if (s || ++try >= 3)
837                                 break;
838                         /* give the other side time to call bind() & listen() */
839                         schedule_timeout_interruptible(HZ / 10);
840                 }
841
842                 if (s) {
843                         if (!tconn->data.socket) {
844                                 tconn->data.socket = s;
845                                 send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
846                         } else if (!tconn->meta.socket) {
847                                 tconn->meta.socket = s;
848                                 send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
849                         } else {
850                                 conn_err(tconn, "Logic error in drbd_connect()\n");
851                                 goto out_release_sockets;
852                         }
853                 }
854
855                 if (tconn->data.socket && tconn->meta.socket) {
856                         schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
857                         ok = drbd_socket_okay(&tconn->data.socket);
858                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
859                         if (ok)
860                                 break;
861                 }
862
863 retry:
864                 s = drbd_wait_for_connect(tconn);
865                 if (s) {
866                         try = receive_first_packet(tconn, s);
867                         drbd_socket_okay(&tconn->data.socket);
868                         drbd_socket_okay(&tconn->meta.socket);
869                         switch (try) {
870                         case P_INITIAL_DATA:
871                                 if (tconn->data.socket) {
872                                         conn_warn(tconn, "initial packet S crossed\n");
873                                         sock_release(tconn->data.socket);
874                                 }
875                                 tconn->data.socket = s;
876                                 break;
877                         case P_INITIAL_META:
878                                 if (tconn->meta.socket) {
879                                         conn_warn(tconn, "initial packet M crossed\n");
880                                         sock_release(tconn->meta.socket);
881                                 }
882                                 tconn->meta.socket = s;
883                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
884                                 break;
885                         default:
886                                 conn_warn(tconn, "Error receiving initial packet\n");
887                                 sock_release(s);
888                                 if (random32() & 1)
889                                         goto retry;
890                         }
891                 }
892
893                 if (tconn->cstate <= C_DISCONNECTING)
894                         goto out_release_sockets;
895                 if (signal_pending(current)) {
896                         flush_signals(current);
897                         smp_rmb();
898                         if (get_t_state(&tconn->receiver) == EXITING)
899                                 goto out_release_sockets;
900                 }
901
902                 if (tconn->data.socket && tconn->meta.socket) {
903                         ok = drbd_socket_okay(&tconn->data.socket);
904                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
905                         if (ok)
906                                 break;
907                 }
908         } while (1);
909
910         sock  = tconn->data.socket;
911         msock = tconn->meta.socket;
912
913         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
914         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
915
916         sock->sk->sk_allocation = GFP_NOIO;
917         msock->sk->sk_allocation = GFP_NOIO;
918
919         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
920         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
921
922         /* NOT YET ...
923          * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
924          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
925          * first set it to the P_CONNECTION_FEATURES timeout,
926          * which we set to 4x the configured ping_timeout. */
927         sock->sk->sk_sndtimeo =
928         sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;
929
930         msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
931         msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
932
933         /* we don't want delays.
934          * we use TCP_CORK where appropriate, though */
935         drbd_tcp_nodelay(sock);
936         drbd_tcp_nodelay(msock);
937
938         tconn->last_received = jiffies;
939
940         h = drbd_do_features(tconn);
941         if (h <= 0)
942                 return h;
943
944         if (tconn->cram_hmac_tfm) {
945                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
946                 switch (drbd_do_auth(tconn)) {
947                 case -1:
948                         conn_err(tconn, "Authentication of peer failed\n");
949                         return -1;
950                 case 0:
951                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
952                         return 0;
953                 }
954         }
955
956         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
957                 return 0;
958
959         sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
960         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
961
962         drbd_thread_start(&tconn->asender);
963
964         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
965                 return -1;
966
967         down_read(&drbd_cfg_rwsem);
968         h = !idr_for_each(&tconn->volumes, drbd_connected, tconn);
969         up_read(&drbd_cfg_rwsem);
970         return h;
971
972 out_release_sockets:
973         if (tconn->data.socket) {
974                 sock_release(tconn->data.socket);
975                 tconn->data.socket = NULL;
976         }
977         if (tconn->meta.socket) {
978                 sock_release(tconn->meta.socket);
979                 tconn->meta.socket = NULL;
980         }
981         return -1;
982 }
983
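/* Decode one of the three on-wire header formats (p_header80, p_header95,
 * p_header100) into @pi, based on header size and magic value.
 * pi->data is set to point just past the header. */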
984 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
985 {
986         unsigned int header_size = drbd_header_size(tconn);
987
988         if (header_size == sizeof(struct p_header100) &&
989             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
990                 struct p_header100 *h = header;
991                 if (h->pad != 0) {
992                         conn_err(tconn, "Header padding is not zero\n");
993                         return -EINVAL;
994                 }
995                 pi->vnr = be16_to_cpu(h->volume);
996                 pi->cmd = be16_to_cpu(h->command);
997                 pi->size = be32_to_cpu(h->length);
998         } else if (header_size == sizeof(struct p_header95) &&
999                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1000                 struct p_header95 *h = header;
1001                 pi->cmd = be16_to_cpu(h->command);
1002                 pi->size = be32_to_cpu(h->length);
1003                 pi->vnr = 0;
1004         } else if (header_size == sizeof(struct p_header80) &&
1005                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1006                 struct p_header80 *h = header;
1007                 pi->cmd = be16_to_cpu(h->command);
1008                 pi->size = be16_to_cpu(h->length);
1009                 pi->vnr = 0;
1010         } else {
1011                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1012                          be32_to_cpu(*(__be32 *)header),
1013                          tconn->agreed_pro_version);
1014                 return -EINVAL;
1015         }
1016         pi->data = header + header_size;
1017         return 0;
1018 }
1019
1020 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1021 {
1022         void *buffer = tconn->data.rbuf;
1023         int err;
1024
1025         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1026         if (err)
1027                 return err;
1028
1029         err = decode_header(tconn, buffer, pi);
1030         tconn->last_received = jiffies;
1031
1032         return err;
1033 }
1034
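/* Issue a flush to the backing device if the current write ordering method
 * requires it; on failure, fall back to WO_drain_io. */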
1035 static void drbd_flush(struct drbd_conf *mdev)
1036 {
1037         int rv;
1038
1039         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
1040                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
1041                                         NULL);
1042                 if (rv) {
1043                         dev_err(DEV, "local disk flush failed with status %d\n", rv);
1044                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1045                          * don't try again for ANY return value != 0
1046                          * if (rv == -EOPNOTSUPP) */
1047                         drbd_bump_write_ordering(mdev, WO_drain_io);
1048                 }
1049                 put_ldev(mdev);
1050         }
1051 }
1052
1053 /**
1054  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
1055  * @mdev:       DRBD device.
1056  * @epoch:      Epoch object.
1057  * @ev:         Epoch event.
1058  */
1059 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1060                                                struct drbd_epoch *epoch,
1061                                                enum epoch_event ev)
1062 {
1063         int epoch_size;
1064         struct drbd_epoch *next_epoch;
1065         enum finish_epoch rv = FE_STILL_LIVE;
1066
1067         spin_lock(&mdev->epoch_lock);
1068         do {
1069                 next_epoch = NULL;
1070
1071                 epoch_size = atomic_read(&epoch->epoch_size);
1072
1073                 switch (ev & ~EV_CLEANUP) {
1074                 case EV_PUT:
1075                         atomic_dec(&epoch->active);
1076                         break;
1077                 case EV_GOT_BARRIER_NR:
1078                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1079                         break;
1080                 case EV_BECAME_LAST:
1081                         /* nothing to do*/
1082                         /* nothing to do */
1083                 }
1084
1085                 if (epoch_size != 0 &&
1086                     atomic_read(&epoch->active) == 0 &&
1087                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1088                         if (!(ev & EV_CLEANUP)) {
1089                                 spin_unlock(&mdev->epoch_lock);
1090                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1091                                 spin_lock(&mdev->epoch_lock);
1092                         }
1093                         dec_unacked(mdev);
1094
1095                         if (mdev->current_epoch != epoch) {
1096                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1097                                 list_del(&epoch->list);
1098                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1099                                 mdev->epochs--;
1100                                 kfree(epoch);
1101
1102                                 if (rv == FE_STILL_LIVE)
1103                                         rv = FE_DESTROYED;
1104                         } else {
1105                                 epoch->flags = 0;
1106                                 atomic_set(&epoch->epoch_size, 0);
1107                                 /* atomic_set(&epoch->active, 0); is already zero */
1108                                 if (rv == FE_STILL_LIVE)
1109                                         rv = FE_RECYCLED;
1110                                 wake_up(&mdev->ee_wait);
1111                         }
1112                 }
1113
1114                 if (!next_epoch)
1115                         break;
1116
1117                 epoch = next_epoch;
1118         } while (1);
1119
1120         spin_unlock(&mdev->epoch_lock);
1121
1122         return rv;
1123 }
1124
1125 /**
1126  * drbd_bump_write_ordering() - Fall back to another write ordering method
1127  * @mdev:       DRBD device.
1128  * @wo:         Write ordering method to try.
1129  */
1130 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1131 {
1132         enum write_ordering_e pwo;
1133         static char *write_ordering_str[] = {
1134                 [WO_none] = "none",
1135                 [WO_drain_io] = "drain",
1136                 [WO_bdev_flush] = "flush",
1137         };
1138
1139         pwo = mdev->write_ordering;
1140         wo = min(pwo, wo);
1141         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1142                 wo = WO_drain_io;
1143         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1144                 wo = WO_none;
1145         mdev->write_ordering = wo;
1146         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1147                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1148 }
1149
1150 /**
1151  * drbd_submit_peer_request() - Submit the I/O for a peer request
1152  * @mdev:       DRBD device.
1153  * @peer_req:   peer request
1154  * @rw:         flag field, see bio->bi_rw
1155  *
1156  * May spread the pages to multiple bios,
1157  * depending on bio_add_page restrictions.
1158  *
1159  * Returns 0 if all bios have been submitted,
1160  * -ENOMEM if we could not allocate enough bios,
1161  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1162  *  single page to an empty bio (which should never happen and likely indicates
1163  *  that the lower level IO stack is in some way broken). This has been observed
1164  *  on certain Xen deployments.
1165  */
1166 /* TODO allocate from our own bio_set. */
1167 int drbd_submit_peer_request(struct drbd_conf *mdev,
1168                              struct drbd_peer_request *peer_req,
1169                              const unsigned rw, const int fault_type)
1170 {
1171         struct bio *bios = NULL;
1172         struct bio *bio;
1173         struct page *page = peer_req->pages;
1174         sector_t sector = peer_req->i.sector;
1175         unsigned ds = peer_req->i.size;
1176         unsigned n_bios = 0;
1177         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1178         int err = -ENOMEM;
1179
1180         /* In most cases, we will only need one bio.  But in case the lower
1181          * level restrictions happen to be different at this offset on this
1182          * side than those of the sending peer, we may need to submit the
1183          * request in more than one bio.
1184          *
1185          * Plain bio_alloc is good enough here, this is no DRBD internally
1186          * generated bio, but a bio allocated on behalf of the peer.
1187          */
1188 next_bio:
1189         bio = bio_alloc(GFP_NOIO, nr_pages);
1190         if (!bio) {
1191                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1192                 goto fail;
1193         }
1194         /* > peer_req->i.sector, unless this is the first bio */
1195         bio->bi_sector = sector;
1196         bio->bi_bdev = mdev->ldev->backing_bdev;
1197         bio->bi_rw = rw;
1198         bio->bi_private = peer_req;
1199         bio->bi_end_io = drbd_peer_request_endio;
1200
1201         bio->bi_next = bios;
1202         bios = bio;
1203         ++n_bios;
1204
1205         page_chain_for_each(page) {
1206                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1207                 if (!bio_add_page(bio, page, len, 0)) {
1208                         /* A single page must always be possible!
1209                          * But in case it fails anyways,
1210                          * we deal with it, and complain (below). */
1211                         if (bio->bi_vcnt == 0) {
1212                                 dev_err(DEV,
1213                                         "bio_add_page failed for len=%u, "
1214                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1215                                         len, (unsigned long long)bio->bi_sector);
1216                                 err = -ENOSPC;
1217                                 goto fail;
1218                         }
1219                         goto next_bio;
1220                 }
1221                 ds -= len;
1222                 sector += len >> 9;
1223                 --nr_pages;
1224         }
1225         D_ASSERT(page == NULL);
1226         D_ASSERT(ds == 0);
1227
1228         atomic_set(&peer_req->pending_bios, n_bios);
1229         do {
1230                 bio = bios;
1231                 bios = bios->bi_next;
1232                 bio->bi_next = NULL;
1233
1234                 drbd_generic_make_request(mdev, fault_type, bio);
1235         } while (bios);
1236         return 0;
1237
1238 fail:
1239         while (bios) {
1240                 bio = bios;
1241                 bios = bios->bi_next;
1242                 bio_put(bio);
1243         }
1244         return err;
1245 }
1246
1247 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1248                                              struct drbd_peer_request *peer_req)
1249 {
1250         struct drbd_interval *i = &peer_req->i;
1251
1252         drbd_remove_interval(&mdev->write_requests, i);
1253         drbd_clear_interval(i);
1254
1255         /* Wake up any processes waiting for this peer request to complete.  */
1256         if (i->waiting)
1257                 wake_up(&mdev->misc_wait);
1258 }
1259
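/* Handle P_BARRIER: close off the current epoch and start a new one,
 * honoring the configured write ordering method. */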
1260 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1261 {
1262         struct drbd_conf *mdev;
1263         int rv;
1264         struct p_barrier *p = pi->data;
1265         struct drbd_epoch *epoch;
1266
1267         mdev = vnr_to_mdev(tconn, pi->vnr);
1268         if (!mdev)
1269                 return -EIO;
1270
1271         inc_unacked(mdev);
1272
1273         mdev->current_epoch->barrier_nr = p->barrier;
1274         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1275
1276         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1277          * the activity log, which means it would not be resynced in case the
1278          * R_PRIMARY crashes now.
1279          * Therefore we must send the barrier_ack after the barrier request was
1280          * completed. */
1281         switch (mdev->write_ordering) {
1282         case WO_none:
1283                 if (rv == FE_RECYCLED)
1284                         return 0;
1285
1286                 /* receiver context, in the writeout path of the other node.
1287                  * avoid potential distributed deadlock */
1288                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1289                 if (epoch)
1290                         break;
1291                 else
1292                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1293                         /* Fall through */
1294
1295         case WO_bdev_flush:
1296         case WO_drain_io:
1297                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1298                 drbd_flush(mdev);
1299
1300                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1301                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1302                         if (epoch)
1303                                 break;
1304                 }
1305
1306                 epoch = mdev->current_epoch;
1307                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1308
1309                 D_ASSERT(atomic_read(&epoch->active) == 0);
1310                 D_ASSERT(epoch->flags == 0);
1311
1312                 return 0;
1313         default:
1314                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1315                 return -EIO;
1316         }
1317
1318         epoch->flags = 0;
1319         atomic_set(&epoch->epoch_size, 0);
1320         atomic_set(&epoch->active, 0);
1321
1322         spin_lock(&mdev->epoch_lock);
1323         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1324                 list_add(&epoch->list, &mdev->current_epoch->list);
1325                 mdev->current_epoch = epoch;
1326                 mdev->epochs++;
1327         } else {
1328                 /* The current_epoch got recycled while we allocated this one... */
1329                 kfree(epoch);
1330         }
1331         spin_unlock(&mdev->epoch_lock);
1332
1333         return 0;
1334 }
1335
1336 /* used from receive_RSDataReply (recv_resync_read)
1337  * and from receive_Data */
1338 static struct drbd_peer_request *
1339 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1340               int data_size) __must_hold(local)
1341 {
1342         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1343         struct drbd_peer_request *peer_req;
1344         struct page *page;
1345         int dgs, ds, err;
1346         void *dig_in = mdev->tconn->int_dig_in;
1347         void *dig_vv = mdev->tconn->int_dig_vv;
1348         unsigned long *data;
1349
1350         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1351                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1352
1353         if (dgs) {
1354                 /*
1355                  * FIXME: Receive the incoming digest into the receive buffer
1356                  *        here, together with its struct p_data?
1357                  */
1358                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1359                 if (err)
1360                         return NULL;
1361         }
1362
1363         data_size -= dgs;
1364
1365         if (!expect(data_size != 0))
1366                 return NULL;
1367         if (!expect(IS_ALIGNED(data_size, 512)))
1368                 return NULL;
1369         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1370                 return NULL;
1371
1372         /* even though we trust our peer,
1373          * we sometimes have to double check. */
1374         if (sector + (data_size>>9) > capacity) {
1375                 dev_err(DEV, "request from peer beyond end of local disk: "
1376                         "capacity: %llus < sector: %llus + size: %u\n",
1377                         (unsigned long long)capacity,
1378                         (unsigned long long)sector, data_size);
1379                 return NULL;
1380         }
1381
1382         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1383          * "criss-cross" setup, that might cause write-out on some other DRBD,
1384          * which in turn might block on the other node at this very place.  */
1385         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1386         if (!peer_req)
1387                 return NULL;
1388
1389         ds = data_size;
1390         page = peer_req->pages;
1391         page_chain_for_each(page) {
1392                 unsigned len = min_t(int, ds, PAGE_SIZE);
1393                 data = kmap(page);
1394                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1395                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1396                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1397                         data[0] = data[0] ^ (unsigned long)-1;
1398                 }
1399                 kunmap(page);
1400                 if (err) {
1401                         drbd_free_peer_req(mdev, peer_req);
1402                         return NULL;
1403                 }
1404                 ds -= len;
1405         }
1406
1407         if (dgs) {
1408                 drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
1409                 if (memcmp(dig_in, dig_vv, dgs)) {
1410                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1411                                 (unsigned long long)sector, data_size);
1412                         drbd_free_peer_req(mdev, peer_req);
1413                         return NULL;
1414                 }
1415         }
1416         mdev->recv_cnt += data_size>>9;
1417         return peer_req;
1418 }
1419
1420 /* drbd_drain_block() just takes a data block
1421  * out of the socket input buffer, and discards it.
1422  */
1423 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1424 {
1425         struct page *page;
1426         int err = 0;
1427         void *data;
1428
1429         if (!data_size)
1430                 return 0;
1431
1432         page = drbd_alloc_pages(mdev, 1, 1);
1433
1434         data = kmap(page);
1435         while (data_size) {
1436                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1437
1438                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1439                 if (err)
1440                         break;
1441                 data_size -= len;
1442         }
1443         kunmap(page);
1444         drbd_free_pages(mdev, page, 0);
1445         return err;
1446 }
1447
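/* Receive the reply to a diskless read directly into the pages of the
 * original request's master bio, verifying the optional data digest. */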
1448 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1449                            sector_t sector, int data_size)
1450 {
1451         struct bio_vec *bvec;
1452         struct bio *bio;
1453         int dgs, err, i, expect;
1454         void *dig_in = mdev->tconn->int_dig_in;
1455         void *dig_vv = mdev->tconn->int_dig_vv;
1456
1457         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1458                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1459
1460         if (dgs) {
1461                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1462                 if (err)
1463                         return err;
1464         }
1465
1466         data_size -= dgs;
1467
1468         /* optimistically update recv_cnt.  if receiving fails below,
1469          * we disconnect anyways, and counters will be reset. */
1470         mdev->recv_cnt += data_size>>9;
1471
1472         bio = req->master_bio;
1473         D_ASSERT(sector == bio->bi_sector);
1474
1475         bio_for_each_segment(bvec, bio, i) {
1476                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1477                 expect = min_t(int, data_size, bvec->bv_len);
1478                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1479                 kunmap(bvec->bv_page);
1480                 if (err)
1481                         return err;
1482                 data_size -= expect;
1483         }
1484
1485         if (dgs) {
1486                 drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
1487                 if (memcmp(dig_in, dig_vv, dgs)) {
1488                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1489                         return -EINVAL;
1490                 }
1491         }
1492
1493         D_ASSERT(data_size == 0);
1494         return 0;
1495 }
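/*
 * Rough note on the function above: a "diskless" read reply carries the data
 * for a read request we could not serve locally.  The payload is copied
 * directly into the pages of the original master bio, segment by segment,
 * and optionally checked against the peer's integrity digest before the
 * request is completed by the caller.
 */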
1496
1497 /*
1498  * e_end_resync_block() is called in asender context via
1499  * drbd_finish_peer_reqs().
1500  */
1501 static int e_end_resync_block(struct drbd_work *w, int unused)
1502 {
1503         struct drbd_peer_request *peer_req =
1504                 container_of(w, struct drbd_peer_request, w);
1505         struct drbd_conf *mdev = w->mdev;
1506         sector_t sector = peer_req->i.sector;
1507         int err;
1508
1509         D_ASSERT(drbd_interval_empty(&peer_req->i));
1510
1511         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1512                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1513                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1514         } else {
1515                 /* Record failure to sync */
1516                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1517
1518                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1519         }
1520         dec_unacked(mdev);
1521
1522         return err;
1523 }
1524
1525 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1526 {
1527         struct drbd_peer_request *peer_req;
1528
1529         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1530         if (!peer_req)
1531                 goto fail;
1532
1533         dec_rs_pending(mdev);
1534
1535         inc_unacked(mdev);
1536         /* corresponding dec_unacked() in e_end_resync_block()
1537          * respective _drbd_clear_done_ee */
1538
1539         peer_req->w.cb = e_end_resync_block;
1540
1541         spin_lock_irq(&mdev->tconn->req_lock);
1542         list_add(&peer_req->w.list, &mdev->sync_ee);
1543         spin_unlock_irq(&mdev->tconn->req_lock);
1544
1545         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1546         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1547                 return 0;
1548
1549         /* don't care for the reason here */
1550         dev_err(DEV, "submit failed, triggering re-connect\n");
1551         spin_lock_irq(&mdev->tconn->req_lock);
1552         list_del(&peer_req->w.list);
1553         spin_unlock_irq(&mdev->tconn->req_lock);
1554
1555         drbd_free_peer_req(mdev, peer_req);
1556 fail:
1557         put_ldev(mdev);
1558         return -EIO;
1559 }
1560
1561 static struct drbd_request *
1562 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1563              sector_t sector, bool missing_ok, const char *func)
1564 {
1565         struct drbd_request *req;
1566
1567         /* Request object according to our peer */
1568         req = (struct drbd_request *)(unsigned long)id;
1569         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1570                 return req;
1571         if (!missing_ok) {
1572                 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1573                         (unsigned long)id, (unsigned long long)sector);
1574         }
1575         return NULL;
1576 }
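/*
 * Roughly how the lookup above works: when the request was sent to the peer,
 * the value of the request pointer itself was used as the block_id on the
 * wire.  The peer echoes it back, and drbd_contains_interval() verifies that
 * this pointer still denotes a live request covering the given sector before
 * it is trusted, so a stale or corrupted id is never dereferenced blindly.
 */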
1577
1578 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1579 {
1580         struct drbd_conf *mdev;
1581         struct drbd_request *req;
1582         sector_t sector;
1583         int err;
1584         struct p_data *p = pi->data;
1585
1586         mdev = vnr_to_mdev(tconn, pi->vnr);
1587         if (!mdev)
1588                 return -EIO;
1589
1590         sector = be64_to_cpu(p->sector);
1591
1592         spin_lock_irq(&mdev->tconn->req_lock);
1593         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1594         spin_unlock_irq(&mdev->tconn->req_lock);
1595         if (unlikely(!req))
1596                 return -EIO;
1597
1598         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1599          * special casing it there for the various failure cases.
1600          * still no race with drbd_fail_pending_reads */
1601         err = recv_dless_read(mdev, req, sector, pi->size);
1602         if (!err)
1603                 req_mod(req, DATA_RECEIVED);
1604         /* else: nothing. handled from drbd_disconnect...
1605          * I don't think we may complete this just yet
1606          * in case we are "on-disconnect: freeze" */
1607
1608         return err;
1609 }
1610
1611 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1612 {
1613         struct drbd_conf *mdev;
1614         sector_t sector;
1615         int err;
1616         struct p_data *p = pi->data;
1617
1618         mdev = vnr_to_mdev(tconn, pi->vnr);
1619         if (!mdev)
1620                 return -EIO;
1621
1622         sector = be64_to_cpu(p->sector);
1623         D_ASSERT(p->block_id == ID_SYNCER);
1624
1625         if (get_ldev(mdev)) {
1626                 /* data is submitted to disk within recv_resync_read.
1627                  * corresponding put_ldev done below on error,
1628                  * or in drbd_peer_request_endio. */
1629                 err = recv_resync_read(mdev, sector, pi->size);
1630         } else {
1631                 if (__ratelimit(&drbd_ratelimit_state))
1632                         dev_err(DEV, "Can not write resync data to local disk.\n");
1633
1634                 err = drbd_drain_block(mdev, pi->size);
1635
1636                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1637         }
1638
1639         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1640
1641         return err;
1642 }
1643
1644 static int w_restart_write(struct drbd_work *w, int cancel)
1645 {
1646         struct drbd_request *req = container_of(w, struct drbd_request, w);
1647         struct drbd_conf *mdev = w->mdev;
1648         struct bio *bio;
1649         unsigned long start_time;
1650         unsigned long flags;
1651
1652         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1653         if (!expect(req->rq_state & RQ_POSTPONED)) {
1654                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1655                 return -EIO;
1656         }
1657         bio = req->master_bio;
1658         start_time = req->start_time;
1659         /* Postponed requests will not have their master_bio completed!  */
1660         __req_mod(req, DISCARD_WRITE, NULL);
1661         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1662
1663         while (__drbd_make_request(mdev, bio, start_time))
1664                 /* retry */ ;
1665         return 0;
1666 }
1667
1668 static void restart_conflicting_writes(struct drbd_conf *mdev,
1669                                        sector_t sector, int size)
1670 {
1671         struct drbd_interval *i;
1672         struct drbd_request *req;
1673
1674         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1675                 if (!i->local)
1676                         continue;
1677                 req = container_of(i, struct drbd_request, i);
1678                 if (req->rq_state & RQ_LOCAL_PENDING ||
1679                     !(req->rq_state & RQ_POSTPONED))
1680                         continue;
1681                 if (expect(list_empty(&req->w.list))) {
1682                         req->w.mdev = mdev;
1683                         req->w.cb = w_restart_write;
1684                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1685                 }
1686         }
1687 }
1688
1689 /*
1690  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1691  */
1692 static int e_end_block(struct drbd_work *w, int cancel)
1693 {
1694         struct drbd_peer_request *peer_req =
1695                 container_of(w, struct drbd_peer_request, w);
1696         struct drbd_conf *mdev = w->mdev;
1697         sector_t sector = peer_req->i.sector;
1698         int err = 0, pcmd;
1699
1700         if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
1701                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1702                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1703                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1704                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1705                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1706                         err = drbd_send_ack(mdev, pcmd, peer_req);
1707                         if (pcmd == P_RS_WRITE_ACK)
1708                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1709                 } else {
1710                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1711                         /* we expect it to be marked out of sync anyways...
1712                          * maybe assert this?  */
1713                 }
1714                 dec_unacked(mdev);
1715         }
1716         /* we delete from the conflict detection hash _after_ we sent out the
1717          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1718         if (mdev->tconn->net_conf->two_primaries) {
1719                 spin_lock_irq(&mdev->tconn->req_lock);
1720                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1721                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1722                 if (peer_req->flags & EE_RESTART_REQUESTS)
1723                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1724                 spin_unlock_irq(&mdev->tconn->req_lock);
1725         } else
1726                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1727
1728         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1729
1730         return err;
1731 }
1732
1733 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1734 {
1735         struct drbd_conf *mdev = w->mdev;
1736         struct drbd_peer_request *peer_req =
1737                 container_of(w, struct drbd_peer_request, w);
1738         int err;
1739
1740         err = drbd_send_ack(mdev, ack, peer_req);
1741         dec_unacked(mdev);
1742
1743         return err;
1744 }
1745
1746 static int e_send_discard_write(struct drbd_work *w, int unused)
1747 {
1748         return e_send_ack(w, P_DISCARD_WRITE);
1749 }
1750
1751 static int e_send_retry_write(struct drbd_work *w, int unused)
1752 {
1753         struct drbd_tconn *tconn = w->mdev->tconn;
1754
1755         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1756                              P_RETRY_WRITE : P_DISCARD_WRITE);
1757 }
1758
1759 static bool seq_greater(u32 a, u32 b)
1760 {
1761         /*
1762          * We assume 32-bit wrap-around here.
1763          * For 24-bit wrap-around, we would have to shift:
1764          *  a <<= 8; b <<= 8;
1765          */
1766         return (s32)a - (s32)b > 0;
1767 }
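/*
 * A few illustrative values for the wrap-around comparison above (not used
 * by the driver itself):
 *
 *	seq_greater(5, 3)                    -> true
 *	seq_greater(0x00000002, 0xfffffffd)  -> true  (2 lies "after" the wrap)
 *	seq_greater(0xfffffffd, 0x00000002)  -> false
 *
 * The signed 32-bit subtraction gives the right answer for any two sequence
 * numbers that are less than 1<<31 apart.
 */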
1768
1769 static u32 seq_max(u32 a, u32 b)
1770 {
1771         return seq_greater(a, b) ? a : b;
1772 }
1773
1774 static bool need_peer_seq(struct drbd_conf *mdev)
1775 {
1776         struct drbd_tconn *tconn = mdev->tconn;
1777
1778         /*
1779          * We only need to keep track of the last packet_seq number of our peer
1780          * if we are in dual-primary mode and we have the discard flag set; see
1781          * handle_write_conflicts().
1782          */
1783         return tconn->net_conf->two_primaries &&
1784                test_bit(DISCARD_CONCURRENT, &tconn->flags);
1785 }
1786
1787 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1788 {
1789         unsigned int newest_peer_seq;
1790
1791         if (need_peer_seq(mdev)) {
1792                 spin_lock(&mdev->peer_seq_lock);
1793                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1794                 mdev->peer_seq = newest_peer_seq;
1795                 spin_unlock(&mdev->peer_seq_lock);
1796                 /* wake up only if we actually changed mdev->peer_seq */
1797                 if (peer_seq == newest_peer_seq)
1798                         wake_up(&mdev->seq_wait);
1799         }
1800 }
1801
1802 /* Called from receive_Data.
1803  * Synchronize packets on sock with packets on msock.
1804  *
1805  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1806  * packet traveling on msock, they are still processed in the order they have
1807  * been sent.
1808  *
1809  * Note: we don't care for Ack packets overtaking P_DATA packets.
1810  *
1811  * In case peer_seq is larger than mdev->peer_seq, there are
1812  * outstanding packets on the msock. We wait for them to arrive.
1813  * In case we are the logically next packet, we update mdev->peer_seq
1814  * ourselves. Correctly handles 32bit wrap around.
1815  *
1816  * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
1817  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1818  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1819  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1820  *
1821  * returns 0 if we may process the packet,
1822  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1823 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1824 {
1825         DEFINE_WAIT(wait);
1826         long timeout;
1827         int ret;
1828
1829         if (!need_peer_seq(mdev))
1830                 return 0;
1831
1832         spin_lock(&mdev->peer_seq_lock);
1833         for (;;) {
1834                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1835                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1836                         ret = 0;
1837                         break;
1838                 }
1839                 if (signal_pending(current)) {
1840                         ret = -ERESTARTSYS;
1841                         break;
1842                 }
1843                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1844                 spin_unlock(&mdev->peer_seq_lock);
1845                 timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
1846                 timeout = schedule_timeout(timeout);
1847                 spin_lock(&mdev->peer_seq_lock);
1848                 if (!timeout) {
1849                         ret = -ETIMEDOUT;
1850                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1851                         break;
1852                 }
1853         }
1854         spin_unlock(&mdev->peer_seq_lock);
1855         finish_wait(&mdev->seq_wait, &wait);
1856         return ret;
1857 }
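/*
 * Worked example for the wait loop above (illustrative only): assume
 * mdev->peer_seq is 7.  A P_DATA packet carrying peer_seq 8 is the logically
 * next one (8 - 1 is not greater than 7), so it is processed immediately and
 * mdev->peer_seq becomes 8.  A packet carrying peer_seq 10 waits until the
 * packets with sequence numbers 8 and 9 have arrived on the msock and bumped
 * mdev->peer_seq accordingly, or until the ping timeout expires.
 */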
1858
1859 /* see also bio_flags_to_wire()
1860  * DRBD_REQ_*, because we need to semantically map the flags to data packet
1861  * flags and back. We may replicate to other kernel versions. */
1862 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1863 {
1864         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1865                 (dpf & DP_FUA ? REQ_FUA : 0) |
1866                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1867                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1868 }
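/*
 * For example, a P_DATA packet sent with DP_FLUSH | DP_FUA set is submitted
 * locally as a bio with REQ_FLUSH | REQ_FUA, so the ordering and durability
 * semantics the peer requested are applied to this node's backing device too.
 */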
1869
1870 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1871                                     unsigned int size)
1872 {
1873         struct drbd_interval *i;
1874
1875     repeat:
1876         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1877                 struct drbd_request *req;
1878                 struct bio_and_error m;
1879
1880                 if (!i->local)
1881                         continue;
1882                 req = container_of(i, struct drbd_request, i);
1883                 if (!(req->rq_state & RQ_POSTPONED))
1884                         continue;
1885                 req->rq_state &= ~RQ_POSTPONED;
1886                 __req_mod(req, NEG_ACKED, &m);
1887                 spin_unlock_irq(&mdev->tconn->req_lock);
1888                 if (m.bio)
1889                         complete_master_bio(mdev, &m);
1890                 spin_lock_irq(&mdev->tconn->req_lock);
1891                 goto repeat;
1892         }
1893 }
1894
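/*
 * Rough summary of the return values of handle_write_conflicts(), as used by
 * receive_Data() below: 0 means no unresolved conflict remains and the peer
 * request may be submitted; -ENOENT means the request was superseded (a
 * discard or retry ack has been queued for the asender) and must not be
 * submitted; any other error means we gave up waiting for a conflicting
 * request and the connection is being torn down.
 */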
1895 static int handle_write_conflicts(struct drbd_conf *mdev,
1896                                   struct drbd_peer_request *peer_req)
1897 {
1898         struct drbd_tconn *tconn = mdev->tconn;
1899         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1900         sector_t sector = peer_req->i.sector;
1901         const unsigned int size = peer_req->i.size;
1902         struct drbd_interval *i;
1903         bool equal;
1904         int err;
1905
1906         /*
1907          * Inserting the peer request into the write_requests tree will prevent
1908          * new conflicting local requests from being added.
1909          */
1910         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1911
1912     repeat:
1913         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1914                 if (i == &peer_req->i)
1915                         continue;
1916
1917                 if (!i->local) {
1918                         /*
1919                          * Our peer has sent a conflicting remote request; this
1920                          * should not happen in a two-node setup.  Wait for the
1921                          * earlier peer request to complete.
1922                          */
1923                         err = drbd_wait_misc(mdev, i);
1924                         if (err)
1925                                 goto out;
1926                         goto repeat;
1927                 }
1928
1929                 equal = i->sector == sector && i->size == size;
1930                 if (resolve_conflicts) {
1931                         /*
1932                          * If the peer request is fully contained within the
1933                          * overlapping request, it can be discarded; otherwise,
1934                          * it will be retried once all overlapping requests
1935                          * have completed.
1936                          */
1937                         bool discard = i->sector <= sector && i->sector +
1938                                        (i->size >> 9) >= sector + (size >> 9);
1939
1940                         if (!equal)
1941                                 dev_alert(DEV, "Concurrent writes detected: "
1942                                                "local=%llus +%u, remote=%llus +%u, "
1943                                                "assuming %s came first\n",
1944                                           (unsigned long long)i->sector, i->size,
1945                                           (unsigned long long)sector, size,
1946                                           discard ? "local" : "remote");
1947
1948                         inc_unacked(mdev);
1949                         peer_req->w.cb = discard ? e_send_discard_write :
1950                                                    e_send_retry_write;
1951                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
1952                         wake_asender(mdev->tconn);
1953
1954                         err = -ENOENT;
1955                         goto out;
1956                 } else {
1957                         struct drbd_request *req =
1958                                 container_of(i, struct drbd_request, i);
1959
1960                         if (!equal)
1961                                 dev_alert(DEV, "Concurrent writes detected: "
1962                                                "local=%llus +%u, remote=%llus +%u\n",
1963                                           (unsigned long long)i->sector, i->size,
1964                                           (unsigned long long)sector, size);
1965
1966                         if (req->rq_state & RQ_LOCAL_PENDING ||
1967                             !(req->rq_state & RQ_POSTPONED)) {
1968                                 /*
1969                                  * Wait for the node with the discard flag to
1970                                  * decide if this request will be discarded or
1971                                  * retried.  Requests that are discarded will
1972                                  * disappear from the write_requests tree.
1973                                  *
1974                                  * In addition, wait for the conflicting
1975                                  * request to finish locally before submitting
1976                                  * the conflicting peer request.
1977                                  */
1978                                 err = drbd_wait_misc(mdev, &req->i);
1979                                 if (err) {
1980                                         _conn_request_state(mdev->tconn,
1981                                                             NS(conn, C_TIMEOUT),
1982                                                             CS_HARD);
1983                                         fail_postponed_requests(mdev, sector, size);
1984                                         goto out;
1985                                 }
1986                                 goto repeat;
1987                         }
1988                         /*
1989                          * Remember to restart the conflicting requests after
1990                          * the new peer request has completed.
1991                          */
1992                         peer_req->flags |= EE_RESTART_REQUESTS;
1993                 }
1994         }
1995         err = 0;
1996
1997     out:
1998         if (err)
1999                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2000         return err;
2001 }
2002
2003 /* mirrored write */
2004 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2005 {
2006         struct drbd_conf *mdev;
2007         sector_t sector;
2008         struct drbd_peer_request *peer_req;
2009         struct p_data *p = pi->data;
2010         u32 peer_seq = be32_to_cpu(p->seq_num);
2011         int rw = WRITE;
2012         u32 dp_flags;
2013         int err;
2014
2015         mdev = vnr_to_mdev(tconn, pi->vnr);
2016         if (!mdev)
2017                 return -EIO;
2018
2019         if (!get_ldev(mdev)) {
2020                 int err2;
2021
2022                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2023                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2024                 atomic_inc(&mdev->current_epoch->epoch_size);
2025                 err2 = drbd_drain_block(mdev, pi->size);
2026                 if (!err)
2027                         err = err2;
2028                 return err;
2029         }
2030
2031         /*
2032          * Corresponding put_ldev done either below (on various errors), or in
2033          * drbd_peer_request_endio, if we successfully submit the data at the
2034          * end of this function.
2035          */
2036
2037         sector = be64_to_cpu(p->sector);
2038         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2039         if (!peer_req) {
2040                 put_ldev(mdev);
2041                 return -EIO;
2042         }
2043
2044         peer_req->w.cb = e_end_block;
2045
2046         dp_flags = be32_to_cpu(p->dp_flags);
2047         rw |= wire_flags_to_bio(mdev, dp_flags);
2048
2049         if (dp_flags & DP_MAY_SET_IN_SYNC)
2050                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2051
2052         spin_lock(&mdev->epoch_lock);
2053         peer_req->epoch = mdev->current_epoch;
2054         atomic_inc(&peer_req->epoch->epoch_size);
2055         atomic_inc(&peer_req->epoch->active);
2056         spin_unlock(&mdev->epoch_lock);
2057
2058         if (mdev->tconn->net_conf->two_primaries) {
2059                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2060                 if (err)
2061                         goto out_interrupted;
2062                 spin_lock_irq(&mdev->tconn->req_lock);
2063                 err = handle_write_conflicts(mdev, peer_req);
2064                 if (err) {
2065                         spin_unlock_irq(&mdev->tconn->req_lock);
2066                         if (err == -ENOENT) {
2067                                 put_ldev(mdev);
2068                                 return 0;
2069                         }
2070                         goto out_interrupted;
2071                 }
2072         } else
2073                 spin_lock_irq(&mdev->tconn->req_lock);
2074         list_add(&peer_req->w.list, &mdev->active_ee);
2075         spin_unlock_irq(&mdev->tconn->req_lock);
2076
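        /*
         * Ack policy per protocol (roughly): protocol A sends no per-request
         * ack at all, protocol B acknowledges as soon as the data has been
         * received (P_RECV_ACK), and protocol C acknowledges only after the
         * local disk write has completed (P_WRITE_ACK, sent from e_end_block()).
         */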
2077         switch (mdev->tconn->net_conf->wire_protocol) {
2078         case DRBD_PROT_C:
2079                 inc_unacked(mdev);
2080                 /* corresponding dec_unacked() in e_end_block()
2081                  * respective _drbd_clear_done_ee */
2082                 break;
2083         case DRBD_PROT_B:
2084                 /* I really don't like it that the receiver thread
2085                  * sends on the msock, but anyways */
2086                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2087                 break;
2088         case DRBD_PROT_A:
2089                 /* nothing to do */
2090                 break;
2091         }
2092
2093         if (mdev->state.pdsk < D_INCONSISTENT) {
2094                 /* In case we have the only disk of the cluster: note the range out of sync and cover it in the activity log. */
2095                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2096                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2097                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2098                 drbd_al_begin_io(mdev, &peer_req->i);
2099         }
2100
2101         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2102         if (!err)
2103                 return 0;
2104
2105         /* don't care for the reason here */
2106         dev_err(DEV, "submit failed, triggering re-connect\n");
2107         spin_lock_irq(&mdev->tconn->req_lock);
2108         list_del(&peer_req->w.list);
2109         drbd_remove_epoch_entry_interval(mdev, peer_req);
2110         spin_unlock_irq(&mdev->tconn->req_lock);
2111         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2112                 drbd_al_complete_io(mdev, &peer_req->i);
2113
2114 out_interrupted:
2115         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
2116         put_ldev(mdev);
2117         drbd_free_peer_req(mdev, peer_req);
2118         return err;
2119 }
2120
2121 /* We may throttle resync, if the lower device seems to be busy,
2122  * and current sync rate is above c_min_rate.
2123  *
2124  * To decide whether or not the lower device is busy, we use a scheme similar
2125  * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant" amount
2126  * (more than 64 sectors) of activity we cannot account for with our own resync
2127  * activity, it obviously is "busy".
2128  *
2129  * The current sync rate used here uses only the most recent two step marks,
2130  * to have a short time average so we can react faster.
2131  */
2132 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2133 {
2134         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2135         unsigned long db, dt, dbdt;
2136         struct lc_element *tmp;
2137         int curr_events;
2138         int throttle = 0;
2139
2140         /* feature disabled? */
2141         if (mdev->ldev->dc.c_min_rate == 0)
2142                 return 0;
2143
2144         spin_lock_irq(&mdev->al_lock);
2145         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2146         if (tmp) {
2147                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2148                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2149                         spin_unlock_irq(&mdev->al_lock);
2150                         return 0;
2151                 }
2152                 /* Do not slow down if app IO is already waiting for this extent */
2153         }
2154         spin_unlock_irq(&mdev->al_lock);
2155
2156         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2157                       (int)part_stat_read(&disk->part0, sectors[1]) -
2158                         atomic_read(&mdev->rs_sect_ev);
2159
2160         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2161                 unsigned long rs_left;
2162                 int i;
2163
2164                 mdev->rs_last_events = curr_events;
2165
2166                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2167                  * approx. */
2168                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2169
2170                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2171                         rs_left = mdev->ov_left;
2172                 else
2173                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2174
2175                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2176                 if (!dt)
2177                         dt++;
2178                 db = mdev->rs_mark_left[i] - rs_left;
2179                 dbdt = Bit2KB(db/dt);
2180
2181                 if (dbdt > mdev->ldev->dc.c_min_rate)
2182                         throttle = 1;
2183         }
2184         return throttle;
2185 }
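/*
 * Rough numeric example for the throttle decision above: suppose the sync
 * mark used is 3 seconds old (dt = 3) and 3000 bitmap bits were cleared
 * since then (db = 3000).  With 4 KiB covered per bitmap bit this gives
 * dbdt = Bit2KB(3000 / 3) = 4000 KiB/s; if c_min_rate is configured below
 * that, and the backing device shows more than 64 sectors of activity we
 * cannot account for ourselves, resync requests get throttled.
 */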
2186
2187
2188 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2189 {
2190         struct drbd_conf *mdev;
2191         sector_t sector;
2192         sector_t capacity;
2193         struct drbd_peer_request *peer_req;
2194         struct digest_info *di = NULL;
2195         int size, verb;
2196         unsigned int fault_type;
2197         struct p_block_req *p = pi->data;
2198
2199         mdev = vnr_to_mdev(tconn, pi->vnr);
2200         if (!mdev)
2201                 return -EIO;
2202         capacity = drbd_get_capacity(mdev->this_bdev);
2203
2204         sector = be64_to_cpu(p->sector);
2205         size   = be32_to_cpu(p->blksize);
2206
2207         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2208                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2209                                 (unsigned long long)sector, size);
2210                 return -EINVAL;
2211         }
2212         if (sector + (size>>9) > capacity) {
2213                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2214                                 (unsigned long long)sector, size);
2215                 return -EINVAL;
2216         }
2217
2218         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2219                 verb = 1;
2220                 switch (pi->cmd) {
2221                 case P_DATA_REQUEST:
2222                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2223                         break;
2224                 case P_RS_DATA_REQUEST:
2225                 case P_CSUM_RS_REQUEST:
2226                 case P_OV_REQUEST:
2227                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
2228                         break;
2229                 case P_OV_REPLY:
2230                         verb = 0;
2231                         dec_rs_pending(mdev);
2232                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2233                         break;
2234                 default:
2235                         BUG();
2236                 }
2237                 if (verb && __ratelimit(&drbd_ratelimit_state))
2238                         dev_err(DEV, "Can not satisfy peer's read request, "
2239                             "no local data.\n");
2240
2241                 /* drain the payload, if any */
2242                 return drbd_drain_block(mdev, pi->size);
2243         }
2244
2245         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2246          * "criss-cross" setup, that might cause write-out on some other DRBD,
2247          * which in turn might block on the other node at this very place.  */
2248         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2249         if (!peer_req) {
2250                 put_ldev(mdev);
2251                 return -ENOMEM;
2252         }
2253
2254         switch (pi->cmd) {
2255         case P_DATA_REQUEST:
2256                 peer_req->w.cb = w_e_end_data_req;
2257                 fault_type = DRBD_FAULT_DT_RD;
2258                 /* application IO, don't drbd_rs_begin_io */
2259                 goto submit;
2260
2261         case P_RS_DATA_REQUEST:
2262                 peer_req->w.cb = w_e_end_rsdata_req;
2263                 fault_type = DRBD_FAULT_RS_RD;
2264                 /* used in the sector offset progress display */
2265                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2266                 break;
2267
2268         case P_OV_REPLY:
2269         case P_CSUM_RS_REQUEST:
2270                 fault_type = DRBD_FAULT_RS_RD;
2271                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2272                 if (!di)
2273                         goto out_free_e;
2274
2275                 di->digest_size = pi->size;
2276                 di->digest = (((char *)di)+sizeof(struct digest_info));
2277
2278                 peer_req->digest = di;
2279                 peer_req->flags |= EE_HAS_DIGEST;
2280
2281                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2282                         goto out_free_e;
2283
2284                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2285                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2286                         peer_req->w.cb = w_e_end_csum_rs_req;
2287                         /* used in the sector offset progress display */
2288                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2289                 } else if (pi->cmd == P_OV_REPLY) {
2290                         /* track progress, we may need to throttle */
2291                         atomic_add(size >> 9, &mdev->rs_sect_in);
2292                         peer_req->w.cb = w_e_end_ov_reply;
2293                         dec_rs_pending(mdev);
2294                         /* drbd_rs_begin_io done when we sent this request,
2295                          * but accounting still needs to be done. */
2296                         goto submit_for_resync;
2297                 }
2298                 break;
2299
2300         case P_OV_REQUEST:
2301                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2302                     mdev->tconn->agreed_pro_version >= 90) {
2303                         unsigned long now = jiffies;
2304                         int i;
2305                         mdev->ov_start_sector = sector;
2306                         mdev->ov_position = sector;
2307                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2308                         mdev->rs_total = mdev->ov_left;
2309                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2310                                 mdev->rs_mark_left[i] = mdev->ov_left;
2311                                 mdev->rs_mark_time[i] = now;
2312                         }
2313                         dev_info(DEV, "Online Verify start sector: %llu\n",
2314                                         (unsigned long long)sector);
2315                 }
2316                 peer_req->w.cb = w_e_end_ov_req;
2317                 fault_type = DRBD_FAULT_RS_RD;
2318                 break;
2319
2320         default:
2321                 BUG();
2322         }
2323
2324         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2325          * wrt the receiver, but it is not as straightforward as it may seem.
2326          * Various places in the resync start and stop logic assume resync
2327          * requests are processed in order; requeuing this on the worker thread
2328          * would introduce a bunch of new code for synchronization between threads.
2329          *
2330          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2331          * "forever", throttling after drbd_rs_begin_io will lock that extent
2332          * for application writes for the same time.  For now, just throttle
2333          * here, where the rest of the code expects the receiver to sleep for
2334          * a while, anyways.
2335          */
2336
2337         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2338          * this defers syncer requests for some time, before letting at least
2339          * one request through.  The resync controller on the receiving side
2340          * will adapt to the incoming rate accordingly.
2341          *
2342          * We cannot throttle here if remote is Primary/SyncTarget:
2343          * we would also throttle its application reads.
2344          * In that case, throttling is done on the SyncTarget only.
2345          */
2346         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2347                 schedule_timeout_uninterruptible(HZ/10);
2348         if (drbd_rs_begin_io(mdev, sector))
2349                 goto out_free_e;
2350
2351 submit_for_resync:
2352         atomic_add(size >> 9, &mdev->rs_sect_ev);
2353
2354 submit:
2355         inc_unacked(mdev);
2356         spin_lock_irq(&mdev->tconn->req_lock);
2357         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2358         spin_unlock_irq(&mdev->tconn->req_lock);
2359
2360         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2361                 return 0;
2362
2363         /* don't care for the reason here */
2364         dev_err(DEV, "submit failed, triggering re-connect\n");
2365         spin_lock_irq(&mdev->tconn->req_lock);
2366         list_del(&peer_req->w.list);
2367         spin_unlock_irq(&mdev->tconn->req_lock);
2368         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2369
2370 out_free_e:
2371         put_ldev(mdev);
2372         drbd_free_peer_req(mdev, peer_req);
2373         return -EIO;
2374 }
2375
2376 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2377 {
2378         int self, peer, rv = -100;
2379         unsigned long ch_self, ch_peer;
2380
2381         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2382         peer = mdev->p_uuid[UI_BITMAP] & 1;
2383
2384         ch_peer = mdev->p_uuid[UI_SIZE];
2385         ch_self = mdev->comm_bm_set;
2386
2387         switch (mdev->tconn->net_conf->after_sb_0p) {
2388         case ASB_CONSENSUS:
2389         case ASB_DISCARD_SECONDARY:
2390         case ASB_CALL_HELPER:
2391                 dev_err(DEV, "Configuration error.\n");
2392                 break;
2393         case ASB_DISCONNECT:
2394                 break;
2395         case ASB_DISCARD_YOUNGER_PRI:
2396                 if (self == 0 && peer == 1) {
2397                         rv = -1;
2398                         break;
2399                 }
2400                 if (self == 1 && peer == 0) {
2401                         rv =  1;
2402                         break;
2403                 }
2404                 /* Else fall through to one of the other strategies... */
2405         case ASB_DISCARD_OLDER_PRI:
2406                 if (self == 0 && peer == 1) {
2407                         rv = 1;
2408                         break;
2409                 }
2410                 if (self == 1 && peer == 0) {
2411                         rv = -1;
2412                         break;
2413                 }
2414                 /* Else fall through to one of the other strategies... */
2415                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2416                      "Using discard-least-changes instead\n");
2417         case ASB_DISCARD_ZERO_CHG:
2418                 if (ch_peer == 0 && ch_self == 0) {
2419                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2420                                 ? -1 : 1;
2421                         break;
2422                 } else {
2423                         if (ch_peer == 0) { rv =  1; break; }
2424                         if (ch_self == 0) { rv = -1; break; }
2425                 }
2426                 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2427                         break;
2428         case ASB_DISCARD_LEAST_CHG:
2429                 if      (ch_self < ch_peer)
2430                         rv = -1;
2431                 else if (ch_self > ch_peer)
2432                         rv =  1;
2433                 else /* ( ch_self == ch_peer ) */
2434                      /* Well, then use something else. */
2435                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2436                                 ? -1 : 1;
2437                 break;
2438         case ASB_DISCARD_LOCAL:
2439                 rv = -1;
2440                 break;
2441         case ASB_DISCARD_REMOTE:
2442                 rv =  1;
2443         }
2444
2445         return rv;
2446 }
2447
2448 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2449 {
2450         int hg, rv = -100;
2451
2452         switch (mdev->tconn->net_conf->after_sb_1p) {
2453         case ASB_DISCARD_YOUNGER_PRI:
2454         case ASB_DISCARD_OLDER_PRI:
2455         case ASB_DISCARD_LEAST_CHG:
2456         case ASB_DISCARD_LOCAL:
2457         case ASB_DISCARD_REMOTE:
2458                 dev_err(DEV, "Configuration error.\n");
2459                 break;
2460         case ASB_DISCONNECT:
2461                 break;
2462         case ASB_CONSENSUS:
2463                 hg = drbd_asb_recover_0p(mdev);
2464                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2465                         rv = hg;
2466                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2467                         rv = hg;
2468                 break;
2469         case ASB_VIOLENTLY:
2470                 rv = drbd_asb_recover_0p(mdev);
2471                 break;
2472         case ASB_DISCARD_SECONDARY:
2473                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2474         case ASB_CALL_HELPER:
2475                 hg = drbd_asb_recover_0p(mdev);
2476                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2477                         enum drbd_state_rv rv2;
2478
2479                         drbd_set_role(mdev, R_SECONDARY, 0);
2480                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2481                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2482                           * we do not need to wait for the after state change work either. */
2483                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2484                         if (rv2 != SS_SUCCESS) {
2485                                 drbd_khelper(mdev, "pri-lost-after-sb");
2486                         } else {
2487                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2488                                 rv = hg;
2489                         }
2490                 } else
2491                         rv = hg;
2492         }
2493
2494         return rv;
2495 }
2496
2497 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2498 {
2499         int hg, rv = -100;
2500
2501         switch (mdev->tconn->net_conf->after_sb_2p) {
2502         case ASB_DISCARD_YOUNGER_PRI:
2503         case ASB_DISCARD_OLDER_PRI:
2504         case ASB_DISCARD_LEAST_CHG:
2505         case ASB_DISCARD_LOCAL:
2506         case ASB_DISCARD_REMOTE:
2507         case ASB_CONSENSUS:
2508         case ASB_DISCARD_SECONDARY:
2509                 dev_err(DEV, "Configuration error.\n");
2510                 break;
2511         case ASB_VIOLENTLY:
2512                 rv = drbd_asb_recover_0p(mdev);
2513                 break;
2514         case ASB_DISCONNECT:
2515                 break;
2516         case ASB_CALL_HELPER:
2517                 hg = drbd_asb_recover_0p(mdev);
2518                 if (hg == -1) {
2519                         enum drbd_state_rv rv2;
2520
2521                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2522                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2523                           * we do not need to wait for the after state change work either. */
2524                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2525                         if (rv2 != SS_SUCCESS) {
2526                                 drbd_khelper(mdev, "pri-lost-after-sb");
2527                         } else {
2528                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2529                                 rv = hg;
2530                         }
2531                 } else
2532                         rv = hg;
2533         }
2534
2535         return rv;
2536 }
2537
2538 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2539                            u64 bits, u64 flags)
2540 {
2541         if (!uuid) {
2542                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2543                 return;
2544         }
2545         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2546              text,
2547              (unsigned long long)uuid[UI_CURRENT],
2548              (unsigned long long)uuid[UI_BITMAP],
2549              (unsigned long long)uuid[UI_HISTORY_START],
2550              (unsigned long long)uuid[UI_HISTORY_END],
2551              (unsigned long long)bits,
2552              (unsigned long long)flags);
2553 }
2554
2555 /*
2556   100   after split brain try auto recover
2557     2   C_SYNC_SOURCE set BitMap
2558     1   C_SYNC_SOURCE use BitMap
2559     0   no Sync
2560    -1   C_SYNC_TARGET use BitMap
2561    -2   C_SYNC_TARGET set BitMap
2562  -100   after split brain, disconnect
2563 -1000   unrelated data
2564 -1091   requires proto 91
2565 -1096   requires proto 96
2566  */
2567 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2568 {
2569         u64 self, peer;
2570         int i, j;
2571
2572         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2573         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2574
2575         *rule_nr = 10;
2576         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2577                 return 0;
2578
2579         *rule_nr = 20;
2580         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2581              peer != UUID_JUST_CREATED)
2582                 return -2;
2583
2584         *rule_nr = 30;
2585         if (self != UUID_JUST_CREATED &&
2586             (peer == UUID_JUST_CREATED || peer == (u64)0))
2587                 return 2;
2588
2589         if (self == peer) {
2590                 int rct, dc; /* roles at crash time */
2591
2592                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2593
2594                         if (mdev->tconn->agreed_pro_version < 91)
2595                                 return -1091;
2596
2597                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2598                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2599                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2600                                 drbd_uuid_set_bm(mdev, 0UL);
2601
2602                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2603                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2604                                 *rule_nr = 34;
2605                         } else {
2606                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2607                                 *rule_nr = 36;
2608                         }
2609
2610                         return 1;
2611                 }
2612
2613                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2614
2615                         if (mdev->tconn->agreed_pro_version < 91)
2616                                 return -1091;
2617
2618                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2619                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2620                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2621
2622                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2623                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2624                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2625
2626                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2627                                 *rule_nr = 35;
2628                         } else {
2629                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2630                                 *rule_nr = 37;
2631                         }
2632
2633                         return -1;
2634                 }
2635
2636                 /* Common power [off|failure] */
2637                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2638                         (mdev->p_uuid[UI_FLAGS] & 2);
2639                 /* lowest bit is set when we were primary,
2640                  * next bit (weight 2) is set when peer was primary */
2641                 *rule_nr = 40;
2642
2643                 switch (rct) {
2644                 case 0: /* !self_pri && !peer_pri */ return 0;
2645                 case 1: /*  self_pri && !peer_pri */ return 1;
2646                 case 2: /* !self_pri &&  peer_pri */ return -1;
2647                 case 3: /*  self_pri &&  peer_pri */
2648                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2649                         return dc ? -1 : 1;
2650                 }
2651         }
2652
2653         *rule_nr = 50;
2654         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2655         if (self == peer)
2656                 return -1;
2657
2658         *rule_nr = 51;
2659         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2660         if (self == peer) {
2661                 if (mdev->tconn->agreed_pro_version < 96 ?
2662                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2663                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2664                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2665                         /* The last P_SYNC_UUID did not get through. Undo the peer's UUID
2666                            modifications from its last start of a resync as sync source. */
2667
2668                         if (mdev->tconn->agreed_pro_version < 91)
2669                                 return -1091;
2670
2671                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2672                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2673
2674                         dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2675                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2676
2677                         return -1;
2678                 }
2679         }
2680
2681         *rule_nr = 60;
2682         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2683         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2684                 peer = mdev->p_uuid[i] & ~((u64)1);
2685                 if (self == peer)
2686                         return -2;
2687         }
2688
2689         *rule_nr = 70;
2690         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2691         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2692         if (self == peer)
2693                 return 1;
2694
2695         *rule_nr = 71;
2696         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2697         if (self == peer) {
2698                 if (mdev->tconn->agreed_pro_version < 96 ?
2699                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2700                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2701                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2702                         /* The last P_SYNC_UUID did not get through. Undo our own UUID
2703                            modifications from our last start of a resync as sync source. */
2704
2705                         if (mdev->tconn->agreed_pro_version < 91)
2706                                 return -1091;
2707
2708                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2709                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2710
2711                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2712                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2713                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2714
2715                         return 1;
2716                 }
2717         }
2718
2719
2720         *rule_nr = 80;
2721         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2722         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2723                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2724                 if (self == peer)
2725                         return 2;
2726         }
2727
2728         *rule_nr = 90;
2729         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2730         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2731         if (self == peer && self != ((u64)0))
2732                 return 100;
2733
2734         *rule_nr = 100;
2735         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2736                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2737                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2738                         peer = mdev->p_uuid[j] & ~((u64)1);
2739                         if (self == peer)
2740                                 return -100;
2741                 }
2742         }
2743
2744         return -1000;
2745 }
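/*
 * Two illustrative outcomes of the comparison above: if the peer's bitmap
 * UUID matches our current UUID (rule 50), the peer has been tracking changes
 * relative to our data generation, so we become SyncTarget and its bitmap is
 * used (-1).  Conversely, if our bitmap UUID matches the peer's current UUID
 * (rule 70), we are ahead and become SyncSource (1).
 */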
2746
2747 /* drbd_sync_handshake() returns the new conn state on success, or
2748    CONN_MASK (-1) on failure.
2749  */
2750 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2751                                            enum drbd_disk_state peer_disk) __must_hold(local)
2752 {
2753         int hg, rule_nr;
2754         enum drbd_conns rv = C_MASK;
2755         enum drbd_disk_state mydisk;
2756
2757         mydisk = mdev->state.disk;
2758         if (mydisk == D_NEGOTIATING)
2759                 mydisk = mdev->new_state_tmp.disk;
2760
2761         dev_info(DEV, "drbd_sync_handshake:\n");
2762         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2763         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2764                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2765
2766         hg = drbd_uuid_compare(mdev, &rule_nr);
2767
2768         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2769
2770         if (hg == -1000) {
2771                 dev_alert(DEV, "Unrelated data, aborting!\n");
2772                 return C_MASK;
2773         }
2774         if (hg < -1000) {
2775                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2776                 return C_MASK;
2777         }
2778
2779         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2780             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2781                 int f = (hg == -100) || abs(hg) == 2;
2782                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2783                 if (f)
2784                         hg = hg*2;
2785                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2786                      hg > 0 ? "source" : "target");
2787         }
2788
2789         if (abs(hg) == 100)
2790                 drbd_khelper(mdev, "initial-split-brain");
2791
2792         if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
2793                 int pcount = (mdev->state.role == R_PRIMARY)
2794                            + (peer_role == R_PRIMARY);
2795                 int forced = (hg == -100);
2796
2797                 switch (pcount) {
2798                 case 0:
2799                         hg = drbd_asb_recover_0p(mdev);
2800                         break;
2801                 case 1:
2802                         hg = drbd_asb_recover_1p(mdev);
2803                         break;
2804                 case 2:
2805                         hg = drbd_asb_recover_2p(mdev);
2806                         break;
2807                 }
2808                 if (abs(hg) < 100) {
2809                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2810                              "automatically solved. Sync from %s node\n",
2811                              pcount, (hg < 0) ? "peer" : "this");
2812                         if (forced) {
2813                                 dev_warn(DEV, "Doing a full sync, since"
2814                                      " UUIDs were ambiguous.\n");
2815                                 hg = hg*2;
2816                         }
2817                 }
2818         }
2819
2820         if (hg == -100) {
2821                 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2822                         hg = -1;
2823                 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2824                         hg = 1;
2825
2826                 if (abs(hg) < 100)
2827                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2828                              "Sync from %s node\n",
2829                              (hg < 0) ? "peer" : "this");
2830         }
2831
2832         if (hg == -100) {
2833                 /* FIXME this log message is not correct if we end up here
2834                  * after an attempted attach on a diskless node.
2835                  * We just refuse to attach -- well, we drop the "connection"
2836                  * to that disk, in a way... */
2837                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2838                 drbd_khelper(mdev, "split-brain");
2839                 return C_MASK;
2840         }
2841
2842         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2843                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2844                 return C_MASK;
2845         }
2846
2847         if (hg < 0 && /* by intention we do not use mydisk here. */
2848             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2849                 switch (mdev->tconn->net_conf->rr_conflict) {
2850                 case ASB_CALL_HELPER:
2851                         drbd_khelper(mdev, "pri-lost");
2852                         /* fall through */
2853                 case ASB_DISCONNECT:
2854                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2855                         return C_MASK;
2856                 case ASB_VIOLENTLY:
2857                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2858                              " assumption\n");
2859                 }
2860         }
2861
2862         if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2863                 if (hg == 0)
2864                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2865                 else
2866                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2867                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2868                                  abs(hg) >= 2 ? "full" : "bit-map based");
2869                 return C_MASK;
2870         }
2871
2872         if (abs(hg) >= 2) {
2873                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2874                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2875                                         BM_LOCKED_SET_ALLOWED))
2876                         return C_MASK;
2877         }
2878
2879         if (hg > 0) { /* become sync source. */
2880                 rv = C_WF_BITMAP_S;
2881         } else if (hg < 0) { /* become sync target */
2882                 rv = C_WF_BITMAP_T;
2883         } else {
2884                 rv = C_CONNECTED;
2885                 if (drbd_bm_total_weight(mdev)) {
2886                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2887                              drbd_bm_total_weight(mdev));
2888                 }
2889         }
2890
2891         return rv;
2892 }
2893
2894 /* returns 1 if invalid */
2895 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2896 {
2897         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2898         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2899             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2900                 return 0;
2901
2902         /* any other combination involving ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL is invalid */
2903         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2904             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2905                 return 1;
2906
2907         /* everything else is valid if they are equal on both sides. */
2908         if (peer == self)
2909                 return 0;
2910
2911         /* everything else is invalid. */
2912         return 1;
2913 }
2914
2915 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
2916 {
2917         struct p_protocol *p = pi->data;
2918         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2919         int p_want_lose, p_two_primaries, cf;
2920         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2921
2922         p_proto         = be32_to_cpu(p->protocol);
2923         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2924         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2925         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2926         p_two_primaries = be32_to_cpu(p->two_primaries);
2927         cf              = be32_to_cpu(p->conn_flags);
2928         p_want_lose = cf & CF_WANT_LOSE;
2929
2930         clear_bit(CONN_DRY_RUN, &tconn->flags);
2931
2932         if (cf & CF_DRY_RUN)
2933                 set_bit(CONN_DRY_RUN, &tconn->flags);
2934
2935         if (p_proto != tconn->net_conf->wire_protocol) {
2936                 conn_err(tconn, "incompatible communication protocols\n");
2937                 goto disconnect;
2938         }
2939
2940         if (cmp_after_sb(p_after_sb_0p, tconn->net_conf->after_sb_0p)) {
2941                 conn_err(tconn, "incompatible after-sb-0pri settings\n");
2942                 goto disconnect;
2943         }
2944
2945         if (cmp_after_sb(p_after_sb_1p, tconn->net_conf->after_sb_1p)) {
2946                 conn_err(tconn, "incompatible after-sb-1pri settings\n");
2947                 goto disconnect;
2948         }
2949
2950         if (cmp_after_sb(p_after_sb_2p, tconn->net_conf->after_sb_2p)) {
2951                 conn_err(tconn, "incompatible after-sb-2pri settings\n");
2952                 goto disconnect;
2953         }
2954
2955         if (p_want_lose && tconn->net_conf->want_lose) {
2956                 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
2957                 goto disconnect;
2958         }
2959
2960         if (p_two_primaries != tconn->net_conf->two_primaries) {
2961                 conn_err(tconn, "incompatible setting of the two-primaries options\n");
2962                 goto disconnect;
2963         }
2964
2965         if (tconn->agreed_pro_version >= 87) {
2966                 unsigned char *my_alg = tconn->net_conf->integrity_alg;
2967                 int err;
2968
2969                 err = drbd_recv_all(tconn, p_integrity_alg, pi->size);
2970                 if (err)
2971                         return err;
2972
2973                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2974                 if (strcmp(p_integrity_alg, my_alg)) {
2975                         conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
2976                         goto disconnect;
2977                 }
2978                 conn_info(tconn, "data-integrity-alg: %s\n",
2979                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2980         }
2981
2982         return 0;
2983
2984 disconnect:
2985         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
2986         return -EIO;
2987 }
2988
2989 /* helper function
2990  * input: alg name, feature name
2991  * return: NULL (alg name was "")
2992  *         ERR_PTR(error) if something goes wrong
2993  *         or the crypto hash ptr, if it worked out ok. */
2994 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2995                 const char *alg, const char *name)
2996 {
2997         struct crypto_hash *tfm;
2998
2999         if (!alg[0])
3000                 return NULL;
3001
3002         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3003         if (IS_ERR(tfm)) {
3004                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3005                         alg, name, PTR_ERR(tfm));
3006                 return tfm;
3007         }
3008         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
3009                 crypto_free_hash(tfm);
3010                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
3011                 return ERR_PTR(-EINVAL);
3012         }
3013         return tfm;
3014 }
3015
3016 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3017 {
3018         void *buffer = tconn->data.rbuf;
3019         int size = pi->size;
3020
3021         while (size) {
3022                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3023                 s = drbd_recv(tconn, buffer, s);
3024                 if (s <= 0) {
3025                         if (s < 0)
3026                                 return s;
3027                         break;
3028                 }
3029                 size -= s;
3030         }
3031         if (size)
3032                 return -EIO;
3033         return 0;
3034 }
3035
3036 /*
3037  * config_unknown_volume  -  device configuration command for unknown volume
3038  *
3039  * When a device is added to an existing connection, the node on which the
3040  * device is added first will send configuration commands to its peer but the
3041  * peer will not know about the device yet.  It will warn and ignore these
3042  * commands.  Once the device is added on the second node, the second node will
3043  * send the same device configuration commands, but in the other direction.
3044  *
3045  * (We can also end up here if drbd is misconfigured.)
3046  */
3047 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3048 {
3049         conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3050                   pi->vnr, cmdname(pi->cmd));
3051         return ignore_remaining_packet(tconn, pi);
3052 }
3053
3054 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3055 {
3056         struct drbd_conf *mdev;
3057         struct p_rs_param_95 *p;
3058         unsigned int header_size, data_size, exp_max_sz;
3059         struct crypto_hash *verify_tfm = NULL;
3060         struct crypto_hash *csums_tfm = NULL;
3061         const int apv = tconn->agreed_pro_version;
3062         int *rs_plan_s = NULL;
3063         int fifo_size = 0;
3064         int err;
3065
3066         mdev = vnr_to_mdev(tconn, pi->vnr);
3067         if (!mdev)
3068                 return config_unknown_volume(tconn, pi);
3069
3070         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3071                     : apv == 88 ? sizeof(struct p_rs_param)
3072                                         + SHARED_SECRET_MAX
3073                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3074                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3075
3076         if (pi->size > exp_max_sz) {
3077                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3078                     pi->size, exp_max_sz);
3079                 return -EIO;
3080         }
3081
3082         if (apv <= 88) {
3083                 header_size = sizeof(struct p_rs_param);
3084                 data_size = pi->size - header_size;
3085         } else if (apv <= 94) {
3086                 header_size = sizeof(struct p_rs_param_89);
3087                 data_size = pi->size - header_size;
3088                 D_ASSERT(data_size == 0);
3089         } else {
3090                 header_size = sizeof(struct p_rs_param_95);
3091                 data_size = pi->size - header_size;
3092                 D_ASSERT(data_size == 0);
3093         }
3094
3095         /* initialize verify_alg and csums_alg */
3096         p = pi->data;
3097         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3098
3099         err = drbd_recv_all(mdev->tconn, p, header_size);
3100         if (err)
3101                 return err;
3102
3103         if (get_ldev(mdev)) {
3104                 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3105                 put_ldev(mdev);
3106         }
3107
3108         if (apv >= 88) {
3109                 if (apv == 88) {
3110                         if (data_size > SHARED_SECRET_MAX) {
3111                                 dev_err(DEV, "verify-alg too long, "
3112                                     "peer wants %u, accepting only %u bytes\n",
3113                                                 data_size, SHARED_SECRET_MAX);
3114                                 return -EIO;
3115                         }
3116
3117                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3118                         if (err)
3119                                 return err;
3120
3121                         /* we expect NUL terminated string */
3122                         /* but just in case someone tries to be evil */
3123                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3124                         p->verify_alg[data_size-1] = 0;
3125
3126                 } else /* apv >= 89 */ {
3127                         /* we still expect NUL terminated strings */
3128                         /* but just in case someone tries to be evil */
3129                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3130                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3131                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3132                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3133                 }
3134
3135                 if (strcmp(mdev->tconn->net_conf->verify_alg, p->verify_alg)) {
3136                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3137                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3138                                     mdev->tconn->net_conf->verify_alg, p->verify_alg);
3139                                 goto disconnect;
3140                         }
3141                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3142                                         p->verify_alg, "verify-alg");
3143                         if (IS_ERR(verify_tfm)) {
3144                                 verify_tfm = NULL;
3145                                 goto disconnect;
3146                         }
3147                 }
3148
3149                 if (apv >= 89 && strcmp(mdev->tconn->net_conf->csums_alg, p->csums_alg)) {
3150                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3151                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3152                                     mdev->tconn->net_conf->csums_alg, p->csums_alg);
3153                                 goto disconnect;
3154                         }
3155                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3156                                         p->csums_alg, "csums-alg");
3157                         if (IS_ERR(csums_tfm)) {
3158                                 csums_tfm = NULL;
3159                                 goto disconnect;
3160                         }
3161                 }
3162
3163                 if (apv > 94 && get_ldev(mdev)) {
3164                         mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3165                         mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3166                         mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3167                         mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3168                         mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
3169
3170                         fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3171                         if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3172                                 rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3173                                 if (!rs_plan_s) {
3174                                         dev_err(DEV, "kzalloc of fifo_buffer failed\n");
3175                                         put_ldev(mdev);
3176                                         goto disconnect;
3177                                 }
3178                         }
3179                         put_ldev(mdev);
3180                 }
3181
3182                 spin_lock(&mdev->peer_seq_lock);
3183                 /* lock against drbd_nl_syncer_conf() */
3184                 if (verify_tfm) {
3185                         strcpy(mdev->tconn->net_conf->verify_alg, p->verify_alg);
3186                         mdev->tconn->net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3187                         crypto_free_hash(mdev->tconn->verify_tfm);
3188                         mdev->tconn->verify_tfm = verify_tfm;
3189                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3190                 }
3191                 if (csums_tfm) {
3192                         strcpy(mdev->tconn->net_conf->csums_alg, p->csums_alg);
3193                         mdev->tconn->net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3194                         crypto_free_hash(mdev->tconn->csums_tfm);
3195                         mdev->tconn->csums_tfm = csums_tfm;
3196                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3197                 }
3198                 if (fifo_size != mdev->rs_plan_s.size) {
3199                         kfree(mdev->rs_plan_s.values);
3200                         mdev->rs_plan_s.values = rs_plan_s;
3201                         mdev->rs_plan_s.size   = fifo_size;
3202                         mdev->rs_planed = 0;
3203                 }
3204                 spin_unlock(&mdev->peer_seq_lock);
3205         }
3206         return 0;
3207
3208 disconnect:
3209         /* just for completeness: actually not needed,
3210          * as this is not reached if csums_tfm was ok. */
3211         crypto_free_hash(csums_tfm);
3212         /* but free the verify_tfm again, if csums_tfm did not work out */
3213         crypto_free_hash(verify_tfm);
3214         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3215         return -EIO;
3216 }
3217
3218 /* warn if the arguments differ by more than 12.5% */
3219 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3220         const char *s, sector_t a, sector_t b)
3221 {
3222         sector_t d;
3223         if (a == 0 || b == 0)
3224                 return;
3225         d = (a > b) ? (a - b) : (b - a);
3226         if (d > (a>>3) || d > (b>>3))
3227                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3228                      (unsigned long long)a, (unsigned long long)b);
3229 }
3230
3231 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3232 {
3233         struct drbd_conf *mdev;
3234         struct p_sizes *p = pi->data;
3235         enum determine_dev_size dd = unchanged;
3236         sector_t p_size, p_usize, my_usize;
3237         int ldsc = 0; /* local disk size changed */
3238         enum dds_flags ddsf;
3239
3240         mdev = vnr_to_mdev(tconn, pi->vnr);
3241         if (!mdev)
3242                 return config_unknown_volume(tconn, pi);
3243
3244         p_size = be64_to_cpu(p->d_size);
3245         p_usize = be64_to_cpu(p->u_size);
3246
3247         /* just store the peer's disk size for now.
3248          * we still need to figure out whether we accept that. */
3249         mdev->p_size = p_size;
3250
3251         if (get_ldev(mdev)) {
3252                 warn_if_differ_considerably(mdev, "lower level device sizes",
3253                            p_size, drbd_get_max_capacity(mdev->ldev));
3254                 warn_if_differ_considerably(mdev, "user requested size",
3255                                             p_usize, mdev->ldev->dc.disk_size);
3256
3257                 /* if this is the first connect, or an otherwise expected
3258                  * param exchange, choose the minimum */
3259                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3260                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3261                                              p_usize);
3262
3263                 my_usize = mdev->ldev->dc.disk_size;
3264
3265                 if (mdev->ldev->dc.disk_size != p_usize) {
3266                         mdev->ldev->dc.disk_size = p_usize;
3267                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3268                              (unsigned long)mdev->ldev->dc.disk_size);
3269                 }
3270
3271                 /* Never shrink a device with usable data during connect.
3272                    But allow online shrinking if we are connected. */
3273                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3274                    drbd_get_capacity(mdev->this_bdev) &&
3275                    mdev->state.disk >= D_OUTDATED &&
3276                    mdev->state.conn < C_CONNECTED) {
3277                         dev_err(DEV, "The peer's disk size is too small!\n");
3278                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3279                         mdev->ldev->dc.disk_size = my_usize;
3280                         put_ldev(mdev);
3281                         return -EIO;
3282                 }
3283                 put_ldev(mdev);
3284         }
3285
3286         ddsf = be16_to_cpu(p->dds_flags);
3287         if (get_ldev(mdev)) {
3288                 dd = drbd_determine_dev_size(mdev, ddsf);
3289                 put_ldev(mdev);
3290                 if (dd == dev_size_error)
3291                         return -EIO;
3292                 drbd_md_sync(mdev);
3293         } else {
3294                 /* I am diskless, need to accept the peer's size. */
3295                 drbd_set_my_capacity(mdev, p_size);
3296         }
3297
3298         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3299         drbd_reconsider_max_bio_size(mdev);
3300
3301         if (get_ldev(mdev)) {
3302                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3303                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3304                         ldsc = 1;
3305                 }
3306
3307                 put_ldev(mdev);
3308         }
3309
3310         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3311                 if (be64_to_cpu(p->c_size) !=
3312                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3313                         /* we have different sizes, probably peer
3314                          * needs to know my new size... */
3315                         drbd_send_sizes(mdev, 0, ddsf);
3316                 }
3317                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3318                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3319                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3320                             mdev->state.disk >= D_INCONSISTENT) {
3321                                 if (ddsf & DDSF_NO_RESYNC)
3322                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3323                                 else
3324                                         resync_after_online_grow(mdev);
3325                         } else
3326                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3327                 }
3328         }
3329
3330         return 0;
3331 }
3332
3333 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3334 {
3335         struct drbd_conf *mdev;
3336         struct p_uuids *p = pi->data;
3337         u64 *p_uuid;
3338         int i, updated_uuids = 0;
3339
3340         mdev = vnr_to_mdev(tconn, pi->vnr);
3341         if (!mdev)
3342                 return config_unknown_volume(tconn, pi);
3343
3344         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3345
3346         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3347                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3348
3349         kfree(mdev->p_uuid);
3350         mdev->p_uuid = p_uuid;
3351
3352         if (mdev->state.conn < C_CONNECTED &&
3353             mdev->state.disk < D_INCONSISTENT &&
3354             mdev->state.role == R_PRIMARY &&
3355             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3356                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3357                     (unsigned long long)mdev->ed_uuid);
3358                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3359                 return -EIO;
3360         }
3361
3362         if (get_ldev(mdev)) {
3363                 int skip_initial_sync =
3364                         mdev->state.conn == C_CONNECTED &&
3365                         mdev->tconn->agreed_pro_version >= 90 &&
3366                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3367                         (p_uuid[UI_FLAGS] & 8);
3368                 if (skip_initial_sync) {
3369                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3370                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3371                                         "clear_n_write from receive_uuids",
3372                                         BM_LOCKED_TEST_ALLOWED);
3373                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3374                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3375                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3376                                         CS_VERBOSE, NULL);
3377                         drbd_md_sync(mdev);
3378                         updated_uuids = 1;
3379                 }
3380                 put_ldev(mdev);
3381         } else if (mdev->state.disk < D_INCONSISTENT &&
3382                    mdev->state.role == R_PRIMARY) {
3383                 /* I am a diskless primary, the peer just created a new current UUID
3384                    for me. */
3385                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3386         }
3387
3388         /* Before we test for the disk state, we should wait until a possibly
3389            ongoing cluster-wide state change has finished. That is important if
3390            we are primary and are detaching from our disk. We need to see the
3391            new disk state... */
3392         mutex_lock(mdev->state_mutex);
3393         mutex_unlock(mdev->state_mutex);
3394         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3395                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3396
3397         if (updated_uuids)
3398                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3399
3400         return 0;
3401 }
3402
3403 /**
3404  * convert_state() - Converts the peer's view of the cluster state to our point of view
3405  * @ps:         The state as seen by the peer.
3406  */
3407 static union drbd_state convert_state(union drbd_state ps)
3408 {
3409         union drbd_state ms;
3410
3411         static enum drbd_conns c_tab[] = {
3412                 [C_CONNECTED] = C_CONNECTED,
3413
3414                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3415                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3416                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3417                 [C_VERIFY_S]       = C_VERIFY_T,
3418                 [C_MASK]   = C_MASK,
3419         };
3420
3421         ms.i = ps.i;
3422
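             /* mirror the asymmetric fields: the peer's role and disk state become
              * our peer/pdsk fields and vice versa; peer_isp is set if the peer has
              * either of its sync-pause flags set. */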
3423         ms.conn = c_tab[ps.conn];
3424         ms.peer = ps.role;
3425         ms.role = ps.peer;
3426         ms.pdsk = ps.disk;
3427         ms.disk = ps.pdsk;
3428         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3429
3430         return ms;
3431 }
3432
3433 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3434 {
3435         struct drbd_conf *mdev;
3436         struct p_req_state *p = pi->data;
3437         union drbd_state mask, val;
3438         enum drbd_state_rv rv;
3439
3440         mdev = vnr_to_mdev(tconn, pi->vnr);
3441         if (!mdev)
3442                 return -EIO;
3443
3444         mask.i = be32_to_cpu(p->mask);
3445         val.i = be32_to_cpu(p->val);
3446
3447         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3448             mutex_is_locked(mdev->state_mutex)) {
3449                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3450                 return 0;
3451         }
3452
3453         mask = convert_state(mask);
3454         val = convert_state(val);
3455
3456         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3457         drbd_send_sr_reply(mdev, rv);
3458
3459         drbd_md_sync(mdev);
3460
3461         return 0;
3462 }
3463
3464 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3465 {
3466         struct p_req_state *p = pi->data;
3467         union drbd_state mask, val;
3468         enum drbd_state_rv rv;
3469
3470         mask.i = be32_to_cpu(p->mask);
3471         val.i = be32_to_cpu(p->val);
3472
3473         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3474             mutex_is_locked(&tconn->cstate_mutex)) {
3475                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3476                 return 0;
3477         }
3478
3479         mask = convert_state(mask);
3480         val = convert_state(val);
3481
3482         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3483         conn_send_sr_reply(tconn, rv);
3484
3485         return 0;
3486 }
3487
3488 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3489 {
3490         struct drbd_conf *mdev;
3491         struct p_state *p = pi->data;
3492         union drbd_state os, ns, peer_state;
3493         enum drbd_disk_state real_peer_disk;
3494         enum chg_state_flags cs_flags;
3495         int rv;
3496
3497         mdev = vnr_to_mdev(tconn, pi->vnr);
3498         if (!mdev)
3499                 return config_unknown_volume(tconn, pi);
3500
3501         peer_state.i = be32_to_cpu(p->state);
3502
3503         real_peer_disk = peer_state.disk;
3504         if (peer_state.disk == D_NEGOTIATING) {
3505                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3506                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3507         }
3508
3509         spin_lock_irq(&mdev->tconn->req_lock);
3510  retry:
3511         os = ns = drbd_read_state(mdev);
3512         spin_unlock_irq(&mdev->tconn->req_lock);
3513
3514         /* peer says his disk is uptodate, while we think it is inconsistent,
3515          * and this happens while we think we have a sync going on. */
3516         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3517             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3518                 /* If we are (becoming) SyncSource, but peer is still in sync
3519                  * preparation, ignore its uptodate-ness to avoid flapping, it
3520                  * will change to inconsistent once the peer reaches active
3521                  * syncing states.
3522                  * It may have changed syncer-paused flags, however, so we
3523                  * cannot ignore this completely. */
3524                 if (peer_state.conn > C_CONNECTED &&
3525                     peer_state.conn < C_SYNC_SOURCE)
3526                         real_peer_disk = D_INCONSISTENT;
3527
3528                 /* if peer_state changes to connected at the same time,
3529                  * it explicitly notifies us that it finished resync.
3530                  * Maybe we should finish it up, too? */
3531                 else if (os.conn >= C_SYNC_SOURCE &&
3532                          peer_state.conn == C_CONNECTED) {
3533                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3534                                 drbd_resync_finished(mdev);
3535                         return 0;
3536                 }
3537         }
3538
3539         /* peer says his disk is inconsistent, while we think it is uptodate,
3540          * and this happens while the peer still thinks we have a sync going on,
3541          * but we think we are already done with the sync.
3542          * We ignore this to avoid flapping pdsk.
3543          * This should not happen, if the peer is a recent version of drbd. */
3544         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3545             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3546                 real_peer_disk = D_UP_TO_DATE;
3547
3548         if (ns.conn == C_WF_REPORT_PARAMS)
3549                 ns.conn = C_CONNECTED;
3550
3551         if (peer_state.conn == C_AHEAD)
3552                 ns.conn = C_BEHIND;
3553
3554         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3555             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3556                 int cr; /* consider resync */
3557
3558                 /* if we established a new connection */
3559                 cr  = (os.conn < C_CONNECTED);
3560                 /* if we had an established connection
3561                  * and one of the nodes newly attaches a disk */
3562                 cr |= (os.conn == C_CONNECTED &&
3563                        (peer_state.disk == D_NEGOTIATING ||
3564                         os.disk == D_NEGOTIATING));
3565                 /* if we have both been inconsistent, and the peer has been
3566                  * forced to be UpToDate with --overwrite-data */
3567                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3568                 /* if we had been plain connected, and the admin requested to
3569                  * start a sync by "invalidate" or "invalidate-remote" */
3570                 cr |= (os.conn == C_CONNECTED &&
3571                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3572                                  peer_state.conn <= C_WF_BITMAP_T));
3573
3574                 if (cr)
3575                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3576
3577                 put_ldev(mdev);
3578                 if (ns.conn == C_MASK) {
3579                         ns.conn = C_CONNECTED;
3580                         if (mdev->state.disk == D_NEGOTIATING) {
3581                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3582                         } else if (peer_state.disk == D_NEGOTIATING) {
3583                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3584                                 peer_state.disk = D_DISKLESS;
3585                                 real_peer_disk = D_DISKLESS;
3586                         } else {
3587                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3588                                         return -EIO;
3589                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3590                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3591                                 return -EIO;
3592                         }
3593                 }
3594         }
3595
3596         spin_lock_irq(&mdev->tconn->req_lock);
3597         if (os.i != drbd_read_state(mdev).i)
3598                 goto retry;
3599         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3600         ns.peer = peer_state.role;
3601         ns.pdsk = real_peer_disk;
3602         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3603         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3604                 ns.disk = mdev->new_state_tmp.disk;
3605         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3606         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3607             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3608                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3609                    for temporary network outages! */
3610                 spin_unlock_irq(&mdev->tconn->req_lock);
3611                 dev_err(DEV, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
3612                 tl_clear(mdev->tconn);
3613                 drbd_uuid_new_current(mdev);
3614                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3615                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3616                 return -EIO;
3617         }
3618         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3619         ns = drbd_read_state(mdev);
3620         spin_unlock_irq(&mdev->tconn->req_lock);
3621
3622         if (rv < SS_SUCCESS) {
3623                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3624                 return -EIO;
3625         }
3626
3627         if (os.conn > C_WF_REPORT_PARAMS) {
3628                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3629                     peer_state.disk != D_NEGOTIATING ) {
3630                         /* we want resync, peer has not yet decided to sync... */
3631                         /* Nowadays only used when forcing a node into primary role and
3632                            setting its disk to UpToDate with that */
3633                         drbd_send_uuids(mdev);
3634                         drbd_send_state(mdev);
3635                 }
3636         }
3637
3638         mdev->tconn->net_conf->want_lose = 0;
3639
3640         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3641
3642         return 0;
3643 }
3644
3645 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3646 {
3647         struct drbd_conf *mdev;
3648         struct p_rs_uuid *p = pi->data;
3649
3650         mdev = vnr_to_mdev(tconn, pi->vnr);
3651         if (!mdev)
3652                 return -EIO;
3653
3654         wait_event(mdev->misc_wait,
3655                    mdev->state.conn == C_WF_SYNC_UUID ||
3656                    mdev->state.conn == C_BEHIND ||
3657                    mdev->state.conn < C_CONNECTED ||
3658                    mdev->state.disk < D_NEGOTIATING);
3659
3660         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3661
3662         /* Here the _drbd_uuid_ functions are right, current should
3663            _not_ be rotated into the history */
3664         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3665                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3666                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3667
3668                 drbd_print_uuids(mdev, "updated sync uuid");
3669                 drbd_start_resync(mdev, C_SYNC_TARGET);
3670
3671                 put_ldev(mdev);
3672         } else
3673                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3674
3675         return 0;
3676 }
3677
3678 /**
3679  * receive_bitmap_plain
3680  *
3681  * Return 0 when done, 1 when another iteration is needed, and a negative error
3682  * code upon failure.
3683  */
3684 static int
3685 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3686                      unsigned long *p, struct bm_xfer_ctx *c)
3687 {
3688         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3689                                  drbd_header_size(mdev->tconn);
3690         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3691                                        c->bm_words - c->word_offset);
3692         unsigned int want = num_words * sizeof(*p);
3693         int err;
3694
3695         if (want != size) {
3696                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3697                 return -EIO;
3698         }
3699         if (want == 0)
3700                 return 0;
3701         err = drbd_recv_all(mdev->tconn, p, want);
3702         if (err)
3703                 return err;
3704
3705         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3706
3707         c->word_offset += num_words;
3708         c->bit_offset = c->word_offset * BITS_PER_LONG;
3709         if (c->bit_offset > c->bm_bits)
3710                 c->bit_offset = c->bm_bits;
3711
3712         return 1;
3713 }
3714
3715 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3716 {
3717         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3718 }
3719
3720 static int dcbp_get_start(struct p_compressed_bm *p)
3721 {
3722         return (p->encoding & 0x80) != 0;
3723 }
3724
3725 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3726 {
3727         return (p->encoding >> 4) & 0x7;
3728 }
3729
3730 /**
3731  * recv_bm_rle_bits
3732  *
3733  * Return 0 when done, 1 when another iteration is needed, and a negative error
3734  * code upon failure.
3735  */
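/* The peer sends the bitmap as VLI-encoded run lengths of alternating runs of
 * clear and set bits; dcbp_get_start() tells whether the first run consists of
 * set bits, and 'toggle' flips with every decoded run. */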
3736 static int
3737 recv_bm_rle_bits(struct drbd_conf *mdev,
3738                 struct p_compressed_bm *p,
3739                  struct bm_xfer_ctx *c,
3740                  unsigned int len)
3741 {
3742         struct bitstream bs;
3743         u64 look_ahead;
3744         u64 rl;
3745         u64 tmp;
3746         unsigned long s = c->bit_offset;
3747         unsigned long e;
3748         int toggle = dcbp_get_start(p);
3749         int have;
3750         int bits;
3751
3752         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3753
3754         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3755         if (bits < 0)
3756                 return -EIO;
3757
3758         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3759                 bits = vli_decode_bits(&rl, look_ahead);
3760                 if (bits <= 0)
3761                         return -EIO;
3762
3763                 if (toggle) {
3764                         e = s + rl -1;
3765                         if (e >= c->bm_bits) {
3766                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3767                                 return -EIO;
3768                         }
3769                         _drbd_bm_set_bits(mdev, s, e);
3770                 }
3771
3772                 if (have < bits) {
3773                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3774                                 have, bits, look_ahead,
3775                                 (unsigned int)(bs.cur.b - p->code),
3776                                 (unsigned int)bs.buf_len);
3777                         return -EIO;
3778                 }
3779                 look_ahead >>= bits;
3780                 have -= bits;
3781
3782                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3783                 if (bits < 0)
3784                         return -EIO;
3785                 look_ahead |= tmp << have;
3786                 have += bits;
3787         }
3788
3789         c->bit_offset = s;
3790         bm_xfer_ctx_bit_to_word_offset(c);
3791
3792         return (s != c->bm_bits);
3793 }
3794
3795 /**
3796  * decode_bitmap_c
3797  *
3798  * Return 0 when done, 1 when another iteration is needed, and a negative error
3799  * code upon failure.
3800  */
3801 static int
3802 decode_bitmap_c(struct drbd_conf *mdev,
3803                 struct p_compressed_bm *p,
3804                 struct bm_xfer_ctx *c,
3805                 unsigned int len)
3806 {
3807         if (dcbp_get_code(p) == RLE_VLI_Bits)
3808                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
3809
3810         /* other variants had been implemented for evaluation,
3811          * but have been dropped as this one turned out to be "best"
3812          * during all our tests. */
3813
3814         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3815         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
3816         return -EIO;
3817 }
3818
3819 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3820                 const char *direction, struct bm_xfer_ctx *c)
3821 {
3822         /* what would it take to transfer it "plaintext" */
3823         unsigned int header_size = drbd_header_size(mdev->tconn);
3824         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
3825         unsigned int plain =
3826                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
3827                 c->bm_words * sizeof(unsigned long);
3828         unsigned int total = c->bytes[0] + c->bytes[1];
3829         unsigned int r;
3830
3831         /* total cannot be zero, but just in case: */
3832         if (total == 0)
3833                 return;
3834
3835         /* don't report if not compressed */
3836         if (total >= plain)
3837                 return;
3838
3839         /* total < plain. check for overflow, still */
3840         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3841                                     : (1000 * total / plain);
3842
3843         if (r > 1000)
3844                 r = 1000;
3845
3846         r = 1000 - r;
3847         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3848              "total %u; compression: %u.%u%%\n",
3849                         direction,
3850                         c->bytes[1], c->packets[1],
3851                         c->bytes[0], c->packets[0],
3852                         total, r/10, r % 10);
3853 }
3854
3855 /* Since we are processing the bitfield from lower addresses to higher,
3856    it does not matter whether we process it in 32 bit or 64 bit chunks,
3857    as long as it is little endian. (Think of it as a byte stream,
3858    beginning with the lowest byte...) If we used big endian,
3859    we would need to process it from the highest address to the lowest
3860    in order to be agnostic to the 32 vs. 64 bit issue.
3861
3862    returns 0 on success, a negative error code otherwise. */
3863 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
3864 {
3865         struct drbd_conf *mdev;
3866         struct bm_xfer_ctx c;
3867         int err;
3868
3869         mdev = vnr_to_mdev(tconn, pi->vnr);
3870         if (!mdev)
3871                 return -EIO;
3872
3873         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3874         /* you are supposed to send additional out-of-sync information
3875          * if you actually set bits during this phase */
3876
3877         c = (struct bm_xfer_ctx) {
3878                 .bm_bits = drbd_bm_bits(mdev),
3879                 .bm_words = drbd_bm_words(mdev),
3880         };
3881
3882         for(;;) {
3883                 if (pi->cmd == P_BITMAP)
3884                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
3885                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
3886                         /* MAYBE: sanity check that we speak proto >= 90,
3887                          * and the feature is enabled! */
3888                         struct p_compressed_bm *p = pi->data;
3889
3890                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
3891                                 dev_err(DEV, "ReportCBitmap packet too large\n");
3892                                 err = -EIO;
3893                                 goto out;
3894                         }
3895                         if (pi->size <= sizeof(*p)) {
3896                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
3897                                 err = -EIO;
3898                                 goto out;
3899                         }
3900                         err = drbd_recv_all(mdev->tconn, p, pi->size);
3901                         if (err)
3902                                goto out;
3903                         err = decode_bitmap_c(mdev, p, &c, pi->size);
3904                 } else {
3905                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
3906                         err = -EIO;
3907                         goto out;
3908                 }
3909
3910                 c.packets[pi->cmd == P_BITMAP]++;
3911                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
3912
3913                 if (err <= 0) {
3914                         if (err < 0)
3915                                 goto out;
3916                         break;
3917                 }
3918                 err = drbd_recv_header(mdev->tconn, pi);
3919                 if (err)
3920                         goto out;
3921         }
3922
3923         INFO_bm_xfer_stats(mdev, "receive", &c);
3924
3925         if (mdev->state.conn == C_WF_BITMAP_T) {
3926                 enum drbd_state_rv rv;
3927
3928                 err = drbd_send_bitmap(mdev);
3929                 if (err)
3930                         goto out;
3931                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3932                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3933                 D_ASSERT(rv == SS_SUCCESS);
3934         } else if (mdev->state.conn != C_WF_BITMAP_S) {
3935                 /* admin may have requested C_DISCONNECTING,
3936                  * other threads may have noticed network errors */
3937                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3938                     drbd_conn_str(mdev->state.conn));
3939         }
3940         err = 0;
3941
3942  out:
3943         drbd_bm_unlock(mdev);
3944         if (!err && mdev->state.conn == C_WF_BITMAP_S)
3945                 drbd_start_resync(mdev, C_SYNC_SOURCE);
3946         return err;
3947 }
3948
3949 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
3950 {
3951         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
3952                  pi->cmd, pi->size);
3953
3954         return ignore_remaining_packet(tconn, pi);
3955 }
3956
3957 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
3958 {
3959         /* Make sure we've acked all the TCP data associated
3960          * with the data requests being unplugged */
3961         drbd_tcp_quickack(tconn->data.socket);
3962
3963         return 0;
3964 }
3965
3966 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
3967 {
3968         struct drbd_conf *mdev;
3969         struct p_block_desc *p = pi->data;
3970
3971         mdev = vnr_to_mdev(tconn, pi->vnr);
3972         if (!mdev)
3973                 return -EIO;
3974
3975         switch (mdev->state.conn) {
3976         case C_WF_SYNC_UUID:
3977         case C_WF_BITMAP_T:
3978         case C_BEHIND:
3979                 break;
3980         default:
3981                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3982                                 drbd_conn_str(mdev->state.conn));
3983         }
3984
3985         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3986
3987         return 0;
3988 }
3989
3990 struct data_cmd {
3991         int expect_payload;
3992         size_t pkt_size;
3993         int (*fn)(struct drbd_tconn *, struct packet_info *);
3994 };
3995
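/* Receiver dispatch table, indexed by packet type: pkt_size is the fixed
 * (sub-)header part that drbdd() reads into the buffer before calling fn();
 * expect_payload says whether additional payload beyond that part is allowed. */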
3996 static struct data_cmd drbd_cmd_handler[] = {
3997         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
3998         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
3999         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4000         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4001         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4002         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4003         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4004         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4005         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4006         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4007         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4008         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4009         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4010         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4011         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4012         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4013         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4014         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4015         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4016         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4017         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4018         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4019         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4020 };
4021
4022 static void drbdd(struct drbd_tconn *tconn)
4023 {
4024         struct packet_info pi;
4025         size_t shs; /* sub header size */
4026         int err;
4027
4028         while (get_t_state(&tconn->receiver) == RUNNING) {
4029                 struct data_cmd *cmd;
4030
4031                 drbd_thread_current_set_cpu(&tconn->receiver);
4032                 if (drbd_recv_header(tconn, &pi))
4033                         goto err_out;
4034
4035                 cmd = &drbd_cmd_handler[pi.cmd];
4036                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4037                         conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
4038                         goto err_out;
4039                 }
4040
4041                 shs = cmd->pkt_size;
4042                 if (pi.size > shs && !cmd->expect_payload) {
4043                         conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
4044                         goto err_out;
4045                 }
4046
4047                 if (shs) {
4048                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4049                         if (err)
4050                                 goto err_out;
4051                         pi.size -= shs;
4052                 }
4053
4054                 err = cmd->fn(tconn, &pi);
4055                 if (err) {
4056                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4057                                  cmdname(pi.cmd), err, pi.size);
4058                         goto err_out;
4059                 }
4060         }
4061         return;
4062
4063     err_out:
4064         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4065 }
4066
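     /*
      * Queue a w_prev_work_done barrier work item on the connection's data
      * work queue and wait for the worker to complete it; once this returns,
      * all work queued before the call has been processed.
      */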
4067 void conn_flush_workqueue(struct drbd_tconn *tconn)
4068 {
4069         struct drbd_wq_barrier barr;
4070
4071         barr.w.cb = w_prev_work_done;
4072         barr.w.tconn = tconn;
4073         init_completion(&barr.done);
4074         drbd_queue_work(&tconn->data.work, &barr.w);
4075         wait_for_completion(&barr.done);
4076 }
4077
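     /*
      * Tear down an established connection: stop the asender, free the
      * sockets, run drbd_disconnected() on every volume, possibly fence the
      * peer, and move the connection to C_UNCONNECTED (or, when we were
      * C_DISCONNECTING, all the way to C_STANDALONE, freeing net_conf).
      */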
4078 static void drbd_disconnect(struct drbd_tconn *tconn)
4079 {
4080         enum drbd_conns oc;
4081         int rv = SS_UNKNOWN_ERROR;
4082
4083         if (tconn->cstate == C_STANDALONE)
4084                 return;
4085
4086         /* asender does not clean up anything. it must not interfere, either */
4087         drbd_thread_stop(&tconn->asender);
4088         drbd_free_sock(tconn);
4089
4090         down_read(&drbd_cfg_rwsem);
4091         idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
4092         up_read(&drbd_cfg_rwsem);
4093         conn_info(tconn, "Connection closed\n");
4094
4095         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4096                 conn_try_outdate_peer_async(tconn);
4097
4098         spin_lock_irq(&tconn->req_lock);
4099         oc = tconn->cstate;
4100         if (oc >= C_UNCONNECTED)
4101                 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4102
4103         spin_unlock_irq(&tconn->req_lock);
4104
4105         if (oc == C_DISCONNECTING) {
4106                 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
4107
4108                 crypto_free_hash(tconn->cram_hmac_tfm);
4109                 tconn->cram_hmac_tfm = NULL;
4110
4111                 kfree(tconn->net_conf);
4112                 tconn->net_conf = NULL;
4113                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
4114         }
4115 }
4116
4117 static int drbd_disconnected(int vnr, void *p, void *data)
4118 {
4119         struct drbd_conf *mdev = (struct drbd_conf *)p;
4120         enum drbd_fencing_p fp;
4121         unsigned int i;
4122
4123         /* wait for current activity to cease. */
4124         spin_lock_irq(&mdev->tconn->req_lock);
4125         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4126         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4127         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4128         spin_unlock_irq(&mdev->tconn->req_lock);
4129
4130         /* We do not have data structures that would allow us to
4131          * get the rs_pending_cnt down to 0 again.
4132          *  * On C_SYNC_TARGET we do not have any data structures describing
4133          *    the pending RSDataRequests we have sent.
4134          *  * On C_SYNC_SOURCE there is no data structure that tracks
4135          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4136          *  And no, it is not the sum of the reference counts in the
4137          *  resync_LRU. The resync_LRU tracks the whole operation including
4138          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4139          *  on the fly. */
4140         drbd_rs_cancel_all(mdev);
4141         mdev->rs_total = 0;
4142         mdev->rs_failed = 0;
4143         atomic_set(&mdev->rs_pending_cnt, 0);
4144         wake_up(&mdev->misc_wait);
4145
4146         del_timer(&mdev->request_timer);
4147
4148         del_timer_sync(&mdev->resync_timer);
4149         resync_timer_fn((unsigned long)mdev);
4150
4151         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4152          * w_make_resync_request etc. which may still be on the worker queue
4153          * to be "canceled" */
4154         drbd_flush_workqueue(mdev);
4155
4156         drbd_finish_peer_reqs(mdev);
4157
4158         kfree(mdev->p_uuid);
4159         mdev->p_uuid = NULL;
4160
4161         if (!drbd_suspended(mdev))
4162                 tl_clear(mdev->tconn);
4163
4164         drbd_md_sync(mdev);
4165
4166         fp = FP_DONT_CARE;
4167         if (get_ldev(mdev)) {
4168                 fp = mdev->ldev->dc.fencing;
4169                 put_ldev(mdev);
4170         }
4171
4172         /* serialize with bitmap writeout triggered by the state change,
4173          * if any. */
4174         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4175
4176         /* tcp_close and release of sendpage pages can be deferred.  I don't
4177          * want to use SO_LINGER, because apparently it can be deferred for
4178          * more than 20 seconds (longest time I checked).
4179          *
4180          * Actually we don't care for exactly when the network stack does its
4181          * put_page(), but release our reference on these pages right here.
4182          */
4183         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4184         if (i)
4185                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4186         i = atomic_read(&mdev->pp_in_use_by_net);
4187         if (i)
4188                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4189         i = atomic_read(&mdev->pp_in_use);
4190         if (i)
4191                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4192
4193         D_ASSERT(list_empty(&mdev->read_ee));
4194         D_ASSERT(list_empty(&mdev->active_ee));
4195         D_ASSERT(list_empty(&mdev->sync_ee));
4196         D_ASSERT(list_empty(&mdev->done_ee));
4197
4198         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4199         atomic_set(&mdev->current_epoch->epoch_size, 0);
4200         D_ASSERT(list_empty(&mdev->current_epoch->list));
4201
4202         return 0;
4203 }
4204
4205 /*
4206  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4207  * we can agree on is stored in agreed_pro_version.
4208  *
4209  * feature flags and the reserved array should be enough room for future
4210  * enhancements of the handshake protocol, and possible plugins...
4211  *
4212  * for now, they are expected to be zero, but ignored.
4213  */
4214 static int drbd_send_features(struct drbd_tconn *tconn)
4215 {
4216         struct drbd_socket *sock;
4217         struct p_connection_features *p;
4218
4219         sock = &tconn->data;
4220         p = conn_prepare_command(tconn, sock);
4221         if (!p)
4222                 return -EIO;
4223         memset(p, 0, sizeof(*p));
4224         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4225         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4226         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4227 }
4228
4229 /*
4230  * return values:
4231  *   1 yes, we have a valid connection
4232  *   0 oops, did not work out, please try again
4233  *  -1 peer talks different language,
4234  *     no point in trying again, please go standalone.
4235  */
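     /*
      * Illustrative example (version numbers hypothetical): if we support
      * protocol versions 86..100 and the peer advertises 90..96, the ranges
      * overlap and agreed_pro_version becomes min(100, 96) = 96.  A peer
      * whose whole range lies outside ours takes the "incompat" path.
      */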
4236 static int drbd_do_features(struct drbd_tconn *tconn)
4237 {
4238         /* ASSERT current == tconn->receiver ... */
4239         struct p_connection_features *p;
4240         const int expect = sizeof(struct p_connection_features);
4241         struct packet_info pi;
4242         int err;
4243
4244         err = drbd_send_features(tconn);
4245         if (err)
4246                 return 0;
4247
4248         err = drbd_recv_header(tconn, &pi);
4249         if (err)
4250                 return 0;
4251
4252         if (pi.cmd != P_CONNECTION_FEATURES) {
4253                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4254                      cmdname(pi.cmd), pi.cmd);
4255                 return -1;
4256         }
4257
4258         if (pi.size != expect) {
4259                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4260                      expect, pi.size);
4261                 return -1;
4262         }
4263
4264         p = pi.data;
4265         err = drbd_recv_all_warn(tconn, p, expect);
4266         if (err)
4267                 return 0;
4268
4269         p->protocol_min = be32_to_cpu(p->protocol_min);
4270         p->protocol_max = be32_to_cpu(p->protocol_max);
4271         if (p->protocol_max == 0)
4272                 p->protocol_max = p->protocol_min;
4273
4274         if (PRO_VERSION_MAX < p->protocol_min ||
4275             PRO_VERSION_MIN > p->protocol_max)
4276                 goto incompat;
4277
4278         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4279
4280         conn_info(tconn, "Handshake successful: "
4281              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4282
4283         return 1;
4284
4285  incompat:
4286         conn_err(tconn, "incompatible DRBD dialects: "
4287             "I support %d-%d, peer supports %d-%d\n",
4288             PRO_VERSION_MIN, PRO_VERSION_MAX,
4289             p->protocol_min, p->protocol_max);
4290         return -1;
4291 }
4292
4293 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4294 static int drbd_do_auth(struct drbd_tconn *tconn)
4295 {
4296                 conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4297                 conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4298         return -1;
4299 }
4300 #else
4301 #define CHALLENGE_LEN 64
4302
4303 /* Return value:
4304         1 - auth succeeded,
4305         0 - failed, try again (network error),
4306         -1 - auth failed, don't try again.
4307 */
4308
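     /*
      * Outline of the exchange implemented below (both peers run it):
      *   1. send P_AUTH_CHALLENGE with CHALLENGE_LEN random bytes,
      *   2. receive the peer's challenge,
      *   3. HMAC the peer's challenge with the shared secret and send the
      *      digest back as P_AUTH_RESPONSE,
      *   4. receive the peer's response and compare it against the HMAC
      *      computed over our own challenge; only a match authenticates.
      */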
4309 static int drbd_do_auth(struct drbd_tconn *tconn)
4310 {
4311         struct drbd_socket *sock;
4312         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4313         struct scatterlist sg;
4314         char *response = NULL;
4315         char *right_response = NULL;
4316         char *peers_ch = NULL;
4317         unsigned int key_len = strlen(tconn->net_conf->shared_secret);
4318         unsigned int resp_size;
4319         struct hash_desc desc;
4320         struct packet_info pi;
4321         int err, rv;
4322
4323         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4324
4325         desc.tfm = tconn->cram_hmac_tfm;
4326         desc.flags = 0;
4327
4328         rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4329                                 (u8 *)tconn->net_conf->shared_secret, key_len);
4330         if (rv) {
4331                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4332                 rv = -1;
4333                 goto fail;
4334         }
4335
4336         get_random_bytes(my_challenge, CHALLENGE_LEN);
4337
4338         sock = &tconn->data;
4339         if (!conn_prepare_command(tconn, sock)) {
4340                 rv = 0;
4341                 goto fail;
4342         }
4343         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4344                                 my_challenge, CHALLENGE_LEN);
4345         if (!rv)
4346                 goto fail;
4347
4348         err = drbd_recv_header(tconn, &pi);
4349         if (err) {
4350                 rv = 0;
4351                 goto fail;
4352         }
4353
4354         if (pi.cmd != P_AUTH_CHALLENGE) {
4355                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4356                     cmdname(pi.cmd), pi.cmd);
4357                 rv = 0;
4358                 goto fail;
4359         }
4360
4361         if (pi.size > CHALLENGE_LEN * 2) {
4362                 conn_err(tconn, "AuthChallenge payload too big.\n");
4363                 rv = -1;
4364                 goto fail;
4365         }
4366
4367         peers_ch = kmalloc(pi.size, GFP_NOIO);
4368         if (peers_ch == NULL) {
4369                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4370                 rv = -1;
4371                 goto fail;
4372         }
4373
4374         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4375         if (err) {
4376                 rv = 0;
4377                 goto fail;
4378         }
4379
4380         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4381         response = kmalloc(resp_size, GFP_NOIO);
4382         if (response == NULL) {
4383                 conn_err(tconn, "kmalloc of response failed\n");
4384                 rv = -1;
4385                 goto fail;
4386         }
4387
4388         sg_init_table(&sg, 1);
4389         sg_set_buf(&sg, peers_ch, pi.size);
4390
4391         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4392         if (rv) {
4393                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4394                 rv = -1;
4395                 goto fail;
4396         }
4397
4398         if (!conn_prepare_command(tconn, sock)) {
4399                 rv = 0;
4400                 goto fail;
4401         }
4402         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4403                                 response, resp_size);
4404         if (!rv)
4405                 goto fail;
4406
4407         err = drbd_recv_header(tconn, &pi);
4408         if (err) {
4409                 rv = 0;
4410                 goto fail;
4411         }
4412
4413         if (pi.cmd != P_AUTH_RESPONSE) {
4414                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4415                         cmdname(pi.cmd), pi.cmd);
4416                 rv = 0;
4417                 goto fail;
4418         }
4419
4420         if (pi.size != resp_size) {
4421                 conn_err(tconn, "AuthResponse payload of unexpected size\n");
4422                 rv = 0;
4423                 goto fail;
4424         }
4425
4426         err = drbd_recv_all_warn(tconn, response, resp_size);
4427         if (err) {
4428                 rv = 0;
4429                 goto fail;
4430         }
4431
4432         right_response = kmalloc(resp_size, GFP_NOIO);
4433         if (right_response == NULL) {
4434                 conn_err(tconn, "kmalloc of right_response failed\n");
4435                 rv = -1;
4436                 goto fail;
4437         }
4438
4439         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4440
4441         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4442         if (rv) {
4443                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4444                 rv = -1;
4445                 goto fail;
4446         }
4447
4448         rv = !memcmp(response, right_response, resp_size);
4449
4450         if (rv)
4451                 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4452                      resp_size, tconn->net_conf->cram_hmac_alg);
4453         else
4454                 rv = -1;
4455
4456  fail:
4457         kfree(peers_ch);
4458         kfree(response);
4459         kfree(right_response);
4460
4461         return rv;
4462 }
4463 #endif
4464
4465 int drbdd_init(struct drbd_thread *thi)
4466 {
4467         struct drbd_tconn *tconn = thi->tconn;
4468         int h;
4469
4470         conn_info(tconn, "receiver (re)started\n");
4471
4472         do {
4473                 h = drbd_connect(tconn);
4474                 if (h == 0) {
4475                         drbd_disconnect(tconn);
4476                         schedule_timeout_interruptible(HZ);
4477                 }
4478                 if (h == -1) {
4479                         conn_warn(tconn, "Discarding network configuration.\n");
4480                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4481                 }
4482         } while (h == 0);
4483
4484         if (h > 0) {
4485                 if (get_net_conf(tconn)) {
4486                         drbdd(tconn);
4487                         put_net_conf(tconn);
4488                 }
4489         }
4490
4491         drbd_disconnect(tconn);
4492
4493         conn_info(tconn, "receiver terminated\n");
4494         return 0;
4495 }
4496
4497 /* ********* acknowledge sender ******** */
4498
4499 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4500 {
4501         struct p_req_state_reply *p = pi->data;
4502         int retcode = be32_to_cpu(p->retcode);
4503
4504         if (retcode >= SS_SUCCESS) {
4505                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4506         } else {
4507                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4508                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4509                          drbd_set_st_err_str(retcode), retcode);
4510         }
4511         wake_up(&tconn->ping_wait);
4512
4513         return 0;
4514 }
4515
4516 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4517 {
4518         struct drbd_conf *mdev;
4519         struct p_req_state_reply *p = pi->data;
4520         int retcode = be32_to_cpu(p->retcode);
4521
4522         mdev = vnr_to_mdev(tconn, pi->vnr);
4523         if (!mdev)
4524                 return -EIO;
4525
4526         if (retcode >= SS_SUCCESS) {
4527                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4528         } else {
4529                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4530                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4531                         drbd_set_st_err_str(retcode), retcode);
4532         }
4533         wake_up(&mdev->state_wait);
4534
4535         return 0;
4536 }
4537
4538 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4539 {
4540         return drbd_send_ping_ack(tconn);
4541
4542 }
4543
4544 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4545 {
4546         /* restore idle timeout */
4547         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4548         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4549                 wake_up(&tconn->ping_wait);
4550
4551         return 0;
4552 }
4553
4554 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4555 {
4556         struct drbd_conf *mdev;
4557         struct p_block_ack *p = pi->data;
4558         sector_t sector = be64_to_cpu(p->sector);
4559         int blksize = be32_to_cpu(p->blksize);
4560
4561         mdev = vnr_to_mdev(tconn, pi->vnr);
4562         if (!mdev)
4563                 return -EIO;
4564
4565         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4566
4567         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4568
4569         if (get_ldev(mdev)) {
4570                 drbd_rs_complete_io(mdev, sector);
4571                 drbd_set_in_sync(mdev, sector, blksize);
4572                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4573                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4574                 put_ldev(mdev);
4575         }
4576         dec_rs_pending(mdev);
4577         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4578
4579         return 0;
4580 }
4581
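     /*
      * Look up the request identified by block_id/sector in the given request
      * tree under req_lock, apply the given request event via __req_mod(),
      * and complete the master bio outside the lock if that event finished
      * it.  With missing_ok, a request no longer found in the tree is not an
      * error (e.g. protocol A/B, where the peer may ack a write we have
      * already completed).
      */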
4582 static int
4583 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4584                               struct rb_root *root, const char *func,
4585                               enum drbd_req_event what, bool missing_ok)
4586 {
4587         struct drbd_request *req;
4588         struct bio_and_error m;
4589
4590         spin_lock_irq(&mdev->tconn->req_lock);
4591         req = find_request(mdev, root, id, sector, missing_ok, func);
4592         if (unlikely(!req)) {
4593                 spin_unlock_irq(&mdev->tconn->req_lock);
4594                 return -EIO;
4595         }
4596         __req_mod(req, what, &m);
4597         spin_unlock_irq(&mdev->tconn->req_lock);
4598
4599         if (m.bio)
4600                 complete_master_bio(mdev, &m);
4601         return 0;
4602 }
4603
4604 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4605 {
4606         struct drbd_conf *mdev;
4607         struct p_block_ack *p = pi->data;
4608         sector_t sector = be64_to_cpu(p->sector);
4609         int blksize = be32_to_cpu(p->blksize);
4610         enum drbd_req_event what;
4611
4612         mdev = vnr_to_mdev(tconn, pi->vnr);
4613         if (!mdev)
4614                 return -EIO;
4615
4616         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4617
4618         if (p->block_id == ID_SYNCER) {
4619                 drbd_set_in_sync(mdev, sector, blksize);
4620                 dec_rs_pending(mdev);
4621                 return 0;
4622         }
4623         switch (pi->cmd) {
4624         case P_RS_WRITE_ACK:
4625                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4626                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4627                 break;
4628         case P_WRITE_ACK:
4629                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4630                 what = WRITE_ACKED_BY_PEER;
4631                 break;
4632         case P_RECV_ACK:
4633                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
4634                 what = RECV_ACKED_BY_PEER;
4635                 break;
4636         case P_DISCARD_WRITE:
4637                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4638                 what = DISCARD_WRITE;
4639                 break;
4640         case P_RETRY_WRITE:
4641                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4642                 what = POSTPONE_WRITE;
4643                 break;
4644         default:
4645                 BUG();
4646         }
4647
4648         return validate_req_change_req_state(mdev, p->block_id, sector,
4649                                              &mdev->write_requests, __func__,
4650                                              what, false);
4651 }
4652
4653 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4654 {
4655         struct drbd_conf *mdev;
4656         struct p_block_ack *p = pi->data;
4657         sector_t sector = be64_to_cpu(p->sector);
4658         int size = be32_to_cpu(p->blksize);
4659         bool missing_ok = tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4660                           tconn->net_conf->wire_protocol == DRBD_PROT_B;
4661         int err;
4662
4663         mdev = vnr_to_mdev(tconn, pi->vnr);
4664         if (!mdev)
4665                 return -EIO;
4666
4667         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4668
4669         if (p->block_id == ID_SYNCER) {
4670                 dec_rs_pending(mdev);
4671                 drbd_rs_failed_io(mdev, sector, size);
4672                 return 0;
4673         }
4674
4675         err = validate_req_change_req_state(mdev, p->block_id, sector,
4676                                             &mdev->write_requests, __func__,
4677                                             NEG_ACKED, missing_ok);
4678         if (err) {
4679                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4680                    The master bio might already be completed, therefore the
4681                    request is no longer in the collision hash. */
4682                 /* In Protocol B we might already have got a P_RECV_ACK
4683                    but then get a P_NEG_ACK afterwards. */
4684                 if (!missing_ok)
4685                         return err;
4686                 drbd_set_out_of_sync(mdev, sector, size);
4687         }
4688         return 0;
4689 }
4690
4691 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4692 {
4693         struct drbd_conf *mdev;
4694         struct p_block_ack *p = pi->data;
4695         sector_t sector = be64_to_cpu(p->sector);
4696
4697         mdev = vnr_to_mdev(tconn, pi->vnr);
4698         if (!mdev)
4699                 return -EIO;
4700
4701         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4702
4703         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4704             (unsigned long long)sector, be32_to_cpu(p->blksize));
4705
4706         return validate_req_change_req_state(mdev, p->block_id, sector,
4707                                              &mdev->read_requests, __func__,
4708                                              NEG_ACKED, false);
4709 }
4710
4711 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4712 {
4713         struct drbd_conf *mdev;
4714         sector_t sector;
4715         int size;
4716         struct p_block_ack *p = pi->data;
4717
4718         mdev = vnr_to_mdev(tconn, pi->vnr);
4719         if (!mdev)
4720                 return -EIO;
4721
4722         sector = be64_to_cpu(p->sector);
4723         size = be32_to_cpu(p->blksize);
4724
4725         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4726
4727         dec_rs_pending(mdev);
4728
4729         if (get_ldev_if_state(mdev, D_FAILED)) {
4730                 drbd_rs_complete_io(mdev, sector);
4731                 switch (pi->cmd) {
4732                 case P_NEG_RS_DREPLY:
4733                         drbd_rs_failed_io(mdev, sector, size);
4734                 case P_RS_CANCEL:
4735                         break;
4736                 default:
4737                         BUG();
4738                 }
4739                 put_ldev(mdev);
4740         }
4741
4742         return 0;
4743 }
4744
4745 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
4746 {
4747         struct drbd_conf *mdev;
4748         struct p_barrier_ack *p = pi->data;
4749
4750         mdev = vnr_to_mdev(tconn, pi->vnr);
4751         if (!mdev)
4752                 return -EIO;
4753
4754         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
4755
4756         if (mdev->state.conn == C_AHEAD &&
4757             atomic_read(&mdev->ap_in_flight) == 0 &&
4758             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4759                 mdev->start_resync_timer.expires = jiffies + HZ;
4760                 add_timer(&mdev->start_resync_timer);
4761         }
4762
4763         return 0;
4764 }
4765
4766 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
4767 {
4768         struct drbd_conf *mdev;
4769         struct p_block_ack *p = pi->data;
4770         struct drbd_work *w;
4771         sector_t sector;
4772         int size;
4773
4774         mdev = vnr_to_mdev(tconn, pi->vnr);
4775         if (!mdev)
4776                 return -EIO;
4777
4778         sector = be64_to_cpu(p->sector);
4779         size = be32_to_cpu(p->blksize);
4780
4781         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4782
4783         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4784                 drbd_ov_out_of_sync_found(mdev, sector, size);
4785         else
4786                 ov_out_of_sync_print(mdev);
4787
4788         if (!get_ldev(mdev))
4789                 return 0;
4790
4791         drbd_rs_complete_io(mdev, sector);
4792         dec_rs_pending(mdev);
4793
4794         --mdev->ov_left;
4795
4796         /* let's advance progress step marks only for every other megabyte */
4797         if ((mdev->ov_left & 0x200) == 0x200)
4798                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4799
4800         if (mdev->ov_left == 0) {
4801                 w = kmalloc(sizeof(*w), GFP_NOIO);
4802                 if (w) {
4803                         w->cb = w_ov_finished;
4804                         w->mdev = mdev;
4805                         drbd_queue_work_front(&mdev->tconn->data.work, w);
4806                 } else {
4807                         dev_err(DEV, "kmalloc(w) failed.");
4808                         ov_out_of_sync_print(mdev);
4809                         drbd_resync_finished(mdev);
4810                 }
4811         }
4812         put_ldev(mdev);
4813         return 0;
4814 }
4815
4816 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4817 {
4818         return 0;
4819 }
4820
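     /*
      * Flush completed peer requests (done_ee) for every volume of this
      * connection: SIGNAL_ASENDER is cleared and pending signals are flushed
      * before the potentially blocking drbd_finish_peer_reqs(), and re-set
      * afterwards; the loop repeats while any volume still has entries on
      * done_ee.
      */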
4821 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
4822 {
4823         struct drbd_conf *mdev;
4824         int i, not_empty = 0;
4825
4826         do {
4827                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4828                 flush_signals(current);
4829                 down_read(&drbd_cfg_rwsem);
4830                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4831                         if (drbd_finish_peer_reqs(mdev)) {
4832                                 up_read(&drbd_cfg_rwsem);
4833                                 return 1; /* error */
4834                         }
4835                 }
4836                 up_read(&drbd_cfg_rwsem);
4837                 set_bit(SIGNAL_ASENDER, &tconn->flags);
4838
4839                 spin_lock_irq(&tconn->req_lock);
4840                 rcu_read_lock();
4841                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4842                         not_empty = !list_empty(&mdev->done_ee);
4843                         if (not_empty)
4844                                 break;
4845                 }
4846                 rcu_read_unlock();
4847                 spin_unlock_irq(&tconn->req_lock);
4848         } while (not_empty);
4849
4850         return 0;
4851 }
4852
4853 struct asender_cmd {
4854         size_t pkt_size;
4855         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
4856 };
4857
4858 static struct asender_cmd asender_tbl[] = {
4859         [P_PING]            = { 0, got_Ping },
4860         [P_PING_ACK]        = { 0, got_PingAck },
4861         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4862         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4863         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4864         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
4865         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4866         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4867         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
4868         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4869         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4870         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4871         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4872         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4873         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
4874         [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
4875         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
4876 };
4877
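     /*
      * drbd_asender() is the receive loop of the meta (acknowledgement)
      * socket: it sends pings when requested, flushes completed peer
      * requests, and dispatches incoming packets through asender_tbl[],
      * where pkt_size is the expected payload size following the header.
      */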
4878 int drbd_asender(struct drbd_thread *thi)
4879 {
4880         struct drbd_tconn *tconn = thi->tconn;
4881         struct asender_cmd *cmd = NULL;
4882         struct packet_info pi;
4883         int rv;
4884         void *buf    = tconn->meta.rbuf;
4885         int received = 0;
4886         unsigned int header_size = drbd_header_size(tconn);
4887         int expect   = header_size;
4888         int ping_timeout_active = 0;
4889
4890         current->policy = SCHED_RR;  /* Make this a realtime task! */
4891         current->rt_priority = 2;    /* more important than all other tasks */
4892
4893         while (get_t_state(thi) == RUNNING) {
4894                 drbd_thread_current_set_cpu(thi);
4895                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
4896                         if (drbd_send_ping(tconn)) {
4897                                 conn_err(tconn, "drbd_send_ping has failed\n");
4898                                 goto reconnect;
4899                         }
4900                         tconn->meta.socket->sk->sk_rcvtimeo =
4901                                 tconn->net_conf->ping_timeo*HZ/10;
4902                         ping_timeout_active = 1;
4903                 }
4904
4905                 /* TODO: conditionally cork; it may hurt latency if we cork without
4906                    much to send */
4907                 if (!tconn->net_conf->no_cork)
4908                         drbd_tcp_cork(tconn->meta.socket);
4909                 if (tconn_finish_peer_reqs(tconn)) {
4910                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
4911                         goto reconnect;
4912                 }
4913                 /* but unconditionally uncork unless disabled */
4914                 if (!tconn->net_conf->no_cork)
4915                         drbd_tcp_uncork(tconn->meta.socket);
4916
4917                 /* short circuit, recv_msg would return EINTR anyways. */
4918                 if (signal_pending(current))
4919                         continue;
4920
4921                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
4922                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4923
4924                 flush_signals(current);
4925
4926                 /* Note:
4927                  * -EINTR        (on meta) we got a signal
4928                  * -EAGAIN       (on meta) rcvtimeo expired
4929                  * -ECONNRESET   other side closed the connection
4930                  * -ERESTARTSYS  (on data) we got a signal
4931                  * rv <  0       other than above: unexpected error!
4932                  * rv == expected: full header or command
4933                  * rv <  expected: "woken" by signal during receive
4934                  * rv == 0       : "connection shut down by peer"
4935                  */
4936                 if (likely(rv > 0)) {
4937                         received += rv;
4938                         buf      += rv;
4939                 } else if (rv == 0) {
4940                         conn_err(tconn, "meta connection shut down by peer.\n");
4941                         goto reconnect;
4942                 } else if (rv == -EAGAIN) {
4943                         /* If the data socket received something meanwhile,
4944                          * that is good enough: peer is still alive. */
4945                         if (time_after(tconn->last_received,
4946                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
4947                                 continue;
4948                         if (ping_timeout_active) {
4949                                 conn_err(tconn, "PingAck did not arrive in time.\n");
4950                                 goto reconnect;
4951                         }
4952                         set_bit(SEND_PING, &tconn->flags);
4953                         continue;
4954                 } else if (rv == -EINTR) {
4955                         continue;
4956                 } else {
4957                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
4958                         goto reconnect;
4959                 }
4960
4961                 if (received == expect && cmd == NULL) {
4962                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
4963                                 goto reconnect;
4964                         cmd = &asender_tbl[pi.cmd];
4965                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
4966                                 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
4967                                         pi.cmd, pi.size);
4968                                 goto disconnect;
4969                         }
4970                         expect = header_size + cmd->pkt_size;
4971                         if (pi.size != expect - header_size) {
4972                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
4973                                         pi.cmd, pi.size);
4974                                 goto reconnect;
4975                         }
4976                 }
4977                 if (received == expect) {
4978                         int err;
4979
4980                         err = cmd->fn(tconn, &pi);
4981                         if (err) {
4982                                 conn_err(tconn, "%pf failed\n", cmd->fn);
4983                                 goto reconnect;
4984                         }
4985
4986                         tconn->last_received = jiffies;
4987
4988                         /* the idle_timeout (ping-int)
4989                          * has been restored in got_PingAck() */
4990                         if (cmd == &asender_tbl[P_PING_ACK])
4991                                 ping_timeout_active = 0;
4992
4993                         buf      = tconn->meta.rbuf;
4994                         received = 0;
4995                         expect   = header_size;
4996                         cmd      = NULL;
4997                 }
4998         }
4999
5000         if (0) {
5001 reconnect:
5002                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5003         }
5004         if (0) {
5005 disconnect:
5006                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5007         }
5008         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5009
5010         conn_info(tconn, "asender terminated\n");
5011
5012         return 0;
5013 }