/*
   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */
#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/file.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_req.h"
#include "drbd_vli.h"
static int drbd_do_features(struct drbd_tconn *tconn);
static int drbd_do_auth(struct drbd_tconn *tconn);
static int drbd_disconnected(int vnr, void *p, void *data);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);

#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
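/* Note (added for readability): GFP_TRY deliberately omits __GFP_WAIT, so
 * alloc_page(GFP_TRY) fails fast instead of blocking in memory reclaim, and
 * __GFP_NOWARN suppresses the allocation failure warning -- the callers
 * below (__drbd_alloc_pages/drbd_alloc_pages) retry on their own. */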
/*
 * Some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */
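/* Sketch of the accessors assumed below (in DRBD they live in drbd_int.h;
 * shown here only to make the chain walking readable):
 *
 *	#define page_chain_next(page) \
 *		((struct page *)page_private(page))
 *	#define page_chain_for_each(page) \
 *		for (; page && ({ prefetch(page_chain_next(page)); 1; }); \
 *			page = page_chain_next(page))
 */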
/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
	struct page *page = *head;
	struct page *tmp;

	while (page) {
		tmp = page_chain_next(page);
		if (--n == 0)
			break; /* found sufficient pages */
		if (tmp == NULL)
			/* insufficient pages, don't use any of them. */
			return NULL;
		page = tmp;
	}

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */
	page = *head;
	*head = tmp;
	return page;
}
/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *tmp;
	int i = 1;

	while ((tmp = page_chain_next(page)))
		++i, page = tmp;
	if (len)
		*len = i;
	return page;
}

static int page_chain_free(struct page *page)
{
	struct page *tmp;
	int i = 0;

	page_chain_for_each_safe(page, tmp) {
		put_page(page);
		++i;
	}
	return i;
}

static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
{
	struct page *tmp;

	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
	*head = chain_first;
}
static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
				       unsigned int number)
{
	struct page *page = NULL;
	struct page *tmp = NULL;
	unsigned int i = 0;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);
		if (page)
			return page;
	}

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		set_page_private(tmp, (unsigned long)page);
		page = tmp;
	}

	if (i == number)
		return page;

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_alloc_pages will retry this
	 * function "soon". */
	if (page) {
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	return NULL;
}
static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
					   struct list_head *to_be_freed)
{
	struct drbd_peer_request *peer_req;
	struct list_head *le, *tle;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first unfinished one, we
	   can stop examining the list... */

	list_for_each_safe(le, tle, &mdev->net_ee) {
		peer_req = list_entry(le, struct drbd_peer_request, w.list);
		if (drbd_peer_req_has_active_page(peer_req))
			break;
		list_move(le, to_be_freed);
	}
}

static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
{
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_finished_net_peer_reqs(mdev, &reclaimed);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(mdev, peer_req);
}
/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @mdev:	DRBD device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate @number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
			      bool retry)
{
	struct page *page = NULL;
	DEFINE_WAIT(wait);

	/* Yes, we may run up to @number over max_buffers. If we
	 * follow it strictly, the admin will get it wrong anyways. */
	if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
		page = __drbd_alloc_pages(mdev, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(mdev);

		if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
			page = __drbd_alloc_pages(mdev, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
			break;
		}

		schedule();
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &mdev->pp_in_use);
	return page;
}
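/* Typical call pattern (illustrative sketch, not part of the original file):
 *
 *	page = drbd_alloc_pages(mdev, nr_pages, 1);	// retry until signalled
 *	if (!page)
 *		return NULL;				// interrupted by a signal
 *	...
 *	drbd_free_pages(mdev, page, 0);			// see below
 */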
/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
{
	atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
	int i;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
		i = page_chain_free(page);
	else {
		struct page *tmp;
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	i = atomic_sub_return(i, a);
	if (i < 0)
		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
}
/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_finish_peer_reqs()
 drbd_wait_ee_list_empty()
*/
struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
		    unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
{
	struct drbd_peer_request *peer_req;
	struct page *page;
	unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;

	if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
		return NULL;

	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
	if (!peer_req) {
		if (!(gfp_mask & __GFP_NOWARN))
			dev_err(DEV, "%s: allocation failed\n", __func__);
		return NULL;
	}

	page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
	if (!page)
		goto fail;

	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = data_size;
	peer_req->i.sector = sector;
	peer_req->i.local = false;
	peer_req->i.waiting = false;

	peer_req->epoch = NULL;
	peer_req->w.mdev = mdev;
	peer_req->pages = page;
	atomic_set(&peer_req->pending_bios, 0);
	peer_req->flags = 0;
	/*
	 * The block_id is opaque to the receiver.  It is not endianness
	 * converted, and sent back to the sender unchanged.
	 */
	peer_req->block_id = id;

	return peer_req;

 fail:
	mempool_free(peer_req, drbd_ee_mempool);
	return NULL;
}
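/* Free a peer request: drop an attached digest if any, give the page chain
 * back via drbd_free_pages(), and return the object to the mempool. The
 * asserts below document the preconditions: no bios may still be in flight,
 * and the interval must already be removed from the tree. */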
void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
			  int is_net)
{
	if (peer_req->flags & EE_HAS_DIGEST)
		kfree(peer_req->digest);
	drbd_free_pages(mdev, peer_req->pages, is_net);
	D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
	D_ASSERT(drbd_interval_empty(&peer_req->i));
	mempool_free(peer_req, drbd_ee_mempool);
}
int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
{
	LIST_HEAD(work_list);
	struct drbd_peer_request *peer_req, *t;
	int count = 0;
	int is_net = list == &mdev->net_ee;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		__drbd_free_peer_req(mdev, peer_req, is_net);
		count++;
	}
	return count;
}
/*
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
{
	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;
	int err = 0;

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_finished_net_peer_reqs(mdev, &reclaimed);
	list_splice_init(&mdev->done_ee, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(mdev, peer_req);

	/* Possible callbacks here:
	 * e_end_block, e_end_resync_block, e_send_discard_write.
	 * All ignore the last argument.
	 */
	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		int err2;

		/* list_del not necessary, next/prev members not touched */
		err2 = peer_req->w.cb(&peer_req->w, !!err);
		if (!err)
			err = err2;
		drbd_free_peer_req(mdev, peer_req);
	}
	wake_up(&mdev->ee_wait);

	return err;
}
static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
				     struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&mdev->tconn->req_lock);
		io_schedule();
		finish_wait(&mdev->ee_wait, &wait);
		spin_lock_irq(&mdev->tconn->req_lock);
	}
}

static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
				    struct list_head *head)
{
	spin_lock_irq(&mdev->tconn->req_lock);
	_drbd_wait_ee_list_empty(mdev, head);
	spin_unlock_irq(&mdev->tconn->req_lock);
}
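/* Usage sketch for the locking rules above (illustrative only):
 *
 *	spin_lock_irq(&mdev->tconn->req_lock);
 *	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
 *	spin_unlock_irq(&mdev->tconn->req_lock);
 *
 * or simply drbd_wait_ee_list_empty(mdev, &mdev->active_ee), which takes
 * and drops the lock itself. */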
/* See also kernel_accept(), which only exists since 2.6.18.
 * We also want to log exactly which part of it failed. */
static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
{
	struct sock *sk = sock->sk;
	int err = 0;

	*what = "listen";
	err = sock->ops->listen(sock, 5);
	if (err < 0)
		goto out;

	*what = "sock_create_lite";
	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
			       newsock);
	if (err < 0)
		goto out;

	*what = "accept";
	err = sock->ops->accept(sock, *newsock, 0);
	if (err < 0) {
		sock_release(*newsock);
		*newsock = NULL;
		goto out;
	}
	(*newsock)->ops  = sock->ops;

out:
	return err;
}
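/* Receive up to @size bytes in a single sock_recvmsg() call. A @flags value
 * of 0 defaults to MSG_WAITALL | MSG_NOSIGNAL, i.e. block until the whole
 * buffer is filled. Returns the number of bytes received or a negative
 * error code. */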
static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
	mm_segment_t oldfs;
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_iovlen = 1,
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
	};
	int rv;

	oldfs = get_fs();
	set_fs(KERNEL_DS);
	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
	set_fs(oldfs);

	return rv;
}
static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
{
	mm_segment_t oldfs;
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_iovlen = 1,
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
	};
	int rv;

	oldfs = get_fs();
	set_fs(KERNEL_DS);
	rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
	set_fs(oldfs);

	if (rv < 0) {
		/* we got an error, e.g.
		 * ECONNRESET	other side closed the connection
		 * ERESTARTSYS	(on sock) we got a signal
		 */
		if (rv == -ECONNRESET)
			conn_info(tconn, "sock was reset by peer\n");
		else if (rv != -ERESTARTSYS)
			conn_err(tconn, "sock_recvmsg returned %d\n", rv);
	} else if (rv == 0) {
		conn_info(tconn, "sock was shut down by peer\n");
	}

	if (rv != size) {
		/* signal came in, or peer/link went down,
		 * after we read a partial message
		 */
		/* D_ASSERT(signal_pending(current)); */
		conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
	}

	return rv;
}
static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
{
	int err;

	err = drbd_recv(tconn, buf, size);
	if (err != size) {
		if (err >= 0)
			err = -EIO;
	} else
		err = 0;
	return err;
}

static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
{
	int err;

	err = drbd_recv_all(tconn, buf, size);
	if (err && !signal_pending(current))
		conn_warn(tconn, "short read (expected size %d)\n", (int)size);
	return err;
}
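/* Note on the receive helpers above: drbd_recv() performs one blocking
 * sock_recvmsg() and escalates connection loss to C_BROKEN_PIPE;
 * drbd_recv_all() additionally treats a short read as an error (-EIO), and
 * drbd_recv_all_warn() also logs it, unless we were merely interrupted by a
 * signal. */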
/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
			    unsigned int rcv)
{
	/* open coded SO_SNDBUF, SO_RCVBUF */
	if (snd) {
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
	}
	if (rcv) {
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
	}
}
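/* In-kernel equivalent of what userspace would do with
 *	setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &snd, sizeof(snd));
 * (and SO_RCVBUF); setting the SOCK_*BUF_LOCK bits in sk_userlocks keeps
 * the kernel's buffer auto-tuning from overriding the values again. */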
static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	int err;
	int disconnect_on_error = 1;

	if (!get_net_conf(tconn))
		return NULL;

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = tconn->net_conf->try_connect_int*HZ;
	drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
			tconn->net_conf->rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	memcpy(&src_in6, tconn->net_conf->my_addr,
	       min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
	if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	what = "bind before connect";
	err = sock->ops->bind(sock,
			      (struct sockaddr *) &src_in6,
			      tconn->net_conf->my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock,
				 (struct sockaddr *)tconn->net_conf->peer_addr,
				 tconn->net_conf->peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			conn_err(tconn, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
	}
	put_net_conf(tconn);
	return sock;
}
static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
{
	int timeo, err;
	struct socket *s_estab = NULL, *s_listen;
	const char *what;

	if (!get_net_conf(tconn))
		return NULL;

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
	if (err) {
		s_listen = NULL;
		goto out;
	}

	timeo = tconn->net_conf->try_connect_int * HZ;
	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
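	/* e.g. try_connect_int == 10s and HZ == 250: timeo == 2500 jiffies,
	 * randomized to 2143 or 2857 jiffies, so both peers don't keep
	 * retrying in lock-step forever. */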
	s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
	s_listen->sk->sk_rcvtimeo = timeo;
	s_listen->sk->sk_sndtimeo = timeo;
	drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
			tconn->net_conf->rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen,
				  (struct sockaddr *) tconn->net_conf->my_addr,
				  tconn->net_conf->my_addr_len);
	if (err < 0)
		goto out;

	err = drbd_accept(&what, s_listen, &s_estab);

out:
	if (s_listen)
		sock_release(s_listen);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			conn_err(tconn, "%s failed, err = %d\n", what, err);
			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}
	put_net_conf(tconn);

	return s_estab;
}
static int decode_header(struct drbd_tconn *, void *, struct packet_info *);

static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
			     enum drbd_packet cmd)
{
	if (!conn_prepare_command(tconn, sock))
		return -EIO;
	return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
}

static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
{
	unsigned int header_size = drbd_header_size(tconn);
	struct packet_info pi;
	int err;

	err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
	if (err != header_size) {
		if (err >= 0)
			err = -EIO;
		return err;
	}
	err = decode_header(tconn, tconn->data.rbuf, &pi);
	if (err)
		return err;
	return pi.cmd;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:	pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
{
	int rr;
	char tb[4];

	if (!*sock)
		return false;

	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {
		return true;
	} else {
		sock_release(*sock);
		*sock = NULL;
		return false;
	}
}
/* Gets called if a connection is established, or if a new minor gets created
   in a connection */
int drbd_connected(int vnr, void *p, void *data)
{
	struct drbd_conf *mdev = (struct drbd_conf *)p;
	int err;

	atomic_set(&mdev->packet_seq, 0);
	mdev->peer_seq = 0;

	mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
		&mdev->tconn->cstate_mutex :
		&mdev->own_state_mutex;

	err = drbd_send_sync_param(mdev);
	if (!err)
		err = drbd_send_sizes(mdev, 0, 0);
	if (!err)
		err = drbd_send_uuids(mdev);
	if (!err)
		err = drbd_send_state(mdev);
	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
	clear_bit(RESIZE_PENDING, &mdev->flags);
	mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
	return err;
}
/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int drbd_connect(struct drbd_tconn *tconn)
{
	struct socket *sock, *msock;
	int try, h, ok;

	if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
		return -2;

	clear_bit(DISCARD_CONCURRENT, &tconn->flags);

	/* Assume that the peer only understands protocol 80 until we know better. */
	tconn->agreed_pro_version = 80;

	do {
		struct socket *s;

		for (try = 0;;) {
			/* 3 tries, this should take less than a second! */
			s = drbd_try_connect(tconn);
			if (s || ++try >= 3)
				break;
			/* give the other side time to call bind() & listen() */
			schedule_timeout_interruptible(HZ / 10);
		}

		if (s) {
			if (!tconn->data.socket) {
				tconn->data.socket = s;
				send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
			} else if (!tconn->meta.socket) {
				tconn->meta.socket = s;
				send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
			} else {
				conn_err(tconn, "Logic error in drbd_connect()\n");
				goto out_release_sockets;
			}
		}

		if (tconn->data.socket && tconn->meta.socket) {
			schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
			ok = drbd_socket_okay(&tconn->data.socket);
			ok = drbd_socket_okay(&tconn->meta.socket) && ok;
			if (ok)
				break;
		}

retry:
		s = drbd_wait_for_connect(tconn);
		if (s) {
			try = receive_first_packet(tconn, s);
			drbd_socket_okay(&tconn->data.socket);
			drbd_socket_okay(&tconn->meta.socket);
			switch (try) {
			case P_INITIAL_DATA:
				if (tconn->data.socket) {
					conn_warn(tconn, "initial packet S crossed\n");
					sock_release(tconn->data.socket);
				}
				tconn->data.socket = s;
				break;
			case P_INITIAL_META:
				if (tconn->meta.socket) {
					conn_warn(tconn, "initial packet M crossed\n");
					sock_release(tconn->meta.socket);
				}
				tconn->meta.socket = s;
				set_bit(DISCARD_CONCURRENT, &tconn->flags);
				break;
			default:
				conn_warn(tconn, "Error receiving initial packet\n");
				sock_release(s);
				if (random32() & 1)
					goto retry;
			}
		}

		if (tconn->cstate <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			if (get_t_state(&tconn->receiver) == EXITING)
				goto out_release_sockets;
		}
		/* fixed: the original tested "&tconn->meta.socket", the address
		 * of the pointer, which is always true; test the pointer. */
		if (tconn->data.socket && tconn->meta.socket) {
			ok = drbd_socket_okay(&tconn->data.socket);
			ok = drbd_socket_okay(&tconn->meta.socket) && ok;
			if (ok)
				break;
		}
	} while (1);
	sock  = tconn->data.socket;
	msock = tconn->meta.socket;

	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */

	sock->sk->sk_allocation = GFP_NOIO;
	msock->sk->sk_allocation = GFP_NOIO;

	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;

	/* NOT YET ...
	 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_CONNECTION_FEATURES timeout,
	 * which we set to 4x the configured ping_timeout. */
	sock->sk->sk_sndtimeo =
	sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;

	msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock);
	drbd_tcp_nodelay(msock);

	tconn->last_received = jiffies;

	h = drbd_do_features(tconn);
	if (h <= 0)
		return h;

	if (tconn->cram_hmac_tfm) {
		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
		switch (drbd_do_auth(tconn)) {
		case -1:
			conn_err(tconn, "Authentication of peer failed\n");
			return -1;
		case 0:
			conn_err(tconn, "Authentication of peer failed, trying again.\n");
			return 0;
		}
	}

	if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
		return 0;

	sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	drbd_thread_start(&tconn->asender);

	if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
		return -1;

	down_read(&drbd_cfg_rwsem);
	h = !idr_for_each(&tconn->volumes, drbd_connected, tconn);
	up_read(&drbd_cfg_rwsem);
	return h;

out_release_sockets:
	if (tconn->data.socket) {
		sock_release(tconn->data.socket);
		tconn->data.socket = NULL;
	}
	if (tconn->meta.socket) {
		sock_release(tconn->meta.socket);
		tconn->meta.socket = NULL;
	}
	return -1;
}
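/* decode_header() dispatches on the magic value at the start of the buffer:
 * protocol 100 headers (struct p_header100) carry a volume number, while the
 * older p_header95/p_header80 formats do not. On success, @pi is filled in
 * and pi->data points just behind the header. */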
static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
{
	unsigned int header_size = drbd_header_size(tconn);

	if (header_size == sizeof(struct p_header100) &&
	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
		struct p_header100 *h = header;
		if (h->pad != 0) {
			conn_err(tconn, "Header padding is not zero\n");
			return -EINVAL;
		}
		pi->vnr = be16_to_cpu(h->volume);
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
	} else if (header_size == sizeof(struct p_header95) &&
		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
		struct p_header95 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
		pi->vnr = 0;
	} else if (header_size == sizeof(struct p_header80) &&
		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
		struct p_header80 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be16_to_cpu(h->length);
		pi->vnr = 0;
	} else {
		conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
			 be32_to_cpu(*(__be32 *)header),
			 tconn->agreed_pro_version);
		return -EINVAL;
	}
	pi->data = header + header_size;
	return 0;
}
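/* Receive exactly one packet header into the connection's receive buffer,
 * decode it, and note when we last heard from the peer. */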
static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
{
	void *buffer = tconn->data.rbuf;
	int err;

	err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
	if (err)
		return err;

	err = decode_header(tconn, buffer, pi);
	tconn->last_received = jiffies;

	return err;
}
static void drbd_flush(struct drbd_conf *mdev)
{
	int rv;

	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
					NULL);
		if (rv) {
			dev_err(DEV, "local disk flush failed with status %d\n", rv);
			/* would rather check on EOPNOTSUPP, but that is not reliable.
			 * don't try again for ANY return value != 0
			 * if (rv == -EOPNOTSUPP) */
			drbd_bump_write_ordering(mdev, WO_drain_io);
		}
		put_ldev(mdev);
	}
}
/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
 * @mdev:	DRBD device.
 * @epoch:	Epoch object.
 * @ev:		Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
	int epoch_size;
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&mdev->epoch_lock);
	do {
		next_epoch = NULL;

		epoch_size = atomic_read(&epoch->epoch_size);

		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do */
			break;
		}

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
			if (!(ev & EV_CLEANUP)) {
				spin_unlock(&mdev->epoch_lock);
				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
				spin_lock(&mdev->epoch_lock);
			}

			if (mdev->current_epoch != epoch) {
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
				wake_up(&mdev->ee_wait);
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&mdev->epoch_lock);

	return rv;
}
/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @mdev:	DRBD device.
 * @wo:		Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
{
	enum write_ordering_e pwo;
	static char *write_ordering_str[] = {
		[WO_none] = "none",
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",
	};

	pwo = mdev->write_ordering;
	wo = min(pwo, wo);
	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
		wo = WO_drain_io;
	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
		wo = WO_none;
	mdev->write_ordering = wo;
	if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
}
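/* Example: if the disk config sets no-disk-flush, a requested WO_bdev_flush
 * degrades to WO_drain_io; with no-disk-drain also set, it degrades further
 * to WO_none. We never upgrade beyond what was asked for (min(pwo, wo)). */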
/**
 * drbd_submit_peer_request()
 * @mdev:	DRBD device.
 * @peer_req:	peer request
 * @rw:		flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_conf *mdev,
			     struct drbd_peer_request *peer_req,
			     const unsigned rw, const int fault_type)
{
	struct bio *bios = NULL;
	struct bio *bio;
	struct page *page = peer_req->pages;
	sector_t sector = peer_req->i.sector;
	unsigned ds = peer_req->i.size;
	unsigned n_bios = 0;
	unsigned nr_pages = (ds + PAGE_SIZE - 1) >> PAGE_SHIFT;
	int err = -ENOMEM;

	/* In most cases, we will only need one bio. But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio.
	 *
	 * Plain bio_alloc is good enough here, this is not a DRBD-internally
	 * generated bio, but a bio allocated on behalf of the peer.
	 */
next_bio:
	bio = bio_alloc(GFP_NOIO, nr_pages);
	if (!bio) {
		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
		goto fail;
	}
	/* > peer_req->i.sector, unless this is the first bio */
	bio->bi_sector = sector;
	bio->bi_bdev = mdev->ldev->backing_bdev;
	bio->bi_rw = rw;
	bio->bi_private = peer_req;
	bio->bi_end_io = drbd_peer_request_endio;

	bio->bi_next = bios;
	bios = bio;
	++n_bios;

	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
			/* A single page must always be possible!
			 * But in case it fails anyways,
			 * we deal with it, and complain (below). */
			if (bio->bi_vcnt == 0) {
				dev_err(DEV,
					"bio_add_page failed for len=%u, "
					"bi_vcnt=0 (bi_sector=%llu)\n",
					len, (unsigned long long)bio->bi_sector);
				err = -ENOSPC;
				goto fail;
			}
			goto next_bio;
		}
		ds -= len;
		sector += len >> 9;
		--nr_pages;
	}
	D_ASSERT(page == NULL);
	D_ASSERT(ds == 0);

	atomic_set(&peer_req->pending_bios, n_bios);
	do {
		bio = bios;
		bios = bios->bi_next;
		bio->bi_next = NULL;

		drbd_generic_make_request(mdev, fault_type, bio);
	} while (bios);
	return 0;

fail:
	while (bios) {
		bio = bios;
		bios = bios->bi_next;
		bio_put(bio);
	}
	return err;
}
static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
					     struct drbd_peer_request *peer_req)
{
	struct drbd_interval *i = &peer_req->i;

	drbd_remove_interval(&mdev->write_requests, i);
	drbd_clear_interval(i);

	/* Wake up any processes waiting for this peer request to complete. */
	if (i->waiting)
		wake_up(&mdev->misc_wait);
}
static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	int rv;
	struct p_barrier *p = pi->data;
	struct drbd_epoch *epoch;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return -EIO;

	mdev->current_epoch->barrier_nr = p->barrier;
	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (mdev->write_ordering) {
	case WO_none:
		if (rv == FE_RECYCLED)
			return 0;

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		if (epoch)
			break;
		else
			dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
		/* Fall through */

	case WO_bdev_flush:
	case WO_drain_io:
		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
		drbd_flush(mdev);

		if (atomic_read(&mdev->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			if (epoch)
				break;
		}

		epoch = mdev->current_epoch;
		wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);

		D_ASSERT(atomic_read(&epoch->active) == 0);
		D_ASSERT(epoch->flags == 0);

		return 0;
	default:
		dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
		return -EIO;
	}

	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&mdev->epoch_lock);
	if (atomic_read(&mdev->current_epoch->epoch_size)) {
		list_add(&epoch->list, &mdev->current_epoch->list);
		mdev->current_epoch = epoch;
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
	spin_unlock(&mdev->epoch_lock);

	return 0;
}
/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_peer_request *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
	      int data_size) __must_hold(local)
{
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_peer_request *peer_req;
	struct page *page;
	int dgs, ds, err;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;
	unsigned long *data;

	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;

	if (dgs) {
		/*
		 * FIXME: Receive the incoming digest into the receive buffer
		 * here, together with its struct p_data?
		 */
		err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
		if (err)
			return NULL;
	}

	data_size -= dgs;

	if (!expect(data_size != 0))
		return NULL;
	if (!expect(IS_ALIGNED(data_size, 512)))
		return NULL;
	if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
		return NULL;

	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		dev_err(DEV, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
	if (!peer_req)
		return NULL;

	ds = data_size;
	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		data = kmap(page);
		err = drbd_recv_all_warn(mdev->tconn, data, len);
		if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}
		kunmap(page);
		if (err) {
			drbd_free_peer_req(mdev, peer_req);
			return NULL;
		}
		ds -= len;
	}

	if (dgs) {
		drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_free_peer_req(mdev, peer_req);
			return NULL;
		}
	}
	mdev->recv_cnt += data_size>>9;
	return peer_req;
}
/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 */
static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
{
	struct page *page;
	int err = 0;
	void *data;

	if (!data_size)
		return 0;

	page = drbd_alloc_pages(mdev, 1, 1);

	data = kmap(page);
	while (data_size) {
		unsigned int len = min_t(int, data_size, PAGE_SIZE);

		err = drbd_recv_all_warn(mdev->tconn, data, len);
		if (err)
			break;
		data_size -= len;
	}
	kunmap(page);
	drbd_free_pages(mdev, page, 0);
	return err;
}
static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
			   sector_t sector, int data_size)
{
	struct bio_vec *bvec;
	struct bio *bio;
	int dgs, err, i, expect;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;

	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;

	if (dgs) {
		err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
		if (err)
			return err;
		data_size -= dgs;
	}

	/* optimistically update recv_cnt.  if receiving fails below,
	 * we disconnect anyways, and counters will be reset. */
	mdev->recv_cnt += data_size>>9;

	bio = req->master_bio;
	D_ASSERT(sector == bio->bi_sector);

	bio_for_each_segment(bvec, bio, i) {
		void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
		expect = min_t(int, data_size, bvec->bv_len);
		err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
		kunmap(bvec->bv_page);
		if (err)
			return err;
		data_size -= expect;
	}

	if (dgs) {
		drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
			return -EINVAL;
		}
	}

	D_ASSERT(data_size == 0);
	return 0;
}
/*
 * e_end_resync_block() is called in asender context via
 * drbd_finish_peer_reqs().
 */
static int e_end_resync_block(struct drbd_work *w, int unused)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_conf *mdev = w->mdev;
	sector_t sector = peer_req->i.sector;
	int err;

	D_ASSERT(drbd_interval_empty(&peer_req->i));

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		drbd_set_in_sync(mdev, sector, peer_req->i.size);
		err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
	} else {
		/* Record failure to sync */
		drbd_rs_failed_io(mdev, sector, peer_req->i.size);

		err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
	}
	dec_unacked(mdev);

	return err;
}
static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
{
	struct drbd_peer_request *peer_req;

	peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
	if (!peer_req)
		goto fail;

	dec_rs_pending(mdev);

	inc_unacked(mdev);
	/* corresponding dec_unacked() in e_end_resync_block()
	 * or, respectively, in _drbd_clear_done_ee */

	peer_req->w.cb = e_end_resync_block;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_add(&peer_req->w.list, &mdev->sync_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
		return 0;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	drbd_free_peer_req(mdev, peer_req);
fail:
	put_ldev(mdev);
	return -EIO;
}
static struct drbd_request *
find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
	     sector_t sector, bool missing_ok, const char *func)
{
	struct drbd_request *req;

	/* Request object according to our peer */
	req = (struct drbd_request *)(unsigned long)id;
	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
		return req;
	if (!missing_ok) {
		dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
			(unsigned long)id, (unsigned long long)sector);
	}
	return NULL;
}
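/* Note on the block_id round trip: for application reads we send the address
 * of our struct drbd_request as the opaque block_id (see the comment in
 * drbd_alloc_peer_req); the peer echoes it back unchanged, and find_request()
 * validates the echoed pointer against the interval tree before it is
 * dereferenced any further. */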
static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	struct drbd_request *req;
	sector_t sector;
	int err;
	struct p_data *p = pi->data;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return -EIO;

	sector = be64_to_cpu(p->sector);

	spin_lock_irq(&mdev->tconn->req_lock);
	req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
	spin_unlock_irq(&mdev->tconn->req_lock);
	if (unlikely(!req))
		return -EIO;

	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
	 * special casing it there for the various failure cases.
	 * still no race with drbd_fail_pending_reads */
	err = recv_dless_read(mdev, req, sector, pi->size);
	if (!err)
		req_mod(req, DATA_RECEIVED);
	/* else: nothing. handled from drbd_disconnect...
	 * I don't think we may complete this just yet
	 * in case we are "on-disconnect: freeze" */

	return err;
}
static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	sector_t sector;
	int err;
	struct p_data *p = pi->data;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return -EIO;

	sector = be64_to_cpu(p->sector);
	D_ASSERT(p->block_id == ID_SYNCER);

	if (get_ldev(mdev)) {
		/* data is submitted to disk within recv_resync_read.
		 * corresponding put_ldev done below on error,
		 * or in drbd_peer_request_endio. */
		err = recv_resync_read(mdev, sector, pi->size);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not write resync data to local disk.\n");

		err = drbd_drain_block(mdev, pi->size);

		drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
	}

	atomic_add(pi->size >> 9, &mdev->rs_sect_in);

	return err;
}
static int w_restart_write(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_conf *mdev = w->mdev;
	struct bio *bio;
	unsigned long start_time;
	unsigned long flags;

	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
	if (!expect(req->rq_state & RQ_POSTPONED)) {
		spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
		return -EIO;
	}
	bio = req->master_bio;
	start_time = req->start_time;
	/* Postponed requests will not have their master_bio completed! */
	__req_mod(req, DISCARD_WRITE, NULL);
	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);

	while (__drbd_make_request(mdev, bio, start_time))
		/* retry */ ;
	return 0;
}
static void restart_conflicting_writes(struct drbd_conf *mdev,
				       sector_t sector, int size)
{
	struct drbd_interval *i;
	struct drbd_request *req;

	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		if (req->rq_state & RQ_LOCAL_PENDING ||
		    !(req->rq_state & RQ_POSTPONED))
			continue;
		if (expect(list_empty(&req->w.list))) {
			req->w.mdev = mdev;
			req->w.cb = w_restart_write;
			drbd_queue_work(&mdev->tconn->data.work, &req->w);
		}
	}
}
/*
 * e_end_block() is called in asender context via drbd_finish_peer_reqs().
 */
static int e_end_block(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_conf *mdev = w->mdev;
	sector_t sector = peer_req->i.sector;
	int err = 0, pcmd;

	if (peer_req->flags & EE_SEND_WRITE_ACK) {
		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
				mdev->state.conn <= C_PAUSED_SYNC_T &&
				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
				P_RS_WRITE_ACK : P_WRITE_ACK;
			err = drbd_send_ack(mdev, pcmd, peer_req);
			if (pcmd == P_RS_WRITE_ACK)
				drbd_set_in_sync(mdev, sector, peer_req->i.size);
		} else {
			err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
			/* we expect it to be marked out of sync anyways...
			 * maybe assert this? */
		}
		dec_unacked(mdev);
	}
	/* we delete from the conflict detection hash _after_ we sent out the
	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
	if (mdev->tconn->net_conf->two_primaries) {
		spin_lock_irq(&mdev->tconn->req_lock);
		D_ASSERT(!drbd_interval_empty(&peer_req->i));
		drbd_remove_epoch_entry_interval(mdev, peer_req);
		if (peer_req->flags & EE_RESTART_REQUESTS)
			restart_conflicting_writes(mdev, sector, peer_req->i.size);
		spin_unlock_irq(&mdev->tconn->req_lock);
	} else
		D_ASSERT(drbd_interval_empty(&peer_req->i));

	drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));

	return err;
}
static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
{
	struct drbd_conf *mdev = w->mdev;
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	int err;

	err = drbd_send_ack(mdev, ack, peer_req);
	dec_unacked(mdev);

	return err;
}

static int e_send_discard_write(struct drbd_work *w, int unused)
{
	return e_send_ack(w, P_DISCARD_WRITE);
}

static int e_send_retry_write(struct drbd_work *w, int unused)
{
	struct drbd_tconn *tconn = w->mdev->tconn;

	return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
			     P_RETRY_WRITE : P_DISCARD_WRITE);
}
static bool seq_greater(u32 a, u32 b)
{
	/*
	 * We assume 32-bit wrap-around here.
	 * For 24-bit wrap-around, we would have to shift:
	 *  a <<= 8; b <<= 8;
	 */
	return (s32)a - (s32)b > 0;
}

static u32 seq_max(u32 a, u32 b)
{
	return seq_greater(a, b) ? a : b;
}
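/* Illustrative: with 32-bit wrap-around, seq_greater(1, 0xffffffff) is true,
 * because (s32)(1 - 0xffffffff) == 2 > 0, i.e. sequence number 1 comes
 * "after" 0xffffffff. */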
static bool need_peer_seq(struct drbd_conf *mdev)
{
	struct drbd_tconn *tconn = mdev->tconn;

	/*
	 * We only need to keep track of the last packet_seq number of our peer
	 * if we are in dual-primary mode and we have the discard flag set; see
	 * handle_write_conflicts().
	 */
	return tconn->net_conf->two_primaries &&
	       test_bit(DISCARD_CONCURRENT, &tconn->flags);
}

static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
{
	unsigned int newest_peer_seq;

	if (need_peer_seq(mdev)) {
		spin_lock(&mdev->peer_seq_lock);
		newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
		mdev->peer_seq = newest_peer_seq;
		spin_unlock(&mdev->peer_seq_lock);
		/* wake up only if we actually changed mdev->peer_seq */
		if (peer_seq == newest_peer_seq)
			wake_up(&mdev->seq_wait);
	}
}
/* Called from receive_Data.
 * Synchronize packets on sock with packets on msock.
 *
 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
 * packet traveling on msock, they are still processed in the order they have
 * been sent.
 *
 * Note: we don't care for Ack packets overtaking P_DATA packets.
 *
 * In case packet_seq is larger than mdev->peer_seq number, there are
 * outstanding packets on the msock. We wait for them to arrive.
 * In case we are the logically next packet, we update mdev->peer_seq
 * ourselves. Correctly handles 32bit wrap around.
 *
 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
 *
 * returns 0 if we may process the packet,
 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
{
	DEFINE_WAIT(wait);
	long timeout;
	int ret;

	if (!need_peer_seq(mdev))
		return 0;

	spin_lock(&mdev->peer_seq_lock);
	for (;;) {
		if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
			mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
			ret = 0;
			break;
		}
		if (signal_pending(current)) {
			ret = -ERESTARTSYS;
			break;
		}
		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
		spin_unlock(&mdev->peer_seq_lock);
		timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
		timeout = schedule_timeout(timeout);
		spin_lock(&mdev->peer_seq_lock);
		if (!timeout) {
			ret = -ETIMEDOUT;
			dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
			break;
		}
	}
	spin_unlock(&mdev->peer_seq_lock);
	finish_wait(&mdev->seq_wait, &wait);
	return ret;
}
/* see also bio_flags_to_wire()
 * DRBD_REQ_*, because we need to semantically map the flags to data packet
 * flags and back. We may replicate to other kernel versions. */
static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
{
	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
		(dpf & DP_FUA ? REQ_FUA : 0) |
		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
}
static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
				    unsigned int size)
{
	struct drbd_interval *i;

    repeat:
	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
		struct drbd_request *req;
		struct bio_and_error m;

		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		if (!(req->rq_state & RQ_POSTPONED))
			continue;
		req->rq_state &= ~RQ_POSTPONED;
		__req_mod(req, NEG_ACKED, &m);
		spin_unlock_irq(&mdev->tconn->req_lock);
		if (m.bio)
			complete_master_bio(mdev, &m);
		spin_lock_irq(&mdev->tconn->req_lock);
		goto repeat;
	}
}
static int handle_write_conflicts(struct drbd_conf *mdev,
				  struct drbd_peer_request *peer_req)
{
	struct drbd_tconn *tconn = mdev->tconn;
	bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
	sector_t sector = peer_req->i.sector;
	const unsigned int size = peer_req->i.size;
	struct drbd_interval *i;
	bool equal;
	int err;

	/*
	 * Inserting the peer request into the write_requests tree will prevent
	 * new conflicting local requests from being added.
	 */
	drbd_insert_interval(&mdev->write_requests, &peer_req->i);

    repeat:
	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
		if (i == &peer_req->i)
			continue;

		if (!i->local) {
			/*
			 * Our peer has sent a conflicting remote request; this
			 * should not happen in a two-node setup.  Wait for the
			 * earlier peer request to complete.
			 */
			err = drbd_wait_misc(mdev, i);
			if (err)
				goto out;
			goto repeat;
		}

		equal = i->sector == sector && i->size == size;
		if (resolve_conflicts) {
			/*
			 * If the peer request is fully contained within the
			 * overlapping request, it can be discarded; otherwise,
			 * it will be retried once all overlapping requests
			 * have completed.
			 */
			bool discard = i->sector <= sector && i->sector +
				       (i->size >> 9) >= sector + (size >> 9);

			if (!equal)
				dev_alert(DEV, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u, "
					       "assuming %s came first\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size,
					  discard ? "local" : "remote");

			peer_req->w.cb = discard ? e_send_discard_write :
						   e_send_retry_write;
			list_add_tail(&peer_req->w.list, &mdev->done_ee);
			wake_asender(mdev->tconn);

			err = -ENOENT;
			goto out;
		} else {
			struct drbd_request *req =
				container_of(i, struct drbd_request, i);

			if (!equal)
				dev_alert(DEV, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size);

			if (req->rq_state & RQ_LOCAL_PENDING ||
			    !(req->rq_state & RQ_POSTPONED)) {
				/*
				 * Wait for the node with the discard flag to
				 * decide if this request will be discarded or
				 * retried.  Requests that are discarded will
				 * disappear from the write_requests tree.
				 *
				 * In addition, wait for the conflicting
				 * request to finish locally before submitting
				 * the conflicting peer request.
				 */
				err = drbd_wait_misc(mdev, &req->i);
				if (err) {
					_conn_request_state(mdev->tconn,
							    NS(conn, C_TIMEOUT),
							    CS_HARD);
					fail_postponed_requests(mdev, sector, size);
					goto out;
				}
				goto repeat;
			}

			/*
			 * Remember to restart the conflicting requests after
			 * the new peer request has completed.
			 */
			peer_req->flags |= EE_RESTART_REQUESTS;
		}
	}
	err = 0;

    out:
	if (err)
		drbd_remove_epoch_entry_interval(mdev, peer_req);
	return err;
}
/* mirrored write */
static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	sector_t sector;
	struct drbd_peer_request *peer_req;
	struct p_data *p = pi->data;
	u32 peer_seq = be32_to_cpu(p->seq_num);
	int rw = WRITE;
	u32 dp_flags;
	int err;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return -EIO;

	if (!get_ldev(mdev)) {
		int err2;

		err = wait_for_and_update_peer_seq(mdev, peer_seq);
		drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
		atomic_inc(&mdev->current_epoch->epoch_size);
		err2 = drbd_drain_block(mdev, pi->size);
		if (!err)
			err = err2;
		return err;
	}

	/*
	 * Corresponding put_ldev done either below (on various errors), or in
	 * drbd_peer_request_endio, if we successfully submit the data at the
	 * end of this function.
	 */

	sector = be64_to_cpu(p->sector);
	peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
	if (!peer_req) {
		put_ldev(mdev);
		return -EIO;
	}

	peer_req->w.cb = e_end_block;

	dp_flags = be32_to_cpu(p->dp_flags);
	rw |= wire_flags_to_bio(mdev, dp_flags);

	if (dp_flags & DP_MAY_SET_IN_SYNC)
		peer_req->flags |= EE_MAY_SET_IN_SYNC;

	spin_lock(&mdev->epoch_lock);
	peer_req->epoch = mdev->current_epoch;
	atomic_inc(&peer_req->epoch->epoch_size);
	atomic_inc(&peer_req->epoch->active);
	spin_unlock(&mdev->epoch_lock);

	if (mdev->tconn->net_conf->two_primaries) {
		err = wait_for_and_update_peer_seq(mdev, peer_seq);
		if (err)
			goto out_interrupted;
		spin_lock_irq(&mdev->tconn->req_lock);
		err = handle_write_conflicts(mdev, peer_req);
		if (err) {
			spin_unlock_irq(&mdev->tconn->req_lock);
			if (err == -ENOENT) {
				put_ldev(mdev);
				return 0;
			}
			goto out_interrupted;
		}
	} else
		spin_lock_irq(&mdev->tconn->req_lock);
	list_add(&peer_req->w.list, &mdev->active_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	if (mdev->tconn->agreed_pro_version < 100) {
		switch (mdev->tconn->net_conf->wire_protocol) {
		case DRBD_PROT_C:
			dp_flags |= DP_SEND_WRITE_ACK;
			break;
		case DRBD_PROT_B:
			dp_flags |= DP_SEND_RECEIVE_ACK;
			break;
		}
	}

	if (dp_flags & DP_SEND_WRITE_ACK) {
		peer_req->flags |= EE_SEND_WRITE_ACK;
		inc_unacked(mdev);
		/* corresponding dec_unacked() in e_end_block()
		 * or, respectively, in _drbd_clear_done_ee */
	}

	if (dp_flags & DP_SEND_RECEIVE_ACK) {
		/* I really don't like it that the receiver thread
		 * sends on the msock, but anyways */
		drbd_send_ack(mdev, P_RECV_ACK, peer_req);
	}

	if (mdev->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, */
		drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
		drbd_al_begin_io(mdev, &peer_req->i);
	}

	err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
	if (!err)
		return 0;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	drbd_remove_epoch_entry_interval(mdev, peer_req);
	spin_unlock_irq(&mdev->tconn->req_lock);
	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
		drbd_al_complete_io(mdev, &peer_req->i);

out_interrupted:
	drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
	put_ldev(mdev);
	drbd_free_peer_req(mdev, peer_req);
	return err;
}
/* We may throttle resync, if the lower device seems to be busy,
 * and current sync rate is above c_min_rate.
 *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
 * amount (more than 64 sectors) of activity we cannot account for with our
 * own resync activity, it obviously is "busy".
 *
 * The current sync rate used here uses only the most recent two step marks,
 * to have a short time average so we can react faster.
 */
int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
{
	struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
	unsigned long db, dt, dbdt;
	struct lc_element *tmp;
	int curr_events;
	int throttle = 0;

	/* feature disabled? */
	if (mdev->ldev->dc.c_min_rate == 0)
		return 0;

	spin_lock_irq(&mdev->al_lock);
	tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
	if (tmp) {
		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
		if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
			spin_unlock_irq(&mdev->al_lock);
			return 0;
		}
		/* Do not slow down if app IO is already waiting for this extent */
	}
	spin_unlock_irq(&mdev->al_lock);
	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
		      (int)part_stat_read(&disk->part0, sectors[1]) -
			atomic_read(&mdev->rs_sect_ev);

	if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
		unsigned long rs_left;
		int i;

		mdev->rs_last_events = curr_events;

		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
		 * approximately */
		i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;

		if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
			rs_left = mdev->ov_left;
		else
			rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;

		dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
		if (!dt)
			dt++;
		db = mdev->rs_mark_left[i] - rs_left;
		dbdt = Bit2KB(db/dt);
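		/* Worked example: db == 2048 bitmap bits over dt == 2 seconds
		 * is 1024 bits/s; at 4 KiB of data per bitmap bit, Bit2KB()
		 * makes that dbdt == 4096 KiB/s, which is compared against
		 * the configured c_min_rate below. */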
		if (dbdt > mdev->ldev->dc.c_min_rate)
			throttle = 1;
	}
	return throttle;
}
static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	sector_t sector;
	sector_t capacity;
	struct drbd_peer_request *peer_req;
	struct digest_info *di = NULL;
	int size, verb;
	unsigned int fault_type;
	struct p_block_req *p = pi->data;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return -EIO;
	capacity = drbd_get_capacity(mdev->this_bdev);

	sector = be64_to_cpu(p->sector);
	size   = be32_to_cpu(p->blksize);

	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return -EINVAL;
	}
	if (sector + (size>>9) > capacity) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return -EINVAL;
	}

	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
		verb = 1;
		switch (pi->cmd) {
		case P_DATA_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
			break;
		case P_RS_DATA_REQUEST:
		case P_CSUM_RS_REQUEST:
		case P_OV_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
			break;
		case P_OV_REPLY:
			verb = 0;
			dec_rs_pending(mdev);
			drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
			break;
		default:
			BUG();
		}
		if (verb && __ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not satisfy peer's read request, "
			    "no local data.\n");

		/* drain possibly payload */
		return drbd_drain_block(mdev, pi->size);
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
	if (!peer_req) {
		put_ldev(mdev);
		return -ENOMEM;
	}

	switch (pi->cmd) {
	case P_DATA_REQUEST:
		peer_req->w.cb = w_e_end_data_req;
		fault_type = DRBD_FAULT_DT_RD;
		/* application IO, don't drbd_rs_begin_io */
		goto submit;

	case P_RS_DATA_REQUEST:
		peer_req->w.cb = w_e_end_rsdata_req;
		fault_type = DRBD_FAULT_RS_RD;
		/* used in the sector offset progress display */
		mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
		break;

	case P_OV_REPLY:
	case P_CSUM_RS_REQUEST:
		fault_type = DRBD_FAULT_RS_RD;
		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
		if (!di)
			goto out_free_e;

		di->digest_size = pi->size;
		di->digest = (((char *)di)+sizeof(struct digest_info));

		peer_req->digest = di;
		peer_req->flags |= EE_HAS_DIGEST;

		if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
			goto out_free_e;

		if (pi->cmd == P_CSUM_RS_REQUEST) {
			D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
			peer_req->w.cb = w_e_end_csum_rs_req;
			/* used in the sector offset progress display */
			mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
		} else if (pi->cmd == P_OV_REPLY) {
			/* track progress, we may need to throttle */
			atomic_add(size >> 9, &mdev->rs_sect_in);
			peer_req->w.cb = w_e_end_ov_reply;
			dec_rs_pending(mdev);
			/* drbd_rs_begin_io done when we sent this request,
			 * but accounting still needs to be done. */
			goto submit_for_resync;
		}
		break;

	case P_OV_REQUEST:
		if (mdev->ov_start_sector == ~(sector_t)0 &&
		    mdev->tconn->agreed_pro_version >= 90) {
			unsigned long now = jiffies;
			int i;
			mdev->ov_start_sector = sector;
			mdev->ov_position = sector;
			mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
			mdev->rs_total = mdev->ov_left;
			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
				mdev->rs_mark_left[i] = mdev->ov_left;
				mdev->rs_mark_time[i] = now;
			}
			dev_info(DEV, "Online Verify start sector: %llu\n",
					(unsigned long long)sector);
		}
		peer_req->w.cb = w_e_end_ov_req;
		fault_type = DRBD_FAULT_RS_RD;
		break;

	default:
		BUG();
	}
	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
	 * wrt the receiver, but it is not as straightforward as it may seem.
	 * Various places in the resync start and stop logic assume resync
	 * requests are processed in order, requeuing this on the worker thread
	 * introduces a bunch of new code for synchronization between threads.
	 *
	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
	 * "forever", throttling after drbd_rs_begin_io will lock that extent
	 * for application writes for the same time.  For now, just throttle
	 * here, where the rest of the code expects the receiver to sleep for
	 * a while, anyways.
	 */

	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
	 * this defers syncer requests for some time, before letting at least
	 * one request through.  The resync controller on the receiving side
	 * will adapt to the incoming rate accordingly.
	 *
	 * We cannot throttle here if remote is Primary/SyncTarget:
	 * we would also throttle its application reads.
	 * In that case, throttling is done on the SyncTarget only.
	 */
	if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
		schedule_timeout_uninterruptible(HZ/10);
	if (drbd_rs_begin_io(mdev, sector))
		goto out_free_e;

submit_for_resync:
	atomic_add(size >> 9, &mdev->rs_sect_ev);

submit:
	inc_unacked(mdev);
2364 spin_lock_irq(&mdev->tconn->req_lock);
2365 list_add_tail(&peer_req->w.list, &mdev->read_ee);
2366 spin_unlock_irq(&mdev->tconn->req_lock);
2368 if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2371 /* don't care for the reason here */
2372 dev_err(DEV, "submit failed, triggering re-connect\n");
2373 spin_lock_irq(&mdev->tconn->req_lock);
2374 list_del(&peer_req->w.list);
2375 spin_unlock_irq(&mdev->tconn->req_lock);
2376 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2380 drbd_free_peer_req(mdev, peer_req);
2384 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2386 int self, peer, rv = -100;
2387 unsigned long ch_self, ch_peer;
2389 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2390 peer = mdev->p_uuid[UI_BITMAP] & 1;
2392 ch_peer = mdev->p_uuid[UI_SIZE];
2393 ch_self = mdev->comm_bm_set;
2395 switch (mdev->tconn->net_conf->after_sb_0p) {
2397 case ASB_DISCARD_SECONDARY:
2398 case ASB_CALL_HELPER:
2399 dev_err(DEV, "Configuration error.\n");
2401 case ASB_DISCONNECT:
2403 case ASB_DISCARD_YOUNGER_PRI:
2404 if (self == 0 && peer == 1) {
2408 if (self == 1 && peer == 0) {
2412 /* Else fall through to one of the other strategies... */
2413 case ASB_DISCARD_OLDER_PRI:
2414 if (self == 0 && peer == 1) {
2418 if (self == 1 && peer == 0) {
2422 /* Else fall through to one of the other strategies... */
2423 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2424 "Using discard-least-changes instead\n");
2425 case ASB_DISCARD_ZERO_CHG:
2426 if (ch_peer == 0 && ch_self == 0) {
2427 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2431 if (ch_peer == 0) { rv = 1; break; }
2432 if (ch_self == 0) { rv = -1; break; }
2434 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2436 case ASB_DISCARD_LEAST_CHG:
2437 if (ch_self < ch_peer)
2439 else if (ch_self > ch_peer)
2441 else /* ( ch_self == ch_peer ) */
2442 /* Well, then use something else. */
2443 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2446 case ASB_DISCARD_LOCAL:
2449 case ASB_DISCARD_REMOTE:
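/* A minimal, self-contained model of the "least changes" arbitration
 * used above (illustrative sketch, not driver code): -1 means discard
 * the local data, 1 means discard the peer's.  The discard_concurrent
 * flag stands in for the DISCARD_CONCURRENT tie breaker. */
static int asb_least_changes_model(unsigned long ch_self,
				   unsigned long ch_peer,
				   int discard_concurrent)
{
	if (ch_self < ch_peer)
		return -1;	/* we changed less, sync from the peer */
	if (ch_self > ch_peer)
		return 1;	/* peer changed less, peer syncs from us */
	/* equal amount of changes: fall back to the tie breaker */
	return discard_concurrent ? -1 : 1;
}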
2456 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2460 switch (mdev->tconn->net_conf->after_sb_1p) {
2461 case ASB_DISCARD_YOUNGER_PRI:
2462 case ASB_DISCARD_OLDER_PRI:
2463 case ASB_DISCARD_LEAST_CHG:
2464 case ASB_DISCARD_LOCAL:
2465 case ASB_DISCARD_REMOTE:
2466 dev_err(DEV, "Configuration error.\n");
2468 case ASB_DISCONNECT:
2471 hg = drbd_asb_recover_0p(mdev);
2472 if (hg == -1 && mdev->state.role == R_SECONDARY)
2474 if (hg == 1 && mdev->state.role == R_PRIMARY)
2478 rv = drbd_asb_recover_0p(mdev);
2480 case ASB_DISCARD_SECONDARY:
2481 return mdev->state.role == R_PRIMARY ? 1 : -1;
2482 case ASB_CALL_HELPER:
2483 hg = drbd_asb_recover_0p(mdev);
2484 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2485 enum drbd_state_rv rv2;
2487 drbd_set_role(mdev, R_SECONDARY, 0);
2488 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2489 * we might be here in C_WF_REPORT_PARAMS which is transient.
2490 * we do not need to wait for the after state change work either. */
2491 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2492 if (rv2 != SS_SUCCESS) {
2493 drbd_khelper(mdev, "pri-lost-after-sb");
2495 dev_warn(DEV, "Successfully gave up primary role.\n");
2505 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2509 switch (mdev->tconn->net_conf->after_sb_2p) {
2510 case ASB_DISCARD_YOUNGER_PRI:
2511 case ASB_DISCARD_OLDER_PRI:
2512 case ASB_DISCARD_LEAST_CHG:
2513 case ASB_DISCARD_LOCAL:
2514 case ASB_DISCARD_REMOTE:
2516 case ASB_DISCARD_SECONDARY:
2517 dev_err(DEV, "Configuration error.\n");
2520 rv = drbd_asb_recover_0p(mdev);
2522 case ASB_DISCONNECT:
2524 case ASB_CALL_HELPER:
2525 hg = drbd_asb_recover_0p(mdev);
2527 enum drbd_state_rv rv2;
2529 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2530 * we might be here in C_WF_REPORT_PARAMS which is transient.
2531 * we do not need to wait for the after state change work either. */
2532 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2533 if (rv2 != SS_SUCCESS) {
2534 drbd_khelper(mdev, "pri-lost-after-sb");
2536 dev_warn(DEV, "Successfully gave up primary role.\n");
2546 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2547 u64 bits, u64 flags)
2550 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2553 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2555 (unsigned long long)uuid[UI_CURRENT],
2556 (unsigned long long)uuid[UI_BITMAP],
2557 (unsigned long long)uuid[UI_HISTORY_START],
2558 (unsigned long long)uuid[UI_HISTORY_END],
2559 (unsigned long long)bits,
2560 (unsigned long long)flags);
2564    100   after split brain, try auto recover
2565      2   C_SYNC_SOURCE, set bitmap
2566      1   C_SYNC_SOURCE, use bitmap
2568     -1   C_SYNC_TARGET, use bitmap
2569     -2   C_SYNC_TARGET, set bitmap
2570   -100   after split brain, disconnect
2571  -1000   unrelated data
2572  -1091   requires protocol 91
2573  -1096   requires protocol 96
2575 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2580 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2581 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2584 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2588 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2589 peer != UUID_JUST_CREATED)
2593 if (self != UUID_JUST_CREATED &&
2594 (peer == UUID_JUST_CREATED || peer == (u64)0))
2598 int rct, dc; /* roles at crash time */
2600 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2602 if (mdev->tconn->agreed_pro_version < 91)
2605 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2606 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2607 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2608 drbd_uuid_set_bm(mdev, 0UL);
2610 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2611 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2614 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2621 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2623 if (mdev->tconn->agreed_pro_version < 91)
2626 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2627 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2628 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2630 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2631 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2632 mdev->p_uuid[UI_BITMAP] = 0UL;
2634 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2637 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2644 /* Common power [off|failure] */
2645 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2646 (mdev->p_uuid[UI_FLAGS] & 2);
2647 /* lowest bit is set when we were primary,
2648 * next bit (weight 2) is set when peer was primary */
2652 case 0: /* !self_pri && !peer_pri */ return 0;
2653 case 1: /* self_pri && !peer_pri */ return 1;
2654 case 2: /* !self_pri && peer_pri */ return -1;
2655 case 3: /* self_pri && peer_pri */
2656 dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2662 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2667 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2669 if (mdev->tconn->agreed_pro_version < 96 ?
2670 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2671 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2672 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2673 /* The last P_SYNC_UUID did not get through. Undo the modifications
2674 the peer made to its UUIDs when it last started a resync as sync source. */
2676 if (mdev->tconn->agreed_pro_version < 91)
2679 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2680 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2682 dev_info(DEV, "Did not get the last syncUUID packet, corrected:\n");
2683 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2690 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2691 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2692 peer = mdev->p_uuid[i] & ~((u64)1);
2698 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2699 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2704 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2706 if (mdev->tconn->agreed_pro_version < 96 ?
2707 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2708 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2709 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2710 /* The last P_SYNC_UUID did not get through. Undo the modifications
2711 we made to our UUIDs when we last started a resync as sync source. */
2713 if (mdev->tconn->agreed_pro_version < 91)
2716 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2717 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2719 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2720 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2721 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2729 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2730 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2731 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2737 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2738 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2739 if (self == peer && self != ((u64)0))
2743 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2744 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2745 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2746 peer = mdev->p_uuid[j] & ~((u64)1);
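/* A condensed model of the rule ladder above (illustrative sketch; the
 * real function distinguishes more cases, fixes up missed sync-uuid
 * packets, and reports a rule number).  The caller is assumed to have
 * already masked out the lowest (role flag) bit of every UUID.  Return
 * values follow the table before drbd_uuid_compare(). */
static int uuid_compare_model(unsigned long long cur_self,
			      unsigned long long bm_self,
			      const unsigned long long *hist_self,
			      unsigned long long cur_peer,
			      unsigned long long bm_peer,
			      const unsigned long long *hist_peer,
			      int hist_len)
{
	int i, j;

	if (cur_self == cur_peer)
		return 0;	/* same data generation: no resync */
	if (bm_self == cur_peer)
		return 1;	/* peer is where our bitmap started: SyncSource */
	if (cur_self == bm_peer)
		return -1;	/* we are where the peer's bitmap started: SyncTarget */
	for (i = 0; i < hist_len; i++)
		if (hist_self[i] == cur_peer)
			return 2;	/* peer's data is our ancestor: full sync, source */
	for (i = 0; i < hist_len; i++)
		if (hist_peer[i] == cur_self)
			return -2;	/* our data is the peer's ancestor: full sync, target */
	for (i = 0; i < hist_len; i++)
		for (j = 0; j < hist_len; j++)
			if (hist_self[i] == hist_peer[j])
				return -100;	/* common ancestor, both moved on: split brain */
	return -1000;		/* unrelated data */
}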
2755 /* drbd_sync_handshake() returns the new conn state on success, or
2756 C_MASK (-1) on failure.
2758 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2759 enum drbd_disk_state peer_disk) __must_hold(local)
2762 enum drbd_conns rv = C_MASK;
2763 enum drbd_disk_state mydisk;
2765 mydisk = mdev->state.disk;
2766 if (mydisk == D_NEGOTIATING)
2767 mydisk = mdev->new_state_tmp.disk;
2769 dev_info(DEV, "drbd_sync_handshake:\n");
2770 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2771 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2772 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2774 hg = drbd_uuid_compare(mdev, &rule_nr);
2776 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2779 dev_alert(DEV, "Unrelated data, aborting!\n");
2783 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2787 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2788 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2789 int f = (hg == -100) || abs(hg) == 2;
2790 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2793 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2794 hg > 0 ? "source" : "target");
2798 drbd_khelper(mdev, "initial-split-brain");
2800 if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
2801 int pcount = (mdev->state.role == R_PRIMARY)
2802 + (peer_role == R_PRIMARY);
2803 int forced = (hg == -100);
2807 hg = drbd_asb_recover_0p(mdev);
2810 hg = drbd_asb_recover_1p(mdev);
2813 hg = drbd_asb_recover_2p(mdev);
2816 if (abs(hg) < 100) {
2817 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2818 "automatically solved. Sync from %s node\n",
2819 pcount, (hg < 0) ? "peer" : "this");
2821 dev_warn(DEV, "Doing a full sync, since"
2822 " UUIDs were ambiguous.\n");
2829 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2831 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2835 dev_warn(DEV, "Split-Brain detected, manually solved. "
2836 "Sync from %s node\n",
2837 (hg < 0) ? "peer" : "this");
2841 /* FIXME this log message is not correct if we end up here
2842 * after an attempted attach on a diskless node.
2843 * We just refuse to attach -- well, we drop the "connection"
2844 * to that disk, in a way... */
2845 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2846 drbd_khelper(mdev, "split-brain");
2850 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2851 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2855 if (hg < 0 && /* by intention we do not use mydisk here. */
2856 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2857 switch (mdev->tconn->net_conf->rr_conflict) {
2858 case ASB_CALL_HELPER:
2859 drbd_khelper(mdev, "pri-lost");
2861 case ASB_DISCONNECT:
2862 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2865 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2870 if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2872 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2874 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2875 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2876 abs(hg) >= 2 ? "full" : "bit-map based");
2881 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2882 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2883 BM_LOCKED_SET_ALLOWED))
2887 if (hg > 0) { /* become sync source. */
2889 } else if (hg < 0) { /* become sync target */
2893 if (drbd_bm_total_weight(mdev)) {
2894 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2895 drbd_bm_total_weight(mdev));
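/* Condensed mapping from the handshake verdict to the next connection
 * state (illustrative sketch; the real code above additionally checks
 * disk states, role conflicts and dry-run mode first).  A full sync,
 * abs(hg) >= 2, is realized by setting all bitmap bits before entering
 * the bitmap exchange. */
static enum drbd_conns hg_to_conn_model(int hg)
{
	if (hg > 0)
		return C_WF_BITMAP_S;	/* send our bitmap, become SyncSource */
	if (hg < 0)
		return C_WF_BITMAP_T;	/* receive the bitmap, become SyncTarget */
	return C_CONNECTED;		/* nothing to sync */
}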
2902 /* returns 1 if invalid */
2903 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2905 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2906 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2907 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2910 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2911 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2912 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2915 /* everything else is valid if they are equal on both sides. */
2919 /* everything else is invalid. */
2923 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
2925 struct p_protocol *p = pi->data;
2926 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2927 int p_want_lose, p_two_primaries, cf;
2928 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2930 p_proto = be32_to_cpu(p->protocol);
2931 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2932 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2933 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
2934 p_two_primaries = be32_to_cpu(p->two_primaries);
2935 cf = be32_to_cpu(p->conn_flags);
2936 p_want_lose = cf & CF_WANT_LOSE;
2938 clear_bit(CONN_DRY_RUN, &tconn->flags);
2940 if (cf & CF_DRY_RUN)
2941 set_bit(CONN_DRY_RUN, &tconn->flags);
2943 if (p_proto != tconn->net_conf->wire_protocol && tconn->agreed_pro_version < 100) {
2944 conn_err(tconn, "incompatible communication protocols\n");
2948 if (cmp_after_sb(p_after_sb_0p, tconn->net_conf->after_sb_0p)) {
2949 conn_err(tconn, "incompatible after-sb-0pri settings\n");
2953 if (cmp_after_sb(p_after_sb_1p, tconn->net_conf->after_sb_1p)) {
2954 conn_err(tconn, "incompatible after-sb-1pri settings\n");
2958 if (cmp_after_sb(p_after_sb_2p, tconn->net_conf->after_sb_2p)) {
2959 conn_err(tconn, "incompatible after-sb-2pri settings\n");
2963 if (p_want_lose && tconn->net_conf->want_lose) {
2964 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
2968 if (p_two_primaries != tconn->net_conf->two_primaries) {
2969 conn_err(tconn, "incompatible setting of the two-primaries options\n");
2973 if (tconn->agreed_pro_version >= 87) {
2974 unsigned char *my_alg = tconn->net_conf->integrity_alg;
2977 err = drbd_recv_all(tconn, p_integrity_alg, pi->size);
2981 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2982 if (strcmp(p_integrity_alg, my_alg)) {
2983 conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
2986 conn_info(tconn, "data-integrity-alg: %s\n",
2987 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2993 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
2998 * input: alg name, feature name
2999 * return: NULL (alg name was "")
3000 * ERR_PTR(error) if something goes wrong
3001 * or the crypto hash ptr, if it worked out ok. */
3002 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3003 const char *alg, const char *name)
3005 struct crypto_hash *tfm;
3010 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3012 dev_err(DEV, "Cannot allocate \"%s\" as %s (reason: %ld)\n",
3013 alg, name, PTR_ERR(tfm));
3016 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
3017 crypto_free_hash(tfm);
3018 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
3019 return ERR_PTR(-EINVAL);
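/* Typical use of the helper above (sketch only; real callers keep the
 * tfm instead of freeing it right away): */
static int example_alloc_verify_tfm(struct drbd_conf *mdev, const char *alg)
{
	struct crypto_hash *tfm;

	tfm = drbd_crypto_alloc_digest_safe(mdev, alg, "verify-alg");
	if (IS_ERR(tfm))
		return PTR_ERR(tfm);	/* allocation failed, or not a digest */
	if (tfm == NULL)
		return 0;		/* alg was "", the feature is disabled */
	crypto_free_hash(tfm);
	return 0;
}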
3024 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3026 void *buffer = tconn->data.rbuf;
3027 int size = pi->size;
3030 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3031 s = drbd_recv(tconn, buffer, s);
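/* The drain pattern above as a stand-alone model (illustrative):
 * consume "size" unwanted bytes through one fixed buffer, chunk by
 * chunk, so a bogus payload never forces a large allocation. */
static int drain_model(int (*recv_fn)(void *buf, int len),
		       char *buf, int buf_size, int size)
{
	while (size > 0) {
		int s = size < buf_size ? size : buf_size;

		s = recv_fn(buf, s);
		if (s <= 0)
			return s ? s : -EIO;	/* error or unexpected EOF */
		size -= s;
	}
	return 0;
}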
3045 * config_unknown_volume - device configuration command for unknown volume
3047 * When a device is added to an existing connection, the node on which the
3048 * device is added first will send configuration commands to its peer but the
3049 * peer will not know about the device yet. It will warn and ignore these
3050 * commands. Once the device is added on the second node, the second node will
3051 * send the same device configuration commands, but in the other direction.
3053 * (We can also end up here if drbd is misconfigured.)
3055 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3057 conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3058 pi->vnr, cmdname(pi->cmd));
3059 return ignore_remaining_packet(tconn, pi);
3062 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3064 struct drbd_conf *mdev;
3065 struct p_rs_param_95 *p;
3066 unsigned int header_size, data_size, exp_max_sz;
3067 struct crypto_hash *verify_tfm = NULL;
3068 struct crypto_hash *csums_tfm = NULL;
3069 const int apv = tconn->agreed_pro_version;
3070 int *rs_plan_s = NULL;
3074 mdev = vnr_to_mdev(tconn, pi->vnr);
3076 return config_unknown_volume(tconn, pi);
3078 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3079 : apv == 88 ? sizeof(struct p_rs_param)
3081 : apv <= 94 ? sizeof(struct p_rs_param_89)
3082 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3084 if (pi->size > exp_max_sz) {
3085 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3086 pi->size, exp_max_sz);
3091 header_size = sizeof(struct p_rs_param);
3092 data_size = pi->size - header_size;
3093 } else if (apv <= 94) {
3094 header_size = sizeof(struct p_rs_param_89);
3095 data_size = pi->size - header_size;
3096 D_ASSERT(data_size == 0);
3098 header_size = sizeof(struct p_rs_param_95);
3099 data_size = pi->size - header_size;
3100 D_ASSERT(data_size == 0);
3103 /* initialize verify_alg and csums_alg */
3105 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3107 err = drbd_recv_all(mdev->tconn, p, header_size);
3111 if (get_ldev(mdev)) {
3112 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3118 if (data_size > SHARED_SECRET_MAX) {
3119 dev_err(DEV, "verify-alg too long, "
3120 "peer wants %u, accepting only %u byte\n",
3121 data_size, SHARED_SECRET_MAX);
3125 err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3129 /* we expect a NUL-terminated string */
3130 /* but just in case someone tries to be evil */
3131 D_ASSERT(p->verify_alg[data_size-1] == 0);
3132 p->verify_alg[data_size-1] = 0;
3134 } else /* apv >= 89 */ {
3135 /* we still expect NUL terminated strings */
3136 /* but just in case someone tries to be evil */
3137 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3138 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3139 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3140 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3143 if (strcmp(mdev->tconn->net_conf->verify_alg, p->verify_alg)) {
3144 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3145 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3146 mdev->tconn->net_conf->verify_alg, p->verify_alg);
3149 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3150 p->verify_alg, "verify-alg");
3151 if (IS_ERR(verify_tfm)) {
3157 if (apv >= 89 && strcmp(mdev->tconn->net_conf->csums_alg, p->csums_alg)) {
3158 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3159 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3160 mdev->tconn->net_conf->csums_alg, p->csums_alg);
3163 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3164 p->csums_alg, "csums-alg");
3165 if (IS_ERR(csums_tfm)) {
3171 if (apv > 94 && get_ldev(mdev)) {
3172 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3173 mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3174 mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3175 mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3176 mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
3178 fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3179 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3180 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3182 dev_err(DEV, "kzalloc of fifo_buffer failed\n");
3190 spin_lock(&mdev->peer_seq_lock);
3191 /* lock against drbd_nl_syncer_conf() */
3193 strcpy(mdev->tconn->net_conf->verify_alg, p->verify_alg);
3194 mdev->tconn->net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3195 crypto_free_hash(mdev->tconn->verify_tfm);
3196 mdev->tconn->verify_tfm = verify_tfm;
3197 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3200 strcpy(mdev->tconn->net_conf->csums_alg, p->csums_alg);
3201 mdev->tconn->net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3202 crypto_free_hash(mdev->tconn->csums_tfm);
3203 mdev->tconn->csums_tfm = csums_tfm;
3204 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3206 if (fifo_size != mdev->rs_plan_s.size) {
3207 kfree(mdev->rs_plan_s.values);
3208 mdev->rs_plan_s.values = rs_plan_s;
3209 mdev->rs_plan_s.size = fifo_size;
3210 mdev->rs_planed = 0;
3212 spin_unlock(&mdev->peer_seq_lock);
3217 /* just for completeness: actually not needed,
3218 * as this is not reached if csums_tfm was ok. */
3219 crypto_free_hash(csums_tfm);
3220 /* but free the verify_tfm again, if csums_tfm did not work out */
3221 crypto_free_hash(verify_tfm);
3222 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
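/* The resync-plan fifo sizing above, isolated (illustrative sketch):
 * c_plan_ahead is configured in tenths of a second and the fifo holds
 * one slot per SLEEP_TIME tick, so the buffer covers the whole
 * plan-ahead window.  With SLEEP_TIME == HZ/10 this reduces to exactly
 * c_plan_ahead slots. */
static unsigned int plan_fifo_slots_model(unsigned int c_plan_ahead,
					  unsigned int sleep_time,
					  unsigned int hz)
{
	return (c_plan_ahead * 10 * sleep_time) / hz;
}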
3226 /* warn if the arguments differ by more than 12.5% */
3227 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3228 const char *s, sector_t a, sector_t b)
3231 if (a == 0 || b == 0)
3233 d = (a > b) ? (a - b) : (b - a);
3234 if (d > (a>>3) || d > (b>>3))
3235 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3236 (unsigned long long)a, (unsigned long long)b);
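/* The shift trick above in isolation: x>>3 is x/8, i.e. 12.5% of x, so
 * the warning fires as soon as the difference exceeds an eighth of
 * either value.  Worked example: a = 1000, b = 880 gives d = 120,
 * which is below a>>3 = 125 but above b>>3 = 110, so it warns. */
static int differs_considerably_model(unsigned long long a,
				      unsigned long long b)
{
	unsigned long long d;

	if (a == 0 || b == 0)
		return 0;
	d = a > b ? a - b : b - a;
	return d > (a >> 3) || d > (b >> 3);
}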
3239 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3241 struct drbd_conf *mdev;
3242 struct p_sizes *p = pi->data;
3243 enum determine_dev_size dd = unchanged;
3244 sector_t p_size, p_usize, my_usize;
3245 int ldsc = 0; /* local disk size changed */
3246 enum dds_flags ddsf;
3248 mdev = vnr_to_mdev(tconn, pi->vnr);
3250 return config_unknown_volume(tconn, pi);
3252 p_size = be64_to_cpu(p->d_size);
3253 p_usize = be64_to_cpu(p->u_size);
3255 /* just store the peer's disk size for now.
3256 * we still need to figure out whether we accept that. */
3257 mdev->p_size = p_size;
3259 if (get_ldev(mdev)) {
3260 warn_if_differ_considerably(mdev, "lower level device sizes",
3261 p_size, drbd_get_max_capacity(mdev->ldev));
3262 warn_if_differ_considerably(mdev, "user requested size",
3263 p_usize, mdev->ldev->dc.disk_size);
3265 /* if this is the first connect, or an otherwise expected
3266 * param exchange, choose the minimum */
3267 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3268 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3271 my_usize = mdev->ldev->dc.disk_size;
3273 if (mdev->ldev->dc.disk_size != p_usize) {
3274 mdev->ldev->dc.disk_size = p_usize;
3275 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3276 (unsigned long)mdev->ldev->dc.disk_size);
3279 /* Never shrink a device with usable data during connect.
3280 But allow online shrinking if we are connected. */
3281 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3282 drbd_get_capacity(mdev->this_bdev) &&
3283 mdev->state.disk >= D_OUTDATED &&
3284 mdev->state.conn < C_CONNECTED) {
3285 dev_err(DEV, "The peer's disk size is too small!\n");
3286 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3287 mdev->ldev->dc.disk_size = my_usize;
3294 ddsf = be16_to_cpu(p->dds_flags);
3295 if (get_ldev(mdev)) {
3296 dd = drbd_determine_dev_size(mdev, ddsf);
3298 if (dd == dev_size_error)
3302 /* I am diskless, need to accept the peer's size. */
3303 drbd_set_my_capacity(mdev, p_size);
3306 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3307 drbd_reconsider_max_bio_size(mdev);
3309 if (get_ldev(mdev)) {
3310 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3311 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3318 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3319 if (be64_to_cpu(p->c_size) !=
3320 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3321 /* we have different sizes, probably peer
3322 * needs to know my new size... */
3323 drbd_send_sizes(mdev, 0, ddsf);
3325 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3326 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3327 if (mdev->state.pdsk >= D_INCONSISTENT &&
3328 mdev->state.disk >= D_INCONSISTENT) {
3329 if (ddsf & DDSF_NO_RESYNC)
3330 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3332 resync_after_online_grow(mdev);
3334 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3341 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3343 struct drbd_conf *mdev;
3344 struct p_uuids *p = pi->data;
3346 int i, updated_uuids = 0;
3348 mdev = vnr_to_mdev(tconn, pi->vnr);
3350 return config_unknown_volume(tconn, pi);
3352 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3354 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3355 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3357 kfree(mdev->p_uuid);
3358 mdev->p_uuid = p_uuid;
3360 if (mdev->state.conn < C_CONNECTED &&
3361 mdev->state.disk < D_INCONSISTENT &&
3362 mdev->state.role == R_PRIMARY &&
3363 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3364 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3365 (unsigned long long)mdev->ed_uuid);
3366 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3370 if (get_ldev(mdev)) {
3371 int skip_initial_sync =
3372 mdev->state.conn == C_CONNECTED &&
3373 mdev->tconn->agreed_pro_version >= 90 &&
3374 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3375 (p_uuid[UI_FLAGS] & 8);
3376 if (skip_initial_sync) {
3377 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3378 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3379 "clear_n_write from receive_uuids",
3380 BM_LOCKED_TEST_ALLOWED);
3381 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3382 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3383 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3389 } else if (mdev->state.disk < D_INCONSISTENT &&
3390 mdev->state.role == R_PRIMARY) {
3391 /* I am a diskless primary, the peer just created a new current UUID
3393 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3396 /* Before we test the disk state, we should wait until a possibly
3397 ongoing cluster-wide state change has finished. That is important if
3398 we are primary and are detaching from our disk: we need to see the
3399 new disk state... */
3400 mutex_lock(mdev->state_mutex);
3401 mutex_unlock(mdev->state_mutex);
3402 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3403 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3406 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3412 * convert_state() - Converts the peer's view of the cluster state to our point of view
3413 * @ps: The state as seen by the peer.
3415 static union drbd_state convert_state(union drbd_state ps)
3417 union drbd_state ms;
3419 static enum drbd_conns c_tab[] = {
3420 [C_CONNECTED] = C_CONNECTED,
3422 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3423 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3424 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3425 [C_VERIFY_S] = C_VERIFY_T,
3431 ms.conn = c_tab[ps.conn];
3436 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
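/* What convert_state() boils down to (illustrative sketch): every
 * field that encodes "me vs. peer" is mirrored, and asymmetric
 * connection states are swapped for their counterpart via c_tab[]
 * (SyncSource on the peer is SyncTarget here, and so on). */
static void convert_state_model(int *role, int *peer, int *disk, int *pdsk)
{
	int tmp;

	tmp = *role; *role = *peer; *peer = tmp;	/* swap the roles */
	tmp = *disk; *disk = *pdsk; *pdsk = tmp;	/* swap the disk states */
}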
3441 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3443 struct drbd_conf *mdev;
3444 struct p_req_state *p = pi->data;
3445 union drbd_state mask, val;
3446 enum drbd_state_rv rv;
3448 mdev = vnr_to_mdev(tconn, pi->vnr);
3452 mask.i = be32_to_cpu(p->mask);
3453 val.i = be32_to_cpu(p->val);
3455 if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3456 mutex_is_locked(mdev->state_mutex)) {
3457 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3461 mask = convert_state(mask);
3462 val = convert_state(val);
3464 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3465 drbd_send_sr_reply(mdev, rv);
3472 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3474 struct p_req_state *p = pi->data;
3475 union drbd_state mask, val;
3476 enum drbd_state_rv rv;
3478 mask.i = be32_to_cpu(p->mask);
3479 val.i = be32_to_cpu(p->val);
3481 if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3482 mutex_is_locked(&tconn->cstate_mutex)) {
3483 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3487 mask = convert_state(mask);
3488 val = convert_state(val);
3490 rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3491 conn_send_sr_reply(tconn, rv);
3496 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3498 struct drbd_conf *mdev;
3499 struct p_state *p = pi->data;
3500 union drbd_state os, ns, peer_state;
3501 enum drbd_disk_state real_peer_disk;
3502 enum chg_state_flags cs_flags;
3505 mdev = vnr_to_mdev(tconn, pi->vnr);
3507 return config_unknown_volume(tconn, pi);
3509 peer_state.i = be32_to_cpu(p->state);
3511 real_peer_disk = peer_state.disk;
3512 if (peer_state.disk == D_NEGOTIATING) {
3513 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3514 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3517 spin_lock_irq(&mdev->tconn->req_lock);
3519 os = ns = drbd_read_state(mdev);
3520 spin_unlock_irq(&mdev->tconn->req_lock);
3522 /* peer says his disk is uptodate, while we think it is inconsistent,
3523 * and this happens while we think we have a sync going on. */
3524 if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3525 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3526 /* If we are (becoming) SyncSource, but peer is still in sync
3527 * preparation, ignore its uptodate-ness to avoid flapping, it
3528 * will change to inconsistent once the peer reaches active
3530 * It may have changed syncer-paused flags, however, so we
3531 * cannot ignore this completely. */
3532 if (peer_state.conn > C_CONNECTED &&
3533 peer_state.conn < C_SYNC_SOURCE)
3534 real_peer_disk = D_INCONSISTENT;
3536 /* if peer_state changes to connected at the same time,
3537 * it explicitly notifies us that it finished resync.
3538 * Maybe we should finish it up, too? */
3539 else if (os.conn >= C_SYNC_SOURCE &&
3540 peer_state.conn == C_CONNECTED) {
3541 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3542 drbd_resync_finished(mdev);
3547 /* peer says his disk is inconsistent, while we think it is uptodate,
3548 * and this happens while the peer still thinks we have a sync going on,
3549 * but we think we are already done with the sync.
3550 * We ignore this to avoid flapping pdsk.
3551 * This should not happen, if the peer is a recent version of drbd. */
3552 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3553 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3554 real_peer_disk = D_UP_TO_DATE;
3556 if (ns.conn == C_WF_REPORT_PARAMS)
3557 ns.conn = C_CONNECTED;
3559 if (peer_state.conn == C_AHEAD)
3562 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3563 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3564 int cr; /* consider resync */
3566 /* if we established a new connection */
3567 cr = (os.conn < C_CONNECTED);
3568 /* if we had an established connection
3569 * and one of the nodes newly attaches a disk */
3570 cr |= (os.conn == C_CONNECTED &&
3571 (peer_state.disk == D_NEGOTIATING ||
3572 os.disk == D_NEGOTIATING));
3573 /* if we have both been inconsistent, and the peer has been
3574 * forced to be UpToDate with --overwrite-data */
3575 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3576 /* if we had been plain connected, and the admin requested to
3577 * start a sync by "invalidate" or "invalidate-remote" */
3578 cr |= (os.conn == C_CONNECTED &&
3579 (peer_state.conn >= C_STARTING_SYNC_S &&
3580 peer_state.conn <= C_WF_BITMAP_T));
3583 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3586 if (ns.conn == C_MASK) {
3587 ns.conn = C_CONNECTED;
3588 if (mdev->state.disk == D_NEGOTIATING) {
3589 drbd_force_state(mdev, NS(disk, D_FAILED));
3590 } else if (peer_state.disk == D_NEGOTIATING) {
3591 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3592 peer_state.disk = D_DISKLESS;
3593 real_peer_disk = D_DISKLESS;
3595 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3597 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3598 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3604 spin_lock_irq(&mdev->tconn->req_lock);
3605 if (os.i != drbd_read_state(mdev).i)
3607 clear_bit(CONSIDER_RESYNC, &mdev->flags);
3608 ns.peer = peer_state.role;
3609 ns.pdsk = real_peer_disk;
3610 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3611 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3612 ns.disk = mdev->new_state_tmp.disk;
3613 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3614 if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3615 test_bit(NEW_CUR_UUID, &mdev->flags)) {
3616 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3617 for temporary network outages! */
3618 spin_unlock_irq(&mdev->tconn->req_lock);
3619 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3620 tl_clear(mdev->tconn);
3621 drbd_uuid_new_current(mdev);
3622 clear_bit(NEW_CUR_UUID, &mdev->flags);
3623 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3626 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3627 ns = drbd_read_state(mdev);
3628 spin_unlock_irq(&mdev->tconn->req_lock);
3630 if (rv < SS_SUCCESS) {
3631 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3635 if (os.conn > C_WF_REPORT_PARAMS) {
3636 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3637 peer_state.disk != D_NEGOTIATING) {
3638 /* we want resync, peer has not yet decided to sync... */
3639 /* Nowadays only used when forcing a node into primary role and
3640 setting its disk to UpToDate with that */
3641 drbd_send_uuids(mdev);
3642 drbd_send_state(mdev);
3646 mdev->tconn->net_conf->want_lose = 0;
3648 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3653 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3655 struct drbd_conf *mdev;
3656 struct p_rs_uuid *p = pi->data;
3658 mdev = vnr_to_mdev(tconn, pi->vnr);
3662 wait_event(mdev->misc_wait,
3663 mdev->state.conn == C_WF_SYNC_UUID ||
3664 mdev->state.conn == C_BEHIND ||
3665 mdev->state.conn < C_CONNECTED ||
3666 mdev->state.disk < D_NEGOTIATING);
3668 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3670 /* Here the _drbd_uuid_ functions are right, current should
3671 _not_ be rotated into the history */
3672 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3673 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3674 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3676 drbd_print_uuids(mdev, "updated sync uuid");
3677 drbd_start_resync(mdev, C_SYNC_TARGET);
3681 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3687 * receive_bitmap_plain
3689 * Return 0 when done, 1 when another iteration is needed, and a negative error
3690 * code upon failure.
3693 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3694 unsigned long *p, struct bm_xfer_ctx *c)
3696 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3697 drbd_header_size(mdev->tconn);
3698 unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3699 c->bm_words - c->word_offset);
3700 unsigned int want = num_words * sizeof(*p);
3704 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3709 err = drbd_recv_all(mdev->tconn, p, want);
3713 drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3715 c->word_offset += num_words;
3716 c->bit_offset = c->word_offset * BITS_PER_LONG;
3717 if (c->bit_offset > c->bm_bits)
3718 c->bit_offset = c->bm_bits;
3723 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3725 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3728 static int dcbp_get_start(struct p_compressed_bm *p)
3730 return (p->encoding & 0x80) != 0;
3733 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3735 return (p->encoding >> 4) & 0x7;
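/* Layout of the "encoding" byte the three accessors above pick apart,
 * shown as the matching encoder (illustrative; the real encoder lives
 * on the sending side):
 *   bit  7    : value of the first run (the "start toggle")
 *   bits 6..4 : number of pad bits at the end of the bitstream
 *   bits 3..0 : encoding code, e.g. RLE_VLI_Bits */
static unsigned char dcbp_make_encoding_model(int start, int pad_bits, int code)
{
	return (start ? 0x80 : 0) | ((pad_bits & 0x7) << 4) | (code & 0x0f);
}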
3741 * Return 0 when done, 1 when another iteration is needed, and a negative error
3742 * code upon failure.
3745 recv_bm_rle_bits(struct drbd_conf *mdev,
3746 struct p_compressed_bm *p,
3747 struct bm_xfer_ctx *c,
3750 struct bitstream bs;
3754 unsigned long s = c->bit_offset;
3756 int toggle = dcbp_get_start(p);
3760 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3762 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3766 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3767 bits = vli_decode_bits(&rl, look_ahead);
3773 if (e >= c->bm_bits) {
3774 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3777 _drbd_bm_set_bits(mdev, s, e);
3781 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3782 have, bits, look_ahead,
3783 (unsigned int)(bs.cur.b - p->code),
3784 (unsigned int)bs.buf_len);
3787 look_ahead >>= bits;
3790 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3793 look_ahead |= tmp << have;
3798 bm_xfer_ctx_bit_to_word_offset(c);
3800 return (s != c->bm_bits);
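/* The core of the run-length scheme above, with the VLI bitstream
 * stripped away (illustrative sketch): runs of clear and set bits
 * alternate, only the run lengths travel over the wire, and "toggle"
 * says what kind of run comes first. */
static void rle_apply_runs_model(unsigned char *bitmap,
				 const unsigned long *runs,
				 int n_runs, int toggle)
{
	unsigned long s = 0;
	int i;

	for (i = 0; i < n_runs; i++, toggle = !toggle) {
		unsigned long rl = runs[i];
		unsigned long e;

		if (toggle)			/* a run of set bits */
			for (e = s; e < s + rl; e++)
				bitmap[e / 8] |= 1 << (e % 8);
		s += rl;			/* clear runs are skipped */
	}
}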
3806 * Return 0 when done, 1 when another iteration is needed, and a negative error
3807 * code upon failure.
3810 decode_bitmap_c(struct drbd_conf *mdev,
3811 struct p_compressed_bm *p,
3812 struct bm_xfer_ctx *c,
3815 if (dcbp_get_code(p) == RLE_VLI_Bits)
3816 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
3818 /* other variants had been implemented for evaluation,
3819 * but have been dropped as this one turned out to be "best"
3820 * during all our tests. */
3822 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3823 conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
3827 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3828 const char *direction, struct bm_xfer_ctx *c)
3830 /* what would it take to transfer it "plaintext" */
3831 unsigned int header_size = drbd_header_size(mdev->tconn);
3832 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
3833 unsigned int plain =
3834 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
3835 c->bm_words * sizeof(unsigned long);
3836 unsigned int total = c->bytes[0] + c->bytes[1];
3839 /* total cannot be zero, but just in case: */
3843 /* don't report if not compressed */
3847 /* total < plain. check for overflow, still */
3848 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3849 : (1000 * total / plain);
3855 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3856 "total %u; compression: %u.%u%%\n",
3858 c->bytes[1], c->packets[1],
3859 c->bytes[0], c->packets[0],
3860 total, r/10, r % 10);
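/* The ratio computed above, stand-alone (illustrative): the result is
 * the compressed size in tenths of a percent of the plaintext size,
 * with the operand order flipped for large totals to avoid 32 bit
 * overflow.  E.g. total = 1234, plain = 100000 yields 12, printed as
 * "compression: 1.2%". */
static unsigned int compression_per_mil_model(unsigned int total,
					      unsigned int plain)
{
	return total > UINT_MAX / 1000 ? total / (plain / 1000)
				       : 1000 * total / plain;
}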
3863 /* Since we are processing the bitfield from lower addresses to higher,
3864 it does not matter whether we process it in 32 bit or 64 bit
3865 chunks, as long as it is little endian. (Think of it as a byte stream,
3866 beginning with the lowest byte...) If we used big endian,
3867 we would need to process it from the highest address to the lowest
3868 in order to stay agnostic to the 32 vs 64 bit issue.
3870 returns 0 on failure, 1 if we successfully received it. */
3871 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
3873 struct drbd_conf *mdev;
3874 struct bm_xfer_ctx c;
3877 mdev = vnr_to_mdev(tconn, pi->vnr);
3881 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3882 /* you are supposed to send additional out-of-sync information
3883 * if you actually set bits during this phase */
3885 c = (struct bm_xfer_ctx) {
3886 .bm_bits = drbd_bm_bits(mdev),
3887 .bm_words = drbd_bm_words(mdev),
3891 if (pi->cmd == P_BITMAP)
3892 err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
3893 else if (pi->cmd == P_COMPRESSED_BITMAP) {
3894 /* MAYBE: sanity check that we speak proto >= 90,
3895 * and the feature is enabled! */
3896 struct p_compressed_bm *p = pi->data;
3898 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
3899 dev_err(DEV, "ReportCBitmap packet too large\n");
3903 if (pi->size <= sizeof(*p)) {
3904 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
3908 err = drbd_recv_all(mdev->tconn, p, pi->size);
3911 err = decode_bitmap_c(mdev, p, &c, pi->size);
3913 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
3918 c.packets[pi->cmd == P_BITMAP]++;
3919 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
3926 err = drbd_recv_header(mdev->tconn, pi);
3931 INFO_bm_xfer_stats(mdev, "receive", &c);
3933 if (mdev->state.conn == C_WF_BITMAP_T) {
3934 enum drbd_state_rv rv;
3936 err = drbd_send_bitmap(mdev);
3939 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3940 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3941 D_ASSERT(rv == SS_SUCCESS);
3942 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3943 /* admin may have requested C_DISCONNECTING,
3944 * other threads may have noticed network errors */
3945 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3946 drbd_conn_str(mdev->state.conn));
3951 drbd_bm_unlock(mdev);
3952 if (!err && mdev->state.conn == C_WF_BITMAP_S)
3953 drbd_start_resync(mdev, C_SYNC_SOURCE);
3957 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
3959 conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
3962 return ignore_remaining_packet(tconn, pi);
3965 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
3967 /* Make sure we've acked all the TCP data associated
3968 * with the data requests being unplugged */
3969 drbd_tcp_quickack(tconn->data.socket);
3974 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
3976 struct drbd_conf *mdev;
3977 struct p_block_desc *p = pi->data;
3979 mdev = vnr_to_mdev(tconn, pi->vnr);
3983 switch (mdev->state.conn) {
3984 case C_WF_SYNC_UUID:
3989 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3990 drbd_conn_str(mdev->state.conn));
3993 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4001 int (*fn)(struct drbd_tconn *, struct packet_info *);
4004 static struct data_cmd drbd_cmd_handler[] = {
4005 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
4006 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
4007 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4008 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4009 [P_BITMAP] = { 1, 0, receive_bitmap } ,
4010 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4011 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote },
4012 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4013 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4014 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam },
4015 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam },
4016 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
4017 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
4018 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
4019 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
4020 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
4021 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4022 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4023 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4024 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4025 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
4026 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4027 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4030 static void drbdd(struct drbd_tconn *tconn)
4032 struct packet_info pi;
4033 size_t shs; /* sub header size */
4036 while (get_t_state(&tconn->receiver) == RUNNING) {
4037 struct data_cmd *cmd;
4039 drbd_thread_current_set_cpu(&tconn->receiver);
4040 if (drbd_recv_header(tconn, &pi))
/* check bounds before indexing into the handler table */
4043 cmd = pi.cmd < ARRAY_SIZE(drbd_cmd_handler) ? &drbd_cmd_handler[pi.cmd] : NULL;
4044 if (unlikely(!cmd || !cmd->fn)) {
4045 conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
4049 shs = cmd->pkt_size;
4050 if (pi.size > shs && !cmd->expect_payload) {
4051 conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
4056 err = drbd_recv_all_warn(tconn, pi.data, shs);
4062 err = cmd->fn(tconn, &pi);
4064 conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4065 cmdname(pi.cmd), err, pi.size);
4072 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
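/* The dispatch contract of the loop above in miniature (illustrative
 * restatement; the error codes are arbitrary): unknown commands and
 * unexpected payloads are protocol errors, everything else goes to
 * the per-command handler from drbd_cmd_handler[]. */
static int dispatch_model(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct data_cmd *cmd;

	if (pi->cmd >= ARRAY_SIZE(drbd_cmd_handler) ||
	    !drbd_cmd_handler[pi->cmd].fn)
		return -EINVAL;		/* unknown packet type */
	cmd = &drbd_cmd_handler[pi->cmd];
	if (pi->size > cmd->pkt_size && !cmd->expect_payload)
		return -EPROTO;		/* payload where none belongs */
	return cmd->fn(tconn, pi);
}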
4075 void conn_flush_workqueue(struct drbd_tconn *tconn)
4077 struct drbd_wq_barrier barr;
4079 barr.w.cb = w_prev_work_done;
4080 barr.w.tconn = tconn;
4081 init_completion(&barr.done);
4082 drbd_queue_work(&tconn->data.work, &barr.w);
4083 wait_for_completion(&barr.done);
4086 static void drbd_disconnect(struct drbd_tconn *tconn)
4089 int rv = SS_UNKNOWN_ERROR;
4091 if (tconn->cstate == C_STANDALONE)
4094 /* asender does not clean up anything. it must not interfere, either */
4095 drbd_thread_stop(&tconn->asender);
4096 drbd_free_sock(tconn);
4098 down_read(&drbd_cfg_rwsem);
4099 idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
4100 up_read(&drbd_cfg_rwsem);
4101 conn_info(tconn, "Connection closed\n");
4103 if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4104 conn_try_outdate_peer_async(tconn);
4106 spin_lock_irq(&tconn->req_lock);
4108 if (oc >= C_UNCONNECTED)
4109 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4111 spin_unlock_irq(&tconn->req_lock);
4113 if (oc == C_DISCONNECTING) {
4114 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
4116 crypto_free_hash(tconn->cram_hmac_tfm);
4117 tconn->cram_hmac_tfm = NULL;
4119 kfree(tconn->net_conf);
4120 tconn->net_conf = NULL;
4121 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
4125 static int drbd_disconnected(int vnr, void *p, void *data)
4127 struct drbd_conf *mdev = (struct drbd_conf *)p;
4128 enum drbd_fencing_p fp;
4131 /* wait for current activity to cease. */
4132 spin_lock_irq(&mdev->tconn->req_lock);
4133 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4134 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4135 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4136 spin_unlock_irq(&mdev->tconn->req_lock);
4138 /* We do not have data structures that would allow us to
4139 * get the rs_pending_cnt down to 0 again.
4140 * * On C_SYNC_TARGET we do not have any data structures describing
4141 * the pending RSDataRequests we have sent.
4142 * * On C_SYNC_SOURCE there is no data structure that tracks
4143 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4144 * And no, it is not the sum of the reference counts in the
4145 * resync_LRU. The resync_LRU tracks the whole operation including
4146 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4148 drbd_rs_cancel_all(mdev);
4150 mdev->rs_failed = 0;
4151 atomic_set(&mdev->rs_pending_cnt, 0);
4152 wake_up(&mdev->misc_wait);
4154 del_timer(&mdev->request_timer);
4156 del_timer_sync(&mdev->resync_timer);
4157 resync_timer_fn((unsigned long)mdev);
4159 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4160 * w_make_resync_request etc. which may still be on the worker queue
4161 * to be "canceled" */
4162 drbd_flush_workqueue(mdev);
4164 drbd_finish_peer_reqs(mdev);
4166 kfree(mdev->p_uuid);
4167 mdev->p_uuid = NULL;
4169 if (!drbd_suspended(mdev))
4170 tl_clear(mdev->tconn);
4175 if (get_ldev(mdev)) {
4176 fp = mdev->ldev->dc.fencing;
4180 /* serialize with bitmap writeout triggered by the state change,
4182 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4184 /* tcp_close and release of sendpage pages can be deferred. I don't
4185 * want to use SO_LINGER, because apparently it can be deferred for
4186 * more than 20 seconds (longest time I checked).
4188 * Actually we don't care exactly when the network stack does its
4189 * put_page(), but release our reference on these pages right here.
4191 i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4193 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4194 i = atomic_read(&mdev->pp_in_use_by_net);
4196 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4197 i = atomic_read(&mdev->pp_in_use);
4199 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4201 D_ASSERT(list_empty(&mdev->read_ee));
4202 D_ASSERT(list_empty(&mdev->active_ee));
4203 D_ASSERT(list_empty(&mdev->sync_ee));
4204 D_ASSERT(list_empty(&mdev->done_ee));
4206 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4207 atomic_set(&mdev->current_epoch->epoch_size, 0);
4208 D_ASSERT(list_empty(&mdev->current_epoch->list));
4214 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4215 * we can agree on is stored in agreed_pro_version.
4217 * The feature flags and the reserved array should provide enough room for
4218 * future enhancements of the handshake protocol and possible plugins...
4220 * For now, they are expected to be zero, but they are ignored either way.
4222 static int drbd_send_features(struct drbd_tconn *tconn)
4224 struct drbd_socket *sock;
4225 struct p_connection_features *p;
4227 sock = &tconn->data;
4228 p = conn_prepare_command(tconn, sock);
4231 memset(p, 0, sizeof(*p));
4232 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4233 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4234 return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4239 * 1 yes, we have a valid connection
4240 * 0 oops, did not work out, please try again
4241 * -1 peer talks different language,
4242 * no point in trying again, please go standalone.
4244 static int drbd_do_features(struct drbd_tconn *tconn)
4246 /* ASSERT current == tconn->receiver ... */
4247 struct p_connection_features *p;
4248 const int expect = sizeof(struct p_connection_features);
4249 struct packet_info pi;
4252 err = drbd_send_features(tconn);
4256 err = drbd_recv_header(tconn, &pi);
4260 if (pi.cmd != P_CONNECTION_FEATURES) {
4261 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4262 cmdname(pi.cmd), pi.cmd);
4266 if (pi.size != expect) {
4267 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4273 err = drbd_recv_all_warn(tconn, p, expect);
4277 p->protocol_min = be32_to_cpu(p->protocol_min);
4278 p->protocol_max = be32_to_cpu(p->protocol_max);
4279 if (p->protocol_max == 0)
4280 p->protocol_max = p->protocol_min;
4282 if (PRO_VERSION_MAX < p->protocol_min ||
4283 PRO_VERSION_MIN > p->protocol_max)
4286 tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4288 conn_info(tconn, "Handshake successful: "
4289 "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4294 conn_err(tconn, "incompatible DRBD dialects: "
4295 "I support %d-%d, peer supports %d-%d\n",
4296 PRO_VERSION_MIN, PRO_VERSION_MAX,
4297 p->protocol_min, p->protocol_max);
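/* The version negotiation above as a pure function (illustrative):
 * two [min, max] ranges either overlap, in which case both sides agree
 * on the highest common version, or they do not and the handshake
 * fails with -1. */
static int agree_version_model(int my_min, int my_max,
			       int peer_min, int peer_max)
{
	if (my_max < peer_min || my_min > peer_max)
		return -1;		/* incompatible dialects */
	return my_max < peer_max ? my_max : peer_max;
}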
4301 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4302 static int drbd_do_auth(struct drbd_tconn *tconn)
4304 dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4305 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4309 #define CHALLENGE_LEN 64
4313 0 - failed, try again (network error),
4314 -1 - auth failed, don't try again.
4317 static int drbd_do_auth(struct drbd_tconn *tconn)
4319 struct drbd_socket *sock;
4320 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4321 struct scatterlist sg;
4322 char *response = NULL;
4323 char *right_response = NULL;
4324 char *peers_ch = NULL;
4325 unsigned int key_len = strlen(tconn->net_conf->shared_secret);
4326 unsigned int resp_size;
4327 struct hash_desc desc;
4328 struct packet_info pi;
4331 /* FIXME: Put the challenge/response into the preallocated socket buffer. */
4333 desc.tfm = tconn->cram_hmac_tfm;
4336 rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4337 (u8 *)tconn->net_conf->shared_secret, key_len);
4339 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4344 get_random_bytes(my_challenge, CHALLENGE_LEN);
4346 sock = &tconn->data;
4347 if (!conn_prepare_command(tconn, sock)) {
4351 rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4352 my_challenge, CHALLENGE_LEN);
4356 err = drbd_recv_header(tconn, &pi);
4362 if (pi.cmd != P_AUTH_CHALLENGE) {
4363 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4364 cmdname(pi.cmd), pi.cmd);
4369 if (pi.size > CHALLENGE_LEN * 2) {
4370 conn_err(tconn, "expected AuthChallenge payload too big.\n");
4375 peers_ch = kmalloc(pi.size, GFP_NOIO);
4376 if (peers_ch == NULL) {
4377 conn_err(tconn, "kmalloc of peers_ch failed\n");
4382 err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4388 resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4389 response = kmalloc(resp_size, GFP_NOIO);
4390 if (response == NULL) {
4391 conn_err(tconn, "kmalloc of response failed\n");
4396 sg_init_table(&sg, 1);
4397 sg_set_buf(&sg, peers_ch, pi.size);
4399 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4401 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4406 if (!conn_prepare_command(tconn, sock)) {
4410 rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4411 response, resp_size);
4415 err = drbd_recv_header(tconn, &pi);
4421 if (pi.cmd != P_AUTH_RESPONSE) {
4422 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4423 cmdname(pi.cmd), pi.cmd);
4428 if (pi.size != resp_size) {
4429 conn_err(tconn, "AuthResponse payload has unexpected size\n");
4434 err = drbd_recv_all_warn(tconn, response, resp_size);
4440 right_response = kmalloc(resp_size, GFP_NOIO);
4441 if (right_response == NULL) {
4442 conn_err(tconn, "kmalloc of right_response failed\n");
4447 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4449 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4451 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4456 rv = !memcmp(response, right_response, resp_size);
4459 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4460 resp_size, tconn->net_conf->cram_hmac_alg);
4467 kfree(right_response);
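/* Data flow of the exchange above, reduced to one self-contained
 * sketch (illustrative; fake_hmac() is NOT cryptography, it merely
 * stands in for the kernel HMAC calls):
 *
 *   A -> B : challenge_a (random)
 *   B -> A : challenge_b (random)
 *   A -> B : hmac(secret, challenge_b)
 *   B -> A : hmac(secret, challenge_a)
 *
 * Each side checks the peer's response against a locally computed
 * value; the shared secret itself never crosses the wire. */
static void fake_hmac(const unsigned char *secret, unsigned int klen,
		      const unsigned char *msg, unsigned int mlen,
		      unsigned char *out)
{
	unsigned int i;

	for (i = 0; i < mlen; i++)
		out[i] = msg[i] ^ secret[i % klen];
}

static int auth_flow_model(const unsigned char *my_secret,
			   const unsigned char *peer_secret,
			   unsigned int klen)
{
	unsigned char my_ch[8] = "AAAAAAA";	/* would be random bytes */
	unsigned char peer_resp[8], expect[8];

	/* the peer answers our challenge with its secret ... */
	fake_hmac(peer_secret, klen, my_ch, 8, peer_resp);
	/* ... and we verify against our own computation */
	fake_hmac(my_secret, klen, my_ch, 8, expect);
	return memcmp(peer_resp, expect, 8) == 0;	/* 1: secrets match */
}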
int drbdd_init(struct drbd_thread *thi)
{
        struct drbd_tconn *tconn = thi->tconn;
        int h;

        conn_info(tconn, "receiver (re)started\n");

        do {
                h = drbd_connect(tconn);
                if (h == 0) {
                        drbd_disconnect(tconn);
                        schedule_timeout_interruptible(HZ);
                }
                if (h == -1) {
                        conn_warn(tconn, "Discarding network configuration.\n");
                        conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
                }
        } while (h == 0);

        if (h > 0) {
                if (get_net_conf(tconn)) {
                        drbdd(tconn);
                        put_net_conf(tconn);
                }
        }

        drbd_disconnect(tconn);

        conn_info(tconn, "receiver terminated\n");
        return 0;
}
/* ********* acknowledge sender ******** */
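/* The got_*() handlers below run in the asender thread and process packets
 * arriving on the meta data socket.  By convention they return 0 on success;
 * a negative value makes the dispatch loop in drbd_asender() tear down the
 * connection. */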
static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
{
        struct p_req_state_reply *p = pi->data;
        int retcode = be32_to_cpu(p->retcode);

        if (retcode >= SS_SUCCESS) {
                set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
        } else {
                set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
                conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
                         drbd_set_st_err_str(retcode), retcode);
        }
        wake_up(&tconn->ping_wait);

        return 0;
}
static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
{
        struct drbd_conf *mdev;
        struct p_req_state_reply *p = pi->data;
        int retcode = be32_to_cpu(p->retcode);

        mdev = vnr_to_mdev(tconn, pi->vnr);
        if (!mdev)
                return -EIO;

        if (retcode >= SS_SUCCESS) {
                set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
        } else {
                set_bit(CL_ST_CHG_FAIL, &mdev->flags);
                dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
                        drbd_set_st_err_str(retcode), retcode);
        }
        wake_up(&mdev->state_wait);

        return 0;
}
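/* Note the split: got_conn_RqSReply() answers connection-wide state change
 * requests via flags on the tconn, while got_RqSReply() answers per-volume
 * requests via flags on the mdev; each wakes its respective waiter. */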
static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
{
        return drbd_send_ping_ack(tconn);
}
static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
{
        /* restore idle timeout */
        tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
        if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
                wake_up(&tconn->ping_wait);

        return 0;
}
static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
{
        struct drbd_conf *mdev;
        struct p_block_ack *p = pi->data;
        sector_t sector = be64_to_cpu(p->sector);
        int blksize = be32_to_cpu(p->blksize);

        mdev = vnr_to_mdev(tconn, pi->vnr);
        if (!mdev)
                return -EIO;

        D_ASSERT(mdev->tconn->agreed_pro_version >= 89);

        update_peer_seq(mdev, be32_to_cpu(p->seq_num));

        if (get_ldev(mdev)) {
                drbd_rs_complete_io(mdev, sector);
                drbd_set_in_sync(mdev, sector, blksize);
                /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
                mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
                put_ldev(mdev);
        }
        dec_rs_pending(mdev);
        atomic_add(blksize >> 9, &mdev->rs_sect_in);

        return 0;
}
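/* Common helper for the ACK handlers below: look up the request by block id
 * and sector under req_lock, feed the event into the request state machine
 * via __req_mod(), and complete the master bio outside the lock if that
 * transition finished it. */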
static int
validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
                              struct rb_root *root, const char *func,
                              enum drbd_req_event what, bool missing_ok)
{
        struct drbd_request *req;
        struct bio_and_error m;

        spin_lock_irq(&mdev->tconn->req_lock);
        req = find_request(mdev, root, id, sector, missing_ok, func);
        if (unlikely(!req)) {
                spin_unlock_irq(&mdev->tconn->req_lock);
                return -EIO;
        }
        __req_mod(req, what, &m);
        spin_unlock_irq(&mdev->tconn->req_lock);

        if (m.bio)
                complete_master_bio(mdev, &m);
        return 0;
}
static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
{
        struct drbd_conf *mdev;
        struct p_block_ack *p = pi->data;
        sector_t sector = be64_to_cpu(p->sector);
        int blksize = be32_to_cpu(p->blksize);
        enum drbd_req_event what;

        mdev = vnr_to_mdev(tconn, pi->vnr);
        if (!mdev)
                return -EIO;

        update_peer_seq(mdev, be32_to_cpu(p->seq_num));

        if (p->block_id == ID_SYNCER) {
                drbd_set_in_sync(mdev, sector, blksize);
                dec_rs_pending(mdev);
                return 0;
        }
        switch (pi->cmd) {
        case P_RS_WRITE_ACK:
                what = WRITE_ACKED_BY_PEER_AND_SIS;
                break;
        case P_WRITE_ACK:
                what = WRITE_ACKED_BY_PEER;
                break;
        case P_RECV_ACK:
                what = RECV_ACKED_BY_PEER;
                break;
        case P_DISCARD_WRITE:
                what = DISCARD_WRITE;
                break;
        case P_RETRY_WRITE:
                what = POSTPONE_WRITE;
                break;
        default:
                BUG();
        }

        return validate_req_change_req_state(mdev, p->block_id, sector,
                                             &mdev->write_requests, __func__,
                                             what, false);
}
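/* Which ACK arrives here depends on the wire protocol in use: roughly,
 * P_RECV_ACK is sent on reception (protocol B), P_WRITE_ACK once the write
 * completed on the peer's disk (protocol C); P_RS_WRITE_ACK additionally
 * implies the block is now in sync. */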
static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
{
        struct drbd_conf *mdev;
        struct p_block_ack *p = pi->data;
        sector_t sector = be64_to_cpu(p->sector);
        int size = be32_to_cpu(p->blksize);
        int err;

        mdev = vnr_to_mdev(tconn, pi->vnr);
        if (!mdev)
                return -EIO;

        update_peer_seq(mdev, be32_to_cpu(p->seq_num));

        if (p->block_id == ID_SYNCER) {
                dec_rs_pending(mdev);
                drbd_rs_failed_io(mdev, sector, size);
                return 0;
        }

        err = validate_req_change_req_state(mdev, p->block_id, sector,
                                            &mdev->write_requests, __func__,
                                            NEG_ACKED, true);
        if (err) {
                /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
                   The master bio might already be completed, therefore the
                   request is no longer in the collision hash. */
                /* In Protocol B we might already have got a P_RECV_ACK
                   but then get a P_NEG_ACK afterwards. */
                drbd_set_out_of_sync(mdev, sector, size);
        }
        return 0;
}
static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
{
        struct drbd_conf *mdev;
        struct p_block_ack *p = pi->data;
        sector_t sector = be64_to_cpu(p->sector);

        mdev = vnr_to_mdev(tconn, pi->vnr);
        if (!mdev)
                return -EIO;

        update_peer_seq(mdev, be32_to_cpu(p->seq_num));

        dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
                (unsigned long long)sector, be32_to_cpu(p->blksize));

        return validate_req_change_req_state(mdev, p->block_id, sector,
                                             &mdev->read_requests, __func__,
                                             NEG_ACKED, false);
}
static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
{
        struct drbd_conf *mdev;
        sector_t sector;
        int size;
        struct p_block_ack *p = pi->data;

        mdev = vnr_to_mdev(tconn, pi->vnr);
        if (!mdev)
                return -EIO;

        sector = be64_to_cpu(p->sector);
        size = be32_to_cpu(p->blksize);

        update_peer_seq(mdev, be32_to_cpu(p->seq_num));

        dec_rs_pending(mdev);

        if (get_ldev_if_state(mdev, D_FAILED)) {
                drbd_rs_complete_io(mdev, sector);
                switch (pi->cmd) {
                case P_NEG_RS_DREPLY:
                        drbd_rs_failed_io(mdev, sector, size);
                        /* fall through */
                case P_RS_CANCEL:
                        break;
                default:
                        BUG();
                }
                put_ldev(mdev);
        }

        return 0;
}
static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
{
        struct drbd_conf *mdev;
        struct p_barrier_ack *p = pi->data;

        mdev = vnr_to_mdev(tconn, pi->vnr);
        if (!mdev)
                return -EIO;

        tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));

        if (mdev->state.conn == C_AHEAD &&
            atomic_read(&mdev->ap_in_flight) == 0 &&
            !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
                mdev->start_resync_timer.expires = jiffies + HZ;
                add_timer(&mdev->start_resync_timer);
        }

        return 0;
}
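/* A barrier ack confirms that the peer has processed a whole epoch, so the
 * corresponding requests can be released from the transfer log.  If we were
 * running ahead of the peer (C_AHEAD) and nothing is in flight any more,
 * this is the moment to arm the timer that starts the catch-up resync. */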
static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
{
        struct drbd_conf *mdev;
        struct p_block_ack *p = pi->data;
        struct drbd_work *w;
        sector_t sector;
        int size;

        mdev = vnr_to_mdev(tconn, pi->vnr);
        if (!mdev)
                return -EIO;

        sector = be64_to_cpu(p->sector);
        size = be32_to_cpu(p->blksize);

        update_peer_seq(mdev, be32_to_cpu(p->seq_num));

        if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
                drbd_ov_out_of_sync_found(mdev, sector, size);
        else
                ov_out_of_sync_print(mdev);

        if (!get_ldev(mdev))
                return 0;

        drbd_rs_complete_io(mdev, sector);
        dec_rs_pending(mdev);

        --mdev->ov_left;

        /* let's advance progress step marks only for every other megabyte */
        if ((mdev->ov_left & 0x200) == 0x200)
                drbd_advance_rs_marks(mdev, mdev->ov_left);

        if (mdev->ov_left == 0) {
                w = kmalloc(sizeof(*w), GFP_NOIO);
                if (w) {
                        w->cb = w_ov_finished;
                        w->mdev = mdev;
                        drbd_queue_work_front(&mdev->tconn->data.work, w);
                } else {
                        dev_err(DEV, "kmalloc(w) failed.\n");
                        ov_out_of_sync_print(mdev);
                        drbd_resync_finished(mdev);
                }
        }
        put_ldev(mdev);
        return 0;
}
static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
{
        return 0;
}
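/* Drain the done_ee lists of all volumes: keep sending the pending ACKs
 * until no volume has completed peer requests left.  SIGNAL_ASENDER is
 * cleared (and pending signals flushed) while we send, so a concurrent
 * wake-up does not interrupt the sends themselves. */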
static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
{
        struct drbd_conf *mdev;
        int i, not_empty = 0;

        do {
                clear_bit(SIGNAL_ASENDER, &tconn->flags);
                flush_signals(current);
                down_read(&drbd_cfg_rwsem);
                idr_for_each_entry(&tconn->volumes, mdev, i) {
                        if (drbd_finish_peer_reqs(mdev)) {
                                up_read(&drbd_cfg_rwsem);
                                return 1; /* error */
                        }
                }
                up_read(&drbd_cfg_rwsem);
                set_bit(SIGNAL_ASENDER, &tconn->flags);

                spin_lock_irq(&tconn->req_lock);
                idr_for_each_entry(&tconn->volumes, mdev, i) {
                        not_empty = !list_empty(&mdev->done_ee);
                        if (not_empty)
                                break;
                }
                spin_unlock_irq(&tconn->req_lock);
        } while (not_empty);

        return 0;
}
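/* Dispatch table for the meta data socket, indexed by packet type: expected
 * payload size plus handler.  Entries with size 0 (ping, ping ack) carry a
 * bare header. */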
struct asender_cmd {
        size_t pkt_size;
        int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
};
static struct asender_cmd asender_tbl[] = {
        [P_PING]            = { 0, got_Ping },
        [P_PING_ACK]        = { 0, got_PingAck },
        [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
        [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
        [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
        [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
        [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
        [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
        [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
        [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
        [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
        [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
        [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
        [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
        [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
        [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
        [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
};
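/* The asender loop alternates between flushing pending ACKs and receiving on
 * the meta data socket.  Liveness checking is piggybacked on the receive
 * timeout: when ping-int expires without traffic we send P_PING, shorten the
 * timeout to ping-timeo, and drop the connection if the P_PING_ACK does not
 * arrive before that expires either. */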
int drbd_asender(struct drbd_thread *thi)
{
        struct drbd_tconn *tconn = thi->tconn;
        struct asender_cmd *cmd = NULL;
        struct packet_info pi;
        int rv;
        void *buf    = tconn->meta.rbuf;
        int received = 0;
        unsigned int header_size = drbd_header_size(tconn);
        int expect   = header_size;
        int ping_timeout_active = 0;

        current->policy = SCHED_RR;  /* Make this a realtime task! */
        current->rt_priority = 2;    /* more important than all other tasks */
        while (get_t_state(thi) == RUNNING) {
                drbd_thread_current_set_cpu(thi);
                if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
                        if (drbd_send_ping(tconn)) {
                                conn_err(tconn, "drbd_send_ping has failed\n");
                                goto reconnect;
                        }
                        tconn->meta.socket->sk->sk_rcvtimeo =
                                tconn->net_conf->ping_timeo*HZ/10;
                        ping_timeout_active = 1;
                }

                /* TODO: conditionally cork; it may hurt latency if we cork without
                   much to send */
                if (!tconn->net_conf->no_cork)
                        drbd_tcp_cork(tconn->meta.socket);
                if (tconn_finish_peer_reqs(tconn)) {
                        conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
                        goto reconnect;
                }
                /* but unconditionally uncork unless disabled */
                if (!tconn->net_conf->no_cork)
                        drbd_tcp_uncork(tconn->meta.socket);
                /* short circuit, recv_msg would return EINTR anyways. */
                if (signal_pending(current))
                        continue;

                rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
                clear_bit(SIGNAL_ASENDER, &tconn->flags);

                flush_signals(current);

                /* Note:
                 * -EINTR        (on meta) we got a signal
                 * -EAGAIN       (on meta) rcvtimeo expired
                 * -ECONNRESET   other side closed the connection
                 * -ERESTARTSYS  (on data) we got a signal
                 * rv <  0       other than above: unexpected error!
                 * rv == expected: full header or command
                 * rv <  expected: "woken" by signal during receive
                 * rv == 0       : "connection shut down by peer"
                 */
                if (likely(rv > 0)) {
                        received += rv;
                        buf      += rv;
                } else if (rv == 0) {
                        conn_err(tconn, "meta connection shut down by peer.\n");
                        goto reconnect;
                } else if (rv == -EAGAIN) {
                        /* If the data socket received something meanwhile,
                         * that is good enough: peer is still alive. */
                        if (time_after(tconn->last_received,
                                jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
                                continue;
                        if (ping_timeout_active) {
                                conn_err(tconn, "PingAck did not arrive in time.\n");
                                goto reconnect;
                        }
                        set_bit(SEND_PING, &tconn->flags);
                        continue;
                } else if (rv == -EINTR) {
                        continue;
                } else {
                        conn_err(tconn, "sock_recvmsg returned %d\n", rv);
                        goto reconnect;
                }
                if (received == expect && cmd == NULL) {
                        if (decode_header(tconn, tconn->meta.rbuf, &pi))
                                goto reconnect;
                        /* check bounds before indexing into the table */
                        if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !asender_tbl[pi.cmd].fn) {
                                conn_err(tconn, "unknown command %d on meta (l: %d)\n",
                                         pi.cmd, pi.size);
                                goto disconnect;
                        }
                        cmd = &asender_tbl[pi.cmd];
                        expect = header_size + cmd->pkt_size;
                        if (pi.size != expect - header_size) {
                                conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
                                         pi.cmd, pi.size);
                                goto reconnect;
                        }
                }
                if (received == expect) {
                        int err;

                        err = cmd->fn(tconn, &pi);
                        if (err) {
                                conn_err(tconn, "%pf failed\n", cmd->fn);
                                goto reconnect;
                        }

                        tconn->last_received = jiffies;

                        /* the idle_timeout (ping-int)
                         * has been restored in got_PingAck() */
                        if (cmd == &asender_tbl[P_PING_ACK])
                                ping_timeout_active = 0;

                        buf      = tconn->meta.rbuf;
                        received = 0;
                        expect   = header_size;
                        cmd      = NULL;
                }
        }
        if (0) {
reconnect:
                conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
        }
        if (0) {
disconnect:
                conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
        }
        clear_bit(SIGNAL_ASENDER, &tconn->flags);

        conn_info(tconn, "asender terminated\n");

        return 0;
}