This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

drbd is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2, or (at your option)
any later version.

drbd is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with drbd; see the file COPYING.  If not, write to
the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
#include <linux/module.h>

#include <asm/uaccess.h>

#include <linux/drbd.h>

#include <linux/file.h>

#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
static int drbd_do_features(struct drbd_tconn *tconn);
static int drbd_do_auth(struct drbd_tconn *tconn);
static int drbd_disconnected(int vnr, void *p, void *data);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);

#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
/*
 * some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */
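/*
 * For context, a minimal sketch of the chain accessors this block relies
 * on (the real definitions live in drbd_int.h; details such as prefetch
 * hints are omitted here and should be treated as assumptions):
 *
 *	#define page_chain_next(page) \
 *		((struct page *)page_private(page))
 *	#define page_chain_for_each(page) \
 *		for (; page; page = page_chain_next(page))
 *	#define page_chain_for_each_safe(page, n) \
 *		for (; page && ({ n = page_chain_next(page); 1; }); page = n)
 */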
/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
	struct page *page;
	struct page *tmp;

	BUG_ON(!n);
	BUG_ON(!head);

	page = *head;

	if (!page)
		return NULL;

	while (page) {
		tmp = page_chain_next(page);
		if (--n == 0)
			break; /* found sufficient pages */
		if (tmp == NULL)
			/* insufficient pages, don't use any of them. */
			return NULL;
		page = tmp;
	}

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */
	page = *head;
	*head = tmp;
	return page;
}
/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *tmp;
	int i = 1;
	while ((tmp = page_chain_next(page)))
		++i, page = tmp;
	if (len)
		*len = i;
	return page;
}
static int page_chain_free(struct page *page)
{
	struct page *tmp;
	int i = 0;
	page_chain_for_each_safe(page, tmp) {
		put_page(page);
		++i;
	}
	return i;
}
static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
{
#if 1
	struct page *tmp;
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);
#endif

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
	*head = chain_first;
}
static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
				       unsigned int number)
{
	struct page *page = NULL;
	struct page *tmp = NULL;
	unsigned int i = 0;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);
		if (page)
			return page;
	}

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		set_page_private(tmp, (unsigned long)page);
		page = tmp;
	}

	if (i == number)
		return page;

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_alloc_pages will retry this
	 * function "soon". */
	if (page) {
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	return NULL;
}
static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
					   struct list_head *to_be_freed)
{
	struct drbd_peer_request *peer_req;
	struct list_head *le, *tle;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first one that has not
	   finished, we can stop examining the list... */

	list_for_each_safe(le, tle, &mdev->net_ee) {
		peer_req = list_entry(le, struct drbd_peer_request, w.list);
		if (drbd_peer_req_has_active_page(peer_req))
			break;
		list_move(le, to_be_freed);
	}
}
static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
{
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_finished_net_peer_reqs(mdev, &reclaimed);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(mdev, peer_req);
}
/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @mdev:	DRBD device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
			      bool retry)
{
	struct page *page = NULL;
	DEFINE_WAIT(wait);

	/* Yes, we may run up to @number over max_buffers. If we
	 * follow it strictly, the admin will get it wrong anyways. */
	if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
		page = __drbd_alloc_pages(mdev, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(mdev);

		if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
			page = __drbd_alloc_pages(mdev, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
			break;
		}

		schedule();
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &mdev->pp_in_use);
	return page;
}
/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
{
	atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
	int i;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
		i = page_chain_free(page);
	else {
		struct page *tmp;
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	i = atomic_sub_return(i, a);
	if (i < 0)
		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
}
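/*
 * Illustrative usage sketch, not code from this file: how a caller could
 * pair drbd_alloc_pages() with drbd_free_pages().  The chain walk below
 * mirrors what page_chain_for_each() does; kmap() is needed because the
 * pool pages may come from highmem (GFP_TRY includes __GFP_HIGHMEM).
 *
 *	struct page *chain = drbd_alloc_pages(mdev, nr_pages, true);
 *	struct page *p;
 *	void *d;
 *
 *	if (chain) {
 *		for (p = chain; p; p = (struct page *)page_private(p)) {
 *			d = kmap(p);
 *			memset(d, 0, PAGE_SIZE);
 *			kunmap(p);
 *		}
 *		drbd_free_pages(mdev, chain, 0);
 *	}
 *
 * The pp_in_use accounting happens inside those two functions; callers
 * never touch it directly.
 */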
/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_finish_peer_reqs()
 drbd_wait_ee_list_empty()
*/
struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
		    unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
{
	struct drbd_peer_request *peer_req;
	struct page *page;
	unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;

	if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
		return NULL;

	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
	if (!peer_req) {
		if (!(gfp_mask & __GFP_NOWARN))
			dev_err(DEV, "%s: allocation failed\n", __func__);
		return NULL;
	}

	page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
	if (!page)
		goto fail;

	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = data_size;
	peer_req->i.sector = sector;
	peer_req->i.local = false;
	peer_req->i.waiting = false;

	peer_req->epoch = NULL;
	peer_req->w.mdev = mdev;
	peer_req->pages = page;
	atomic_set(&peer_req->pending_bios, 0);
	peer_req->flags = 0;
	/*
	 * The block_id is opaque to the receiver.  It is not endianness
	 * converted, and sent back to the sender unchanged.
	 */
	peer_req->block_id = id;

	return peer_req;

 fail:
	mempool_free(peer_req, drbd_ee_mempool);
	return NULL;
}
void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
			  int is_net)
{
	if (peer_req->flags & EE_HAS_DIGEST)
		kfree(peer_req->digest);
	drbd_free_pages(mdev, peer_req->pages, is_net);
	D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
	D_ASSERT(drbd_interval_empty(&peer_req->i));
	mempool_free(peer_req, drbd_ee_mempool);
}
int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
{
	LIST_HEAD(work_list);
	struct drbd_peer_request *peer_req, *t;
	int count = 0;
	int is_net = list == &mdev->net_ee;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		__drbd_free_peer_req(mdev, peer_req, is_net);
		count++;
	}
	return count;
}
/*
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
{
	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;
	int err = 0;

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_finished_net_peer_reqs(mdev, &reclaimed);
	list_splice_init(&mdev->done_ee, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(mdev, peer_req);

	/* possible callbacks here:
	 * e_end_block, and e_end_resync_block, e_send_discard_write.
	 * all ignore the last argument.
	 */
	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		int err2;

		/* list_del not necessary, next/prev members not touched */
		err2 = peer_req->w.cb(&peer_req->w, !!err);
		if (!err)
			err = err2;
		drbd_free_peer_req(mdev, peer_req);
	}
	wake_up(&mdev->ee_wait);

	return err;
}
static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
				     struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&mdev->tconn->req_lock);
		io_schedule();
		finish_wait(&mdev->ee_wait, &wait);
		spin_lock_irq(&mdev->tconn->req_lock);
	}
}

static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
				    struct list_head *head)
{
	spin_lock_irq(&mdev->tconn->req_lock);
	_drbd_wait_ee_list_empty(mdev, head);
	spin_unlock_irq(&mdev->tconn->req_lock);
}
/* see also kernel_accept, which only exists since 2.6.18.
 * We also want to log exactly which part of it failed. */
static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
{
	struct sock *sk = sock->sk;
	int err = 0;

	*what = "listen";
	err = sock->ops->listen(sock, 5);
	if (err < 0)
		goto out;

	*what = "sock_create_lite";
	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
			       newsock);
	if (err < 0)
		goto out;

	*what = "accept";
	err = sock->ops->accept(sock, *newsock, 0);
	if (err < 0) {
		sock_release(*newsock);
		*newsock = NULL;
		goto out;
	}
	(*newsock)->ops = sock->ops;

out:
	return err;
}
static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
	mm_segment_t oldfs;
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_iovlen = 1,
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
	};
	int rv;

	oldfs = get_fs();
	set_fs(KERNEL_DS);
	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
	set_fs(oldfs);

	return rv;
}
static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
{
	mm_segment_t oldfs;
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_iovlen = 1,
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
	};
	int rv;

	oldfs = get_fs();
	set_fs(KERNEL_DS);

	for (;;) {
		rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
		if (rv == size)
			break;

		/* Note:
		 * ECONNRESET	other side closed the connection
		 * ERESTARTSYS	(on sock) we got a signal
		 */
		if (rv < 0) {
			if (rv == -ECONNRESET)
				conn_info(tconn, "sock was reset by peer\n");
			else if (rv != -ERESTARTSYS)
				conn_err(tconn, "sock_recvmsg returned %d\n", rv);
			break;
		} else if (rv == 0) {
			conn_info(tconn, "sock was shut down by peer\n");
			break;
		} else {
			/* signal came in, or peer/link went down,
			 * after we read a partial message
			 */
			/* D_ASSERT(signal_pending(current)); */
			break;
		}
	}

	set_fs(oldfs);

	if (rv != size)
		conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);

	return rv;
}
static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
{
	int err;

	err = drbd_recv(tconn, buf, size);
	if (err != size) {
		if (err >= 0)
			err = -EIO;
	} else
		err = 0;
	return err;
}

static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
{
	int err;

	err = drbd_recv_all(tconn, buf, size);
	if (err && !signal_pending(current))
		conn_warn(tconn, "short read (expected size %d)\n", (int)size);
	return err;
}
/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
			    unsigned int rcv)
{
	/* open coded SO_SNDBUF, SO_RCVBUF */
	if (snd) {
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
	}
	if (rcv) {
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
	}
}
static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	int err;
	int disconnect_on_error = 1;

	if (!get_net_conf(tconn))
		return NULL;

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = tconn->net_conf->try_connect_int*HZ;
	drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
			tconn->net_conf->rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	memcpy(&src_in6, tconn->net_conf->my_addr,
	       min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
	if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	what = "bind before connect";
	err = sock->ops->bind(sock,
			      (struct sockaddr *) &src_in6,
			      tconn->net_conf->my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock,
				 (struct sockaddr *)tconn->net_conf->peer_addr,
				 tconn->net_conf->peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN: case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			conn_err(tconn, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
	}
	put_net_conf(tconn);
	return sock;
}
static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
{
	int timeo, err;
	struct socket *s_estab = NULL, *s_listen;
	const char *what;

	if (!get_net_conf(tconn))
		return NULL;

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
	if (err) {
		s_listen = NULL;
		goto out;
	}

	timeo = tconn->net_conf->try_connect_int * HZ;
	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */

	s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
	s_listen->sk->sk_rcvtimeo = timeo;
	s_listen->sk->sk_sndtimeo = timeo;
	drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
			tconn->net_conf->rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen,
				  (struct sockaddr *) tconn->net_conf->my_addr,
				  tconn->net_conf->my_addr_len);
	if (err < 0)
		goto out;

	err = drbd_accept(&what, s_listen, &s_estab);

out:
	if (s_listen)
		sock_release(s_listen);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			conn_err(tconn, "%s failed, err = %d\n", what, err);
			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}
	put_net_conf(tconn);

	return s_estab;
}
static int decode_header(struct drbd_tconn *, void *, struct packet_info *);

static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
			     enum drbd_packet cmd)
{
	if (!conn_prepare_command(tconn, sock))
		return -EIO;
	return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
}

static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
{
	unsigned int header_size = drbd_header_size(tconn);
	struct packet_info pi;
	int err;

	err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
	if (err != header_size) {
		if (err >= 0)
			err = -EIO;
		return err;
	}
	err = decode_header(tconn, tconn->data.rbuf, &pi);
	if (err)
		return err;
	return pi.cmd;
}
/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:	pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
{
	int rr;
	char tb[4];

	if (!*sock)
		return false;

	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {
		return true;
	} else {
		sock_release(*sock);
		*sock = NULL;
		return false;
	}
}
/* Gets called if a connection is established, or if a new minor gets created
   in a connection */
int drbd_connected(int vnr, void *p, void *data)
{
	struct drbd_conf *mdev = (struct drbd_conf *)p;
	int err;

	atomic_set(&mdev->packet_seq, 0);
	mdev->peer_seq = 0;

	mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
		&mdev->tconn->cstate_mutex :
		&mdev->own_state_mutex;

	err = drbd_send_sync_param(mdev);
	if (!err)
		err = drbd_send_sizes(mdev, 0, 0);
	if (!err)
		err = drbd_send_uuids(mdev);
	if (!err)
		err = drbd_send_state(mdev);
	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
	clear_bit(RESIZE_PENDING, &mdev->flags);
	mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
	return err;
}
/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int drbd_connect(struct drbd_tconn *tconn)
{
	struct socket *sock, *msock;
	int try, h, ok;

	if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
		return -2;

	clear_bit(DISCARD_CONCURRENT, &tconn->flags);

	/* Assume that the peer only understands protocol 80 until we know better. */
	tconn->agreed_pro_version = 80;

	do {
		struct socket *s;

		for (try = 0;;) {
			/* 3 tries, this should take less than a second! */
			s = drbd_try_connect(tconn);
			if (s || ++try >= 3)
				break;
			/* give the other side time to call bind() & listen() */
			schedule_timeout_interruptible(HZ / 10);
		}

		if (s) {
			if (!tconn->data.socket) {
				tconn->data.socket = s;
				send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
			} else if (!tconn->meta.socket) {
				tconn->meta.socket = s;
				send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
			} else {
				conn_err(tconn, "Logic error in drbd_connect()\n");
				goto out_release_sockets;
			}
		}

		if (tconn->data.socket && tconn->meta.socket) {
			schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
			ok = drbd_socket_okay(&tconn->data.socket);
			ok = drbd_socket_okay(&tconn->meta.socket) && ok;
			if (ok)
				break;
		}

retry:
		s = drbd_wait_for_connect(tconn);
		if (s) {
			try = receive_first_packet(tconn, s);
			drbd_socket_okay(&tconn->data.socket);
			drbd_socket_okay(&tconn->meta.socket);
			switch (try) {
			case P_INITIAL_DATA:
				if (tconn->data.socket) {
					conn_warn(tconn, "initial packet S crossed\n");
					sock_release(tconn->data.socket);
				}
				tconn->data.socket = s;
				break;
			case P_INITIAL_META:
				if (tconn->meta.socket) {
					conn_warn(tconn, "initial packet M crossed\n");
					sock_release(tconn->meta.socket);
				}
				tconn->meta.socket = s;
				set_bit(DISCARD_CONCURRENT, &tconn->flags);
				break;
			default:
				conn_warn(tconn, "Error receiving initial packet\n");
				sock_release(s);
				if (random32() & 1)
					goto retry;
			}
		}

		if (tconn->cstate <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			smp_rmb();
			if (get_t_state(&tconn->receiver) == EXITING)
				goto out_release_sockets;
		}
		if (tconn->data.socket && tconn->meta.socket) {
			ok = drbd_socket_okay(&tconn->data.socket);
			ok = drbd_socket_okay(&tconn->meta.socket) && ok;
			if (ok)
				break;
		}
	} while (1);
	sock = tconn->data.socket;
	msock = tconn->meta.socket;

	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */

	sock->sk->sk_allocation = GFP_NOIO;
	msock->sk->sk_allocation = GFP_NOIO;

	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;

	/* NOT YET ...
	 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_CONNECTION_FEATURES timeout,
	 * which we set to 4x the configured ping_timeout. */
	sock->sk->sk_sndtimeo =
	sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;

	msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock);
	drbd_tcp_nodelay(msock);

	tconn->last_received = jiffies;

	h = drbd_do_features(tconn);
	if (h <= 0)
		return h;

	if (tconn->cram_hmac_tfm) {
		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
		switch (drbd_do_auth(tconn)) {
		case -1:
			conn_err(tconn, "Authentication of peer failed\n");
			return -1;
		case 0:
			conn_err(tconn, "Authentication of peer failed, trying again.\n");
			return 0;
		}
	}

	if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
		return 0;

	sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	drbd_thread_start(&tconn->asender);

	if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
		return -1;

	down_read(&drbd_cfg_rwsem);
	h = !idr_for_each(&tconn->volumes, drbd_connected, tconn);
	up_read(&drbd_cfg_rwsem);
	return h;

out_release_sockets:
	if (tconn->data.socket) {
		sock_release(tconn->data.socket);
		tconn->data.socket = NULL;
	}
	if (tconn->meta.socket) {
		sock_release(tconn->meta.socket);
		tconn->meta.socket = NULL;
	}
	return -1;
}
static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
{
	unsigned int header_size = drbd_header_size(tconn);

	if (header_size == sizeof(struct p_header100) &&
	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
		struct p_header100 *h = header;
		if (h->pad != 0) {
			conn_err(tconn, "Header padding is not zero\n");
			return -EINVAL;
		}
		pi->vnr = be16_to_cpu(h->volume);
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
	} else if (header_size == sizeof(struct p_header95) &&
		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
		struct p_header95 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
		pi->vnr = 0;
	} else if (header_size == sizeof(struct p_header80) &&
		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
		struct p_header80 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be16_to_cpu(h->length);
		pi->vnr = 0;
	} else {
		conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
			 be32_to_cpu(*(__be32 *)header),
			 tconn->agreed_pro_version);
		return -EINVAL;
	}
	pi->data = header + header_size;
	return 0;
}
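/*
 * For reference, a sketch of the three on-wire header layouts decoded
 * above.  Field names follow the accessors used in decode_header(); the
 * authoritative definitions live in the protocol headers, so exact types
 * and packing are assumptions here:
 *
 *	struct p_header80 {
 *		u32 magic;	// DRBD_MAGIC
 *		u16 command;
 *		u16 length;	// bytes of data after this header
 *	};
 *
 *	struct p_header95 {
 *		u16 magic;	// DRBD_MAGIC_BIG
 *		u16 command;
 *		u32 length;
 *	};
 *
 *	struct p_header100 {
 *		u32 magic;	// DRBD_MAGIC_100
 *		u16 volume;
 *		u16 command;
 *		u32 length;
 *		u32 pad;	// must be zero, as checked above
 *	};
 */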
static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
{
	void *buffer = tconn->data.rbuf;
	int err;

	err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
	if (err)
		return err;

	err = decode_header(tconn, buffer, pi);
	tconn->last_received = jiffies;

	return err;
}
static void drbd_flush(struct drbd_conf *mdev)
{
	int rv;

	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
					NULL);
		if (rv) {
			dev_err(DEV, "local disk flush failed with status %d\n", rv);
			/* would rather check on EOPNOTSUPP, but that is not reliable.
			 * don't try again for ANY return value != 0
			 * if (rv == -EOPNOTSUPP) */
			drbd_bump_write_ordering(mdev, WO_drain_io);
		}
		put_ldev(mdev);
	}
}
/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @mdev:	DRBD device.
 * @epoch:	Epoch object.
 * @ev:		Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
	int epoch_size;
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&mdev->epoch_lock);
	do {
		next_epoch = NULL;

		epoch_size = atomic_read(&epoch->epoch_size);

		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do */
			break;
		}

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
			if (!(ev & EV_CLEANUP)) {
				spin_unlock(&mdev->epoch_lock);
				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
				spin_lock(&mdev->epoch_lock);
			}
			dec_unacked(mdev);

			if (mdev->current_epoch != epoch) {
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				mdev->epochs--;
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
				wake_up(&mdev->ee_wait);
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&mdev->epoch_lock);

	return rv;
}
/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @mdev:	DRBD device.
 * @wo:		Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
{
	enum write_ordering_e pwo;
	static char *write_ordering_str[] = {
		[WO_none] = "none",
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",
	};

	pwo = mdev->write_ordering;
	wo = min(pwo, wo);
	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
		wo = WO_drain_io;
	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
		wo = WO_none;
	mdev->write_ordering = wo;
	if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
}
/**
 * drbd_submit_peer_request()
 * @mdev:	DRBD device.
 * @peer_req:	peer request
 * @rw:		flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_conf *mdev,
			     struct drbd_peer_request *peer_req,
			     const unsigned rw, const int fault_type)
{
	struct bio *bios = NULL;
	struct bio *bio;
	struct page *page = peer_req->pages;
	sector_t sector = peer_req->i.sector;
	unsigned ds = peer_req->i.size;
	unsigned n_bios = 0;
	unsigned nr_pages = (ds + PAGE_SIZE - 1) >> PAGE_SHIFT;
	int err = -ENOMEM;

	/* In most cases, we will only need one bio.  But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio.
	 *
	 * Plain bio_alloc is good enough here, this is no DRBD internally
	 * generated bio, but a bio allocated on behalf of the peer.
	 */
next_bio:
	bio = bio_alloc(GFP_NOIO, nr_pages);
	if (!bio) {
		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
		goto fail;
	}
	/* > peer_req->i.sector, unless this is the first bio */
	bio->bi_sector = sector;
	bio->bi_bdev = mdev->ldev->backing_bdev;
	bio->bi_rw = rw;
	bio->bi_private = peer_req;
	bio->bi_end_io = drbd_peer_request_endio;

	bio->bi_next = bios;
	bios = bio;
	++n_bios;

	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
			/* A single page must always be possible!
			 * But in case it fails anyways,
			 * we deal with it, and complain (below). */
			if (bio->bi_vcnt == 0) {
				dev_err(DEV,
					"bio_add_page failed for len=%u, "
					"bi_vcnt=0 (bi_sector=%llu)\n",
					len, (unsigned long long)bio->bi_sector);
				err = -ENOSPC;
				goto fail;
			}
			goto next_bio;
		}
		ds -= len;
		sector += len >> 9;
		--nr_pages;
	}
	D_ASSERT(page == NULL);
	D_ASSERT(ds == 0);

	atomic_set(&peer_req->pending_bios, n_bios);
	do {
		bio = bios;
		bios = bios->bi_next;
		bio->bi_next = NULL;

		drbd_generic_make_request(mdev, fault_type, bio);
	} while (bios);
	return 0;

fail:
	while (bios) {
		bio = bios;
		bios = bios->bi_next;
		bio_put(bio);
	}
	return err;
}
static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
					     struct drbd_peer_request *peer_req)
{
	struct drbd_interval *i = &peer_req->i;

	drbd_remove_interval(&mdev->write_requests, i);
	drbd_clear_interval(i);

	/* Wake up any processes waiting for this peer request to complete. */
	if (i->waiting)
		wake_up(&mdev->misc_wait);
}
static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	int rv;
	struct p_barrier *p = pi->data;
	struct drbd_epoch *epoch;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return -EIO;

	inc_unacked(mdev);

	mdev->current_epoch->barrier_nr = p->barrier;
	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (mdev->write_ordering) {
	case WO_none:
		if (rv == FE_RECYCLED)
			return 0;

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		if (epoch)
			break;
		else
			dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
			/* Fall through */

	case WO_bdev_flush:
	case WO_drain_io:
		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
		drbd_flush(mdev);

		if (atomic_read(&mdev->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			if (epoch)
				break;
		}

		epoch = mdev->current_epoch;
		wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);

		D_ASSERT(atomic_read(&epoch->active) == 0);
		D_ASSERT(epoch->flags == 0);

		return 0;
	default:
		dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
		return -EIO;
	}

	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&mdev->epoch_lock);
	if (atomic_read(&mdev->current_epoch->epoch_size)) {
		list_add(&epoch->list, &mdev->current_epoch->list);
		mdev->current_epoch = epoch;
		mdev->epochs++;
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
	spin_unlock(&mdev->epoch_lock);

	return 0;
}
/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_peer_request *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
	      int data_size) __must_hold(local)
{
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_peer_request *peer_req;
	struct page *page;
	int dgs, ds, err;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;
	unsigned long *data;

	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;

	if (dgs) {
		/*
		 * FIXME: Receive the incoming digest into the receive buffer
		 *	  here, together with its struct p_data?
		 */
		err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
		if (err)
			return NULL;
	}

	data_size -= dgs;

	if (!expect(data_size != 0))
		return NULL;
	if (!expect(IS_ALIGNED(data_size, 512)))
		return NULL;
	if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
		return NULL;

	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		dev_err(DEV, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
	if (!peer_req)
		return NULL;

	ds = data_size;
	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		data = kmap(page);
		err = drbd_recv_all_warn(mdev->tconn, data, len);
		if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}
		kunmap(page);
		if (err) {
			drbd_free_peer_req(mdev, peer_req);
			return NULL;
		}
		ds -= len;
	}

	if (dgs) {
		drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_free_peer_req(mdev, peer_req);
			return NULL;
		}
	}
	mdev->recv_cnt += data_size>>9;
	return peer_req;
}
/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 */
static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
{
	struct page *page;
	int err = 0;
	void *data;

	if (!data_size)
		return 0;

	page = drbd_alloc_pages(mdev, 1, 1);

	data = kmap(page);
	while (data_size) {
		unsigned int len = min_t(int, data_size, PAGE_SIZE);

		err = drbd_recv_all_warn(mdev->tconn, data, len);
		if (err)
			break;
		data_size -= len;
	}
	kunmap(page);
	drbd_free_pages(mdev, page, 0);
	return err;
}
static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
			   sector_t sector, int data_size)
{
	struct bio_vec *bvec;
	struct bio *bio;
	int dgs, err, i, expect;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;

	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;

	if (dgs) {
		err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
		if (err)
			return err;
		data_size -= dgs;
	}

	/* optimistically update recv_cnt.  if receiving fails below,
	 * we disconnect anyways, and counters will be reset. */
	mdev->recv_cnt += data_size>>9;

	bio = req->master_bio;
	D_ASSERT(sector == bio->bi_sector);

	bio_for_each_segment(bvec, bio, i) {
		void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
		expect = min_t(int, data_size, bvec->bv_len);
		err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
		kunmap(bvec->bv_page);
		if (err)
			return err;
		data_size -= expect;
	}

	if (dgs) {
		drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
			return -EINVAL;
		}
	}

	D_ASSERT(data_size == 0);
	return 0;
}
/*
 * e_end_resync_block() is called in asender context via
 * drbd_finish_peer_reqs().
 */
static int e_end_resync_block(struct drbd_work *w, int unused)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_conf *mdev = w->mdev;
	sector_t sector = peer_req->i.sector;
	int err;

	D_ASSERT(drbd_interval_empty(&peer_req->i));

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		drbd_set_in_sync(mdev, sector, peer_req->i.size);
		err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
	} else {
		/* Record failure to sync */
		drbd_rs_failed_io(mdev, sector, peer_req->i.size);

		err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
	}
	dec_unacked(mdev);

	return err;
}
static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
{
	struct drbd_peer_request *peer_req;

	peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
	if (!peer_req)
		goto fail;

	dec_rs_pending(mdev);

	inc_unacked(mdev);
	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */

	peer_req->w.cb = e_end_resync_block;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_add(&peer_req->w.list, &mdev->sync_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
		return 0;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	drbd_free_peer_req(mdev, peer_req);
fail:
	put_ldev(mdev);
	return -EIO;
}
static struct drbd_request *
find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
	     sector_t sector, bool missing_ok, const char *func)
{
	struct drbd_request *req;

	/* Request object according to our peer */
	req = (struct drbd_request *)(unsigned long)id;
	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
		return req;
	if (!missing_ok) {
		dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
			(unsigned long)id, (unsigned long long)sector);
	}
	return NULL;
}
static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	struct drbd_request *req;
	sector_t sector;
	int err;
	struct p_data *p = pi->data;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return -EIO;

	sector = be64_to_cpu(p->sector);

	spin_lock_irq(&mdev->tconn->req_lock);
	req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
	spin_unlock_irq(&mdev->tconn->req_lock);
	if (unlikely(!req))
		return -EIO;

	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
	 * special casing it there for the various failure cases.
	 * still no race with drbd_fail_pending_reads */
	err = recv_dless_read(mdev, req, sector, pi->size);
	if (!err)
		req_mod(req, DATA_RECEIVED);
	/* else: nothing. handled from drbd_disconnect...
	 * I don't think we may complete this just yet
	 * in case we are "on-disconnect: freeze" */

	return err;
}
static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	sector_t sector;
	int err;
	struct p_data *p = pi->data;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return -EIO;

	sector = be64_to_cpu(p->sector);
	D_ASSERT(p->block_id == ID_SYNCER);

	if (get_ldev(mdev)) {
		/* data is submitted to disk within recv_resync_read.
		 * corresponding put_ldev done below on error,
		 * or in drbd_peer_request_endio. */
		err = recv_resync_read(mdev, sector, pi->size);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not write resync data to local disk.\n");

		err = drbd_drain_block(mdev, pi->size);

		drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
	}

	atomic_add(pi->size >> 9, &mdev->rs_sect_in);

	return err;
}
static int w_restart_write(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_conf *mdev = w->mdev;
	struct bio *bio;
	unsigned long start_time;
	unsigned long flags;

	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
	if (!expect(req->rq_state & RQ_POSTPONED)) {
		spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
		return -EIO;
	}
	bio = req->master_bio;
	start_time = req->start_time;
	/* Postponed requests will not have their master_bio completed! */
	__req_mod(req, DISCARD_WRITE, NULL);
	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);

	while (__drbd_make_request(mdev, bio, start_time))
		/* retry */ ;
	return 0;
}
static void restart_conflicting_writes(struct drbd_conf *mdev,
				       sector_t sector, int size)
{
	struct drbd_interval *i;
	struct drbd_request *req;

	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		if (req->rq_state & RQ_LOCAL_PENDING ||
		    !(req->rq_state & RQ_POSTPONED))
			continue;
		if (expect(list_empty(&req->w.list))) {
			req->w.mdev = mdev;
			req->w.cb = w_restart_write;
			drbd_queue_work(&mdev->tconn->data.work, &req->w);
		}
	}
}
/*
 * e_end_block() is called in asender context via drbd_finish_peer_reqs().
 */
static int e_end_block(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_conf *mdev = w->mdev;
	sector_t sector = peer_req->i.sector;
	int err = 0, pcmd;

	if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
				mdev->state.conn <= C_PAUSED_SYNC_T &&
				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
				P_RS_WRITE_ACK : P_WRITE_ACK;
			err = drbd_send_ack(mdev, pcmd, peer_req);
			if (pcmd == P_RS_WRITE_ACK)
				drbd_set_in_sync(mdev, sector, peer_req->i.size);
		} else {
			err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
			/* we expect it to be marked out of sync anyways...
			 * maybe assert this? */
		}
		dec_unacked(mdev);
	}
	/* we delete from the conflict detection hash _after_ we sent out the
	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
	if (mdev->tconn->net_conf->two_primaries) {
		spin_lock_irq(&mdev->tconn->req_lock);
		D_ASSERT(!drbd_interval_empty(&peer_req->i));
		drbd_remove_epoch_entry_interval(mdev, peer_req);
		if (peer_req->flags & EE_RESTART_REQUESTS)
			restart_conflicting_writes(mdev, sector, peer_req->i.size);
		spin_unlock_irq(&mdev->tconn->req_lock);
	} else
		D_ASSERT(drbd_interval_empty(&peer_req->i));

	drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));

	return err;
}
static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
{
	struct drbd_conf *mdev = w->mdev;
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	int err;

	err = drbd_send_ack(mdev, ack, peer_req);
	dec_unacked(mdev);

	return err;
}

static int e_send_discard_write(struct drbd_work *w, int unused)
{
	return e_send_ack(w, P_DISCARD_WRITE);
}

static int e_send_retry_write(struct drbd_work *w, int unused)
{
	struct drbd_tconn *tconn = w->mdev->tconn;

	return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
			     P_RETRY_WRITE : P_DISCARD_WRITE);
}
static bool seq_greater(u32 a, u32 b)
{
	/*
	 * We assume 32-bit wrap-around here.
	 * For 24-bit wrap-around, we would have to shift:
	 *  a <<= 8; b <<= 8;
	 */
	return (s32)a - (s32)b > 0;
}
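/*
 * Worked example of the wrap-around arithmetic above (illustrative, not
 * from the original file): for a = 1 and b = 0xffffffff, the naive
 * compare a > b is false, but (s32)a - (s32)b evaluates to 1 - (-1) = 2,
 * which is > 0, so seq_greater(1, 0xffffffff) correctly treats 1 as
 * "newer" than a sequence number that has just wrapped around.
 */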
static u32 seq_max(u32 a, u32 b)
{
	return seq_greater(a, b) ? a : b;
}

static bool need_peer_seq(struct drbd_conf *mdev)
{
	struct drbd_tconn *tconn = mdev->tconn;

	/*
	 * We only need to keep track of the last packet_seq number of our peer
	 * if we are in dual-primary mode and we have the discard flag set; see
	 * handle_write_conflicts().
	 */
	return tconn->net_conf->two_primaries &&
	       test_bit(DISCARD_CONCURRENT, &tconn->flags);
}
static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
{
	unsigned int newest_peer_seq;

	if (need_peer_seq(mdev)) {
		spin_lock(&mdev->peer_seq_lock);
		newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
		mdev->peer_seq = newest_peer_seq;
		spin_unlock(&mdev->peer_seq_lock);
		/* wake up only if we actually changed mdev->peer_seq */
		if (peer_seq == newest_peer_seq)
			wake_up(&mdev->seq_wait);
	}
}
/* Called from receive_Data.
 * Synchronize packets on sock with packets on msock.
 *
 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
 * packet traveling on msock, they are still processed in the order they have
 * been sent.
 *
 * Note: we don't care for Ack packets overtaking P_DATA packets.
 *
 * In case packet_seq is larger than mdev->peer_seq number, there are
 * outstanding packets on the msock. We wait for them to arrive.
 * In case we are the logically next packet, we update mdev->peer_seq
 * ourselves. Correctly handles 32bit wrap around.
 *
 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
 *
 * returns 0 if we may process the packet,
 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
{
	DEFINE_WAIT(wait);
	long timeout;
	int ret;

	if (!need_peer_seq(mdev))
		return 0;

	spin_lock(&mdev->peer_seq_lock);
	for (;;) {
		if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
			mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
			ret = 0;
			break;
		}
		if (signal_pending(current)) {
			ret = -ERESTARTSYS;
			break;
		}
		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
		spin_unlock(&mdev->peer_seq_lock);
		timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
		timeout = schedule_timeout(timeout);
		spin_lock(&mdev->peer_seq_lock);
		if (!timeout) {
			ret = -ETIMEDOUT;
			dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
			break;
		}
	}
	spin_unlock(&mdev->peer_seq_lock);
	finish_wait(&mdev->seq_wait, &wait);
	return ret;
}
/* see also bio_flags_to_wire()
 * DRBD_REQ_*, because we need to semantically map the flags to data packet
 * flags and back. We may replicate to other kernel versions. */
static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
{
	return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
	       (dpf & DP_FUA ? REQ_FUA : 0) |
	       (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
	       (dpf & DP_DISCARD ? REQ_DISCARD : 0);
}
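/*
 * For orientation, a sketch of the inverse mapping performed on the
 * sending side.  The real bio_flags_to_wire() lives elsewhere and also
 * considers the agreed protocol version, so treat this as an assumption
 * rather than a copy:
 *
 *	static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
 *	{
 *		return (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
 *		       (bi_rw & REQ_FUA ? DP_FUA : 0) |
 *		       (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
 *		       (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
 *	}
 */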
static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
				    unsigned int size)
{
	struct drbd_interval *i;

    repeat:
	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
		struct drbd_request *req;
		struct bio_and_error m;

		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		if (!(req->rq_state & RQ_POSTPONED))
			continue;
		req->rq_state &= ~RQ_POSTPONED;
		__req_mod(req, NEG_ACKED, &m);
		spin_unlock_irq(&mdev->tconn->req_lock);
		if (m.bio)
			complete_master_bio(mdev, &m);
		spin_lock_irq(&mdev->tconn->req_lock);
		goto repeat;
	}
}
static int handle_write_conflicts(struct drbd_conf *mdev,
				  struct drbd_peer_request *peer_req)
{
	struct drbd_tconn *tconn = mdev->tconn;
	bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
	sector_t sector = peer_req->i.sector;
	const unsigned int size = peer_req->i.size;
	struct drbd_interval *i;
	bool equal;
	int err;

	/*
	 * Inserting the peer request into the write_requests tree will prevent
	 * new conflicting local requests from being added.
	 */
	drbd_insert_interval(&mdev->write_requests, &peer_req->i);

    repeat:
	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
		if (i == &peer_req->i)
			continue;

		if (!i->local) {
			/*
			 * Our peer has sent a conflicting remote request; this
			 * should not happen in a two-node setup.  Wait for the
			 * earlier peer request to complete.
			 */
			err = drbd_wait_misc(mdev, i);
			if (err)
				goto out;
			goto repeat;
		}

		equal = i->sector == sector && i->size == size;
		if (resolve_conflicts) {
			/*
			 * If the peer request is fully contained within the
			 * overlapping request, it can be discarded; otherwise,
			 * it will be retried once all overlapping requests
			 * have completed.
			 */
			bool discard = i->sector <= sector && i->sector +
				       (i->size >> 9) >= sector + (size >> 9);

			if (!equal)
				dev_alert(DEV, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u, "
					       "assuming %s came first\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size,
					  discard ? "local" : "remote");

			inc_unacked(mdev);
			peer_req->w.cb = discard ? e_send_discard_write :
						   e_send_retry_write;
			list_add_tail(&peer_req->w.list, &mdev->done_ee);
			wake_asender(mdev->tconn);

			err = -ENOENT;
			goto out;
		} else {
			struct drbd_request *req =
				container_of(i, struct drbd_request, i);

			if (!equal)
				dev_alert(DEV, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size);

			if (req->rq_state & RQ_LOCAL_PENDING ||
			    !(req->rq_state & RQ_POSTPONED)) {
				/*
				 * Wait for the node with the discard flag to
				 * decide if this request will be discarded or
				 * retried.  Requests that are discarded will
				 * disappear from the write_requests tree.
				 *
				 * In addition, wait for the conflicting
				 * request to finish locally before submitting
				 * the conflicting peer request.
				 */
				err = drbd_wait_misc(mdev, &req->i);
				if (err) {
					_conn_request_state(mdev->tconn,
							    NS(conn, C_TIMEOUT),
							    CS_HARD);
					fail_postponed_requests(mdev, sector, size);
					goto out;
				}
				goto repeat;
			}

			/*
			 * Remember to restart the conflicting requests after
			 * the new peer request has completed.
			 */
			peer_req->flags |= EE_RESTART_REQUESTS;
		}
	}
	err = 0;

    out:
	if (err)
		drbd_remove_epoch_entry_interval(mdev, peer_req);
	return err;
}
/* mirrored write */
static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	sector_t sector;
	struct drbd_peer_request *peer_req;
	struct p_data *p = pi->data;
	u32 peer_seq = be32_to_cpu(p->seq_num);
	int rw = WRITE;
	u32 dp_flags;
	int err;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return -EIO;

	if (!get_ldev(mdev)) {
		int err2;

		err = wait_for_and_update_peer_seq(mdev, peer_seq);
		drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
		atomic_inc(&mdev->current_epoch->epoch_size);
		err2 = drbd_drain_block(mdev, pi->size);
		if (!err)
			err = err2;
		return err;
	}

	/*
	 * Corresponding put_ldev done either below (on various errors), or in
	 * drbd_peer_request_endio, if we successfully submit the data at the
	 * end of this function.
	 */

	sector = be64_to_cpu(p->sector);
	peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
	if (!peer_req) {
		put_ldev(mdev);
		return -EIO;
	}

	peer_req->w.cb = e_end_block;

	dp_flags = be32_to_cpu(p->dp_flags);
	rw |= wire_flags_to_bio(mdev, dp_flags);

	if (dp_flags & DP_MAY_SET_IN_SYNC)
		peer_req->flags |= EE_MAY_SET_IN_SYNC;

	spin_lock(&mdev->epoch_lock);
	peer_req->epoch = mdev->current_epoch;
	atomic_inc(&peer_req->epoch->epoch_size);
	atomic_inc(&peer_req->epoch->active);
	spin_unlock(&mdev->epoch_lock);

	if (mdev->tconn->net_conf->two_primaries) {
		err = wait_for_and_update_peer_seq(mdev, peer_seq);
		if (err)
			goto out_interrupted;
		spin_lock_irq(&mdev->tconn->req_lock);
		err = handle_write_conflicts(mdev, peer_req);
		if (err) {
			spin_unlock_irq(&mdev->tconn->req_lock);
			if (err == -ENOENT) {
				put_ldev(mdev);
				return 0;
			}
			goto out_interrupted;
		}
	} else
		spin_lock_irq(&mdev->tconn->req_lock);
	list_add(&peer_req->w.list, &mdev->active_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	switch (mdev->tconn->net_conf->wire_protocol) {
	case DRBD_PROT_C:
		inc_unacked(mdev);
		/* corresponding dec_unacked() in e_end_block()
		 * respective _drbd_clear_done_ee */
		break;
	case DRBD_PROT_B:
		/* I really don't like it that the receiver thread
		 * sends on the msock, but anyways */
		drbd_send_ack(mdev, P_RECV_ACK, peer_req);
		break;
	case DRBD_PROT_A:
		/* nothing to do */
		break;
	}

	if (mdev->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, */
		drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
		drbd_al_begin_io(mdev, &peer_req->i);
	}

	err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
	if (!err)
		return 0;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	drbd_remove_epoch_entry_interval(mdev, peer_req);
	spin_unlock_irq(&mdev->tconn->req_lock);
	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
		drbd_al_complete_io(mdev, &peer_req->i);

out_interrupted:
	drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
	put_ldev(mdev);
	drbd_free_peer_req(mdev, peer_req);
	return err;
}
/* We may throttle resync, if the lower device seems to be busy,
 * and current sync rate is above c_min_rate.
 *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
 * activity (more than 64 sectors) that we cannot account for with our own
 * resync activity, it obviously is "busy".
 *
 * The current sync rate used here uses only the most recent two step marks,
 * to have a short time average so we can react faster.
 */
int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
{
	struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
	unsigned long db, dt, dbdt;
	struct lc_element *tmp;
	int curr_events;
	int throttle = 0;

	/* feature disabled? */
	if (mdev->ldev->dc.c_min_rate == 0)
		return 0;

	spin_lock_irq(&mdev->al_lock);
	tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
	if (tmp) {
		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
		if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
			spin_unlock_irq(&mdev->al_lock);
			return 0;
		}
		/* Do not slow down if app IO is already waiting for this extent */
	}
	spin_unlock_irq(&mdev->al_lock);

	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
		      (int)part_stat_read(&disk->part0, sectors[1]) -
		      atomic_read(&mdev->rs_sect_ev);

	if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
		unsigned long rs_left;
		int i;

		mdev->rs_last_events = curr_events;

		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
		 * approx. */
		i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;

		if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
			rs_left = mdev->ov_left;
		else
			rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;

		dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
		if (!dt)
			dt++;
		db = mdev->rs_mark_left[i] - rs_left;
		dbdt = Bit2KB(db/dt);

		if (dbdt > mdev->ldev->dc.c_min_rate)
			throttle = 1;
	}
	return throttle;
}
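/*
 * Worked example (illustrative numbers, not from the original file):
 * assuming one bitmap bit covers a 4 KiB block, so Bit2KB(x) == x * 4.
 * If the previous mark was taken dt = 4 seconds ago and db = 25600 bits
 * were cleared since then, the recent sync rate is
 * dbdt = Bit2KB(25600 / 4) = 25600 KiB/s, i.e. 25 MiB/s.  With
 * c_min_rate = 4000 (KiB/s), 25600 > 4000, so we are syncing faster than
 * the configured floor and may throttle while the backing device is busy.
 */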
static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	sector_t sector;
	sector_t capacity;
	struct drbd_peer_request *peer_req;
	struct digest_info *di = NULL;
	int size, verb;
	unsigned int fault_type;
	struct p_block_req *p = pi->data;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return -EIO;
	capacity = drbd_get_capacity(mdev->this_bdev);

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
			(unsigned long long)sector, size);
		return -EINVAL;
	}
	if (sector + (size>>9) > capacity) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
			(unsigned long long)sector, size);
		return -EINVAL;
	}

	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
		verb = 1;
		switch (pi->cmd) {
		case P_DATA_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
			break;
		case P_RS_DATA_REQUEST:
		case P_CSUM_RS_REQUEST:
		case P_OV_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
			break;
		case P_OV_REPLY:
			verb = 0;
			dec_rs_pending(mdev);
			drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
			break;
		default:
			BUG();
		}
		if (verb && __ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not satisfy peer's read request, "
				"no local data.\n");

		/* drain possible payload */
		return drbd_drain_block(mdev, pi->size);
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
	if (!peer_req) {
		put_ldev(mdev);
		return -ENOMEM;
	}

	switch (pi->cmd) {
	case P_DATA_REQUEST:
		peer_req->w.cb = w_e_end_data_req;
		fault_type = DRBD_FAULT_DT_RD;
		/* application IO, don't drbd_rs_begin_io */
		goto submit;

	case P_RS_DATA_REQUEST:
		peer_req->w.cb = w_e_end_rsdata_req;
		fault_type = DRBD_FAULT_RS_RD;
		/* used in the sector offset progress display */
		mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
		break;

	case P_OV_REPLY:
	case P_CSUM_RS_REQUEST:
		fault_type = DRBD_FAULT_RS_RD;
		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
		if (!di)
			goto out_free_e;

		di->digest_size = pi->size;
		di->digest = (((char *)di)+sizeof(struct digest_info));

		peer_req->digest = di;
		peer_req->flags |= EE_HAS_DIGEST;

		if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
			goto out_free_e;

		if (pi->cmd == P_CSUM_RS_REQUEST) {
			D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
			peer_req->w.cb = w_e_end_csum_rs_req;
			/* used in the sector offset progress display */
			mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
		} else if (pi->cmd == P_OV_REPLY) {
			/* track progress, we may need to throttle */
			atomic_add(size >> 9, &mdev->rs_sect_in);
			peer_req->w.cb = w_e_end_ov_reply;
			dec_rs_pending(mdev);
			/* drbd_rs_begin_io done when we sent this request,
			 * but accounting still needs to be done. */
			goto submit_for_resync;
		}
		break;

	case P_OV_REQUEST:
		if (mdev->ov_start_sector == ~(sector_t)0 &&
		    mdev->tconn->agreed_pro_version >= 90) {
			unsigned long now = jiffies;
			int i;
			mdev->ov_start_sector = sector;
			mdev->ov_position = sector;
			mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
			mdev->rs_total = mdev->ov_left;
			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
				mdev->rs_mark_left[i] = mdev->ov_left;
				mdev->rs_mark_time[i] = now;
			}
			dev_info(DEV, "Online Verify start sector: %llu\n",
				 (unsigned long long)sector);
		}
		peer_req->w.cb = w_e_end_ov_req;
		fault_type = DRBD_FAULT_RS_RD;
		break;

	default:
		BUG();
	}

	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
	 * wrt the receiver, but it is not as straightforward as it may seem.
	 * Various places in the resync start and stop logic assume resync
	 * requests are processed in order, requeuing this on the worker thread
	 * introduces a bunch of new code for synchronization between threads.
	 *
	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
	 * "forever", throttling after drbd_rs_begin_io will lock that extent
	 * for application writes for the same time.  For now, just throttle
	 * here, where the rest of the code expects the receiver to sleep for
	 * a while, anyways.
	 */

	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
	 * this defers syncer requests for some time, before letting at least
	 * one request through.  The resync controller on the receiving side
	 * will adapt to the incoming rate accordingly.
	 *
	 * We cannot throttle here if remote is Primary/SyncTarget:
	 * we would also throttle its application reads.
	 * In that case, throttling is done on the SyncTarget only.
	 */
	if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
		schedule_timeout_uninterruptible(HZ/10);
	if (drbd_rs_begin_io(mdev, sector))
		goto out_free_e;

submit_for_resync:
	atomic_add(size >> 9, &mdev->rs_sect_ev);

submit:
	inc_unacked(mdev);
	spin_lock_irq(&mdev->tconn->req_lock);
	list_add_tail(&peer_req->w.list, &mdev->read_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
		return 0;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);
	/* no drbd_rs_complete_io(), we are dropping the connection anyways */

out_free_e:
	put_ldev(mdev);
	drbd_free_peer_req(mdev, peer_req);
	return -EIO;
}
2376 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2378 int self, peer, rv = -100;
2379 unsigned long ch_self, ch_peer;
2381 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2382 peer = mdev->p_uuid[UI_BITMAP] & 1;
2384 ch_peer = mdev->p_uuid[UI_SIZE];
2385 ch_self = mdev->comm_bm_set;
2387 switch (mdev->tconn->net_conf->after_sb_0p) {
2389 case ASB_DISCARD_SECONDARY:
2390 case ASB_CALL_HELPER:
2391 dev_err(DEV, "Configuration error.\n");
2393 case ASB_DISCONNECT:
2395 case ASB_DISCARD_YOUNGER_PRI:
2396 if (self == 0 && peer == 1) {
2400 if (self == 1 && peer == 0) {
2404 /* Else fall through to one of the other strategies... */
2405 case ASB_DISCARD_OLDER_PRI:
2406 if (self == 0 && peer == 1) {
2410 if (self == 1 && peer == 0) {
2414 /* Else fall through to one of the other strategies... */
2415 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2416 "Using discard-least-changes instead\n");
2417 case ASB_DISCARD_ZERO_CHG:
2418 if (ch_peer == 0 && ch_self == 0) {
2419 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2423 if (ch_peer == 0) { rv = 1; break; }
2424 if (ch_self == 0) { rv = -1; break; }
2426 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2428 case ASB_DISCARD_LEAST_CHG:
2429 if (ch_self < ch_peer)
2431 else if (ch_self > ch_peer)
2433 else /* ( ch_self == ch_peer ) */
2434 /* Well, then use something else. */
2435 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2438 case ASB_DISCARD_LOCAL:
2441 case ASB_DISCARD_REMOTE:
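/*
 * Stand-alone sketch (not driver code) of the discard-least-changes
 * decision above: a return value > 0 discards the peer's data (we
 * become sync source), < 0 discards ours.  "discard_concurrent" is a
 * plain-int stand-in for test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags),
 * an arbitrary but cluster-wide consistent tie breaker.
 */
static int asb_least_changes_sketch(unsigned long ch_self,
				    unsigned long ch_peer,
				    int discard_concurrent)
{
	if (ch_self < ch_peer)
		return -1;	/* we changed less: discard our data */
	if (ch_self > ch_peer)
		return 1;	/* peer changed less: discard the peer's data */
	return discard_concurrent ? -1 : 1;	/* equal: tie break */
}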
2448 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2452 switch (mdev->tconn->net_conf->after_sb_1p) {
2453 case ASB_DISCARD_YOUNGER_PRI:
2454 case ASB_DISCARD_OLDER_PRI:
2455 case ASB_DISCARD_LEAST_CHG:
2456 case ASB_DISCARD_LOCAL:
2457 case ASB_DISCARD_REMOTE:
2458 dev_err(DEV, "Configuration error.\n");
2460 case ASB_DISCONNECT:
2463 hg = drbd_asb_recover_0p(mdev);
2464 if (hg == -1 && mdev->state.role == R_SECONDARY)
2466 if (hg == 1 && mdev->state.role == R_PRIMARY)
2470 rv = drbd_asb_recover_0p(mdev);
2472 case ASB_DISCARD_SECONDARY:
2473 return mdev->state.role == R_PRIMARY ? 1 : -1;
2474 case ASB_CALL_HELPER:
2475 hg = drbd_asb_recover_0p(mdev);
2476 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2477 enum drbd_state_rv rv2;
2479 drbd_set_role(mdev, R_SECONDARY, 0);
2480 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2481 * we might be here in C_WF_REPORT_PARAMS which is transient.
2482 * we do not need to wait for the after state change work either. */
2483 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2484 if (rv2 != SS_SUCCESS) {
2485 drbd_khelper(mdev, "pri-lost-after-sb");
2487 dev_warn(DEV, "Successfully gave up primary role.\n");
2497 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2501 switch (mdev->tconn->net_conf->after_sb_2p) {
2502 case ASB_DISCARD_YOUNGER_PRI:
2503 case ASB_DISCARD_OLDER_PRI:
2504 case ASB_DISCARD_LEAST_CHG:
2505 case ASB_DISCARD_LOCAL:
2506 case ASB_DISCARD_REMOTE:
2508 case ASB_DISCARD_SECONDARY:
2509 dev_err(DEV, "Configuration error.\n");
2512 rv = drbd_asb_recover_0p(mdev);
2514 case ASB_DISCONNECT:
2516 case ASB_CALL_HELPER:
2517 hg = drbd_asb_recover_0p(mdev);
2519 enum drbd_state_rv rv2;
2521 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2522 * we might be here in C_WF_REPORT_PARAMS which is transient.
2523 * we do not need to wait for the after state change work either. */
2524 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2525 if (rv2 != SS_SUCCESS) {
2526 drbd_khelper(mdev, "pri-lost-after-sb");
2528 dev_warn(DEV, "Successfully gave up primary role.\n");
2538 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2539 u64 bits, u64 flags)
2542 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2545 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2547 (unsigned long long)uuid[UI_CURRENT],
2548 (unsigned long long)uuid[UI_BITMAP],
2549 (unsigned long long)uuid[UI_HISTORY_START],
2550 (unsigned long long)uuid[UI_HISTORY_END],
2551 (unsigned long long)bits,
2552 (unsigned long long)flags);
2556   100	after split brain, try auto recover
2557 2 C_SYNC_SOURCE set BitMap
2558 1 C_SYNC_SOURCE use BitMap
2560 -1 C_SYNC_TARGET use BitMap
2561 -2 C_SYNC_TARGET set BitMap
2562 -100 after split brain, disconnect
2563 -1000 unrelated data
2564 -1091 requires proto 91
2565 -1096 requires proto 96
2567 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2572 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2573 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2576 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2580 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2581 peer != UUID_JUST_CREATED)
2585 if (self != UUID_JUST_CREATED &&
2586 (peer == UUID_JUST_CREATED || peer == (u64)0))
2590 int rct, dc; /* roles at crash time */
2592 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2594 if (mdev->tconn->agreed_pro_version < 91)
2597 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2598 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2599 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2600 drbd_uuid_set_bm(mdev, 0UL);
2602 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2603 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2606 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2613 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2615 if (mdev->tconn->agreed_pro_version < 91)
2618 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2619 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2620 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2622 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2623 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2624 mdev->p_uuid[UI_BITMAP] = 0UL;
2626 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2629 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2636 /* Common power [off|failure] */
2637 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2638 (mdev->p_uuid[UI_FLAGS] & 2);
2639 /* lowest bit is set when we were primary,
2640 * next bit (weight 2) is set when peer was primary */
2644 case 0: /* !self_pri && !peer_pri */ return 0;
2645 case 1: /* self_pri && !peer_pri */ return 1;
2646 case 2: /* !self_pri && peer_pri */ return -1;
2647 case 3: /* self_pri && peer_pri */
2648 dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2654 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2659 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2661 if (mdev->tconn->agreed_pro_version < 96 ?
2662 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2663 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2664 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2665			/* The last P_SYNC_UUID did not get through. Undo the modifications
2666			   that the last start of a resync as sync source made to the peer's UUIDs. */
2668 if (mdev->tconn->agreed_pro_version < 91)
2671 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2672 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2674			dev_info(DEV, "Did not get the last syncUUID packet, corrected:\n");
2675 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2682 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2683 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2684 peer = mdev->p_uuid[i] & ~((u64)1);
2690 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2691 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2696 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2698 if (mdev->tconn->agreed_pro_version < 96 ?
2699 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2700 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2701 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2702			/* The last P_SYNC_UUID did not get through. Undo the modifications
2703			   that the last start of a resync as sync source made to our UUIDs. */
2705 if (mdev->tconn->agreed_pro_version < 91)
2708 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2709 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2711 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2712 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2713 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2721 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2722 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2723 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2729 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2730 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2731 if (self == peer && self != ((u64)0))
2735 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2736 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2737 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2738 peer = mdev->p_uuid[j] & ~((u64)1);
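/*
 * All comparisons above mask off bit 0 before testing for equality:
 * DRBD keeps flag information (e.g. a role indication) in the lowest
 * bit of a UUID, so two UUIDs denote the same data generation iff
 * they agree in the remaining 63 bits.  Stand-alone sketch:
 */
static int uuid_same_generation_sketch(u64 a, u64 b)
{
	return (a & ~((u64)1)) == (b & ~((u64)1));
}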
2747 /* drbd_sync_handshake() returns the new conn state on success, or
2748   C_MASK (-1) on failure.
2750 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2751 enum drbd_disk_state peer_disk) __must_hold(local)
2754 enum drbd_conns rv = C_MASK;
2755 enum drbd_disk_state mydisk;
2757 mydisk = mdev->state.disk;
2758 if (mydisk == D_NEGOTIATING)
2759 mydisk = mdev->new_state_tmp.disk;
2761 dev_info(DEV, "drbd_sync_handshake:\n");
2762 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2763 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2764 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2766 hg = drbd_uuid_compare(mdev, &rule_nr);
2768 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2771 dev_alert(DEV, "Unrelated data, aborting!\n");
2775 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2779 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2780 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2781 int f = (hg == -100) || abs(hg) == 2;
2782 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2785 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2786 hg > 0 ? "source" : "target");
2790 drbd_khelper(mdev, "initial-split-brain");
2792 if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
2793 int pcount = (mdev->state.role == R_PRIMARY)
2794 + (peer_role == R_PRIMARY);
2795 int forced = (hg == -100);
2799 hg = drbd_asb_recover_0p(mdev);
2802 hg = drbd_asb_recover_1p(mdev);
2805 hg = drbd_asb_recover_2p(mdev);
2808 if (abs(hg) < 100) {
2809 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2810 "automatically solved. Sync from %s node\n",
2811 pcount, (hg < 0) ? "peer" : "this");
2813 dev_warn(DEV, "Doing a full sync, since"
2814 " UUIDs where ambiguous.\n");
2821 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2823 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2827 dev_warn(DEV, "Split-Brain detected, manually solved. "
2828 "Sync from %s node\n",
2829 (hg < 0) ? "peer" : "this");
2833 /* FIXME this log message is not correct if we end up here
2834 * after an attempted attach on a diskless node.
2835 * We just refuse to attach -- well, we drop the "connection"
2836 * to that disk, in a way... */
2837 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2838 drbd_khelper(mdev, "split-brain");
2842 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2843 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2847 if (hg < 0 && /* by intention we do not use mydisk here. */
2848 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2849 switch (mdev->tconn->net_conf->rr_conflict) {
2850 case ASB_CALL_HELPER:
2851 drbd_khelper(mdev, "pri-lost");
2853 case ASB_DISCONNECT:
2854 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2857 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2862 if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2864 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2866			dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2867 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2868 abs(hg) >= 2 ? "full" : "bit-map based");
2873 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2874 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2875 BM_LOCKED_SET_ALLOWED))
2879 if (hg > 0) { /* become sync source. */
2881 } else if (hg < 0) { /* become sync target */
2885 if (drbd_bm_total_weight(mdev)) {
2886 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2887 drbd_bm_total_weight(mdev));
2894 /* returns 1 if invalid */
2895 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2897 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2898 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2899 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2902 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2903 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2904 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2907 /* everything else is valid if they are equal on both sides. */
2911	/* everything else is invalid. */
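/*
 * Illustration (not compiled) of the rule above: the only valid
 * asymmetric pair is discard-remote on one side combined with
 * discard-local on the other; any other use of those two values is
 * invalid, and all remaining values must simply match.
 */
#if 0
	cmp_after_sb(ASB_DISCARD_REMOTE, ASB_DISCARD_LOCAL);	/* 0, valid   */
	cmp_after_sb(ASB_DISCARD_LOCAL, ASB_DISCARD_REMOTE);	/* 0, valid   */
	cmp_after_sb(ASB_DISCARD_LOCAL, ASB_DISCARD_LOCAL);	/* 1, invalid */
	cmp_after_sb(ASB_DISCONNECT, ASB_DISCONNECT);		/* 0, valid   */
	cmp_after_sb(ASB_DISCONNECT, ASB_CALL_HELPER);		/* 1, invalid */
#endif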
2915 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
2917 struct p_protocol *p = pi->data;
2918 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2919 int p_want_lose, p_two_primaries, cf;
2920 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2922 p_proto = be32_to_cpu(p->protocol);
2923 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2924 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2925 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
2926 p_two_primaries = be32_to_cpu(p->two_primaries);
2927 cf = be32_to_cpu(p->conn_flags);
2928 p_want_lose = cf & CF_WANT_LOSE;
2930 clear_bit(CONN_DRY_RUN, &tconn->flags);
2932 if (cf & CF_DRY_RUN)
2933 set_bit(CONN_DRY_RUN, &tconn->flags);
2935 if (p_proto != tconn->net_conf->wire_protocol) {
2936 conn_err(tconn, "incompatible communication protocols\n");
2940 if (cmp_after_sb(p_after_sb_0p, tconn->net_conf->after_sb_0p)) {
2941 conn_err(tconn, "incompatible after-sb-0pri settings\n");
2945 if (cmp_after_sb(p_after_sb_1p, tconn->net_conf->after_sb_1p)) {
2946 conn_err(tconn, "incompatible after-sb-1pri settings\n");
2950 if (cmp_after_sb(p_after_sb_2p, tconn->net_conf->after_sb_2p)) {
2951 conn_err(tconn, "incompatible after-sb-2pri settings\n");
2955 if (p_want_lose && tconn->net_conf->want_lose) {
2956 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
2960 if (p_two_primaries != tconn->net_conf->two_primaries) {
2961 conn_err(tconn, "incompatible setting of the two-primaries options\n");
2965 if (tconn->agreed_pro_version >= 87) {
2966 unsigned char *my_alg = tconn->net_conf->integrity_alg;
2969 err = drbd_recv_all(tconn, p_integrity_alg, pi->size);
2973 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2974 if (strcmp(p_integrity_alg, my_alg)) {
2975 conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
2978 conn_info(tconn, "data-integrity-alg: %s\n",
2979 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2985 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
2990 * input: alg name, feature name
2991 * return: NULL (alg name was "")
2992 * ERR_PTR(error) if something goes wrong
2993 * or the crypto hash ptr, if it worked out ok. */
2994 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2995 const char *alg, const char *name)
2997 struct crypto_hash *tfm;
3002 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3004 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3005 alg, name, PTR_ERR(tfm));
3008 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
3009 crypto_free_hash(tfm);
3010 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
3011 return ERR_PTR(-EINVAL);
3016 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3018 void *buffer = tconn->data.rbuf;
3019 int size = pi->size;
3022 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3023 s = drbd_recv(tconn, buffer, s);
3037 * config_unknown_volume - device configuration command for unknown volume
3039 * When a device is added to an existing connection, the node on which the
3040 * device is added first will send configuration commands to its peer but the
3041 * peer will not know about the device yet. It will warn and ignore these
3042 * commands. Once the device is added on the second node, the second node will
3043 * send the same device configuration commands, but in the other direction.
3045 * (We can also end up here if drbd is misconfigured.)
3047 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3049 conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3050 pi->vnr, cmdname(pi->cmd));
3051 return ignore_remaining_packet(tconn, pi);
3054 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3056 struct drbd_conf *mdev;
3057 struct p_rs_param_95 *p;
3058 unsigned int header_size, data_size, exp_max_sz;
3059 struct crypto_hash *verify_tfm = NULL;
3060 struct crypto_hash *csums_tfm = NULL;
3061 const int apv = tconn->agreed_pro_version;
3062 int *rs_plan_s = NULL;
3066 mdev = vnr_to_mdev(tconn, pi->vnr);
3068 return config_unknown_volume(tconn, pi);
3070 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3071 : apv == 88 ? sizeof(struct p_rs_param)
3073 : apv <= 94 ? sizeof(struct p_rs_param_89)
3074 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3076 if (pi->size > exp_max_sz) {
3077 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3078 pi->size, exp_max_sz);
3083 header_size = sizeof(struct p_rs_param);
3084 data_size = pi->size - header_size;
3085 } else if (apv <= 94) {
3086 header_size = sizeof(struct p_rs_param_89);
3087 data_size = pi->size - header_size;
3088 D_ASSERT(data_size == 0);
3090 header_size = sizeof(struct p_rs_param_95);
3091 data_size = pi->size - header_size;
3092 D_ASSERT(data_size == 0);
3095 /* initialize verify_alg and csums_alg */
3097 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3099 err = drbd_recv_all(mdev->tconn, p, header_size);
3103 if (get_ldev(mdev)) {
3104 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3110 if (data_size > SHARED_SECRET_MAX) {
3111 dev_err(DEV, "verify-alg too long, "
3112 "peer wants %u, accepting only %u byte\n",
3113 data_size, SHARED_SECRET_MAX);
3117 err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3121 /* we expect NUL terminated string */
3122 /* but just in case someone tries to be evil */
3123 D_ASSERT(p->verify_alg[data_size-1] == 0);
3124 p->verify_alg[data_size-1] = 0;
3126 } else /* apv >= 89 */ {
3127 /* we still expect NUL terminated strings */
3128 /* but just in case someone tries to be evil */
3129 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3130 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3131 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3132 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3135 if (strcmp(mdev->tconn->net_conf->verify_alg, p->verify_alg)) {
3136 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3137 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3138 mdev->tconn->net_conf->verify_alg, p->verify_alg);
3141 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3142 p->verify_alg, "verify-alg");
3143 if (IS_ERR(verify_tfm)) {
3149 if (apv >= 89 && strcmp(mdev->tconn->net_conf->csums_alg, p->csums_alg)) {
3150 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3151 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3152 mdev->tconn->net_conf->csums_alg, p->csums_alg);
3155 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3156 p->csums_alg, "csums-alg");
3157 if (IS_ERR(csums_tfm)) {
3163 if (apv > 94 && get_ldev(mdev)) {
3164 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3165 mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3166 mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3167 mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3168 mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
3170 fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3171 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3172 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3174				dev_err(DEV, "kzalloc of fifo_buffer failed\n");
3182 spin_lock(&mdev->peer_seq_lock);
3183 /* lock against drbd_nl_syncer_conf() */
3185 strcpy(mdev->tconn->net_conf->verify_alg, p->verify_alg);
3186 mdev->tconn->net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3187 crypto_free_hash(mdev->tconn->verify_tfm);
3188 mdev->tconn->verify_tfm = verify_tfm;
3189 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3192 strcpy(mdev->tconn->net_conf->csums_alg, p->csums_alg);
3193 mdev->tconn->net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3194 crypto_free_hash(mdev->tconn->csums_tfm);
3195 mdev->tconn->csums_tfm = csums_tfm;
3196 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3198 if (fifo_size != mdev->rs_plan_s.size) {
3199 kfree(mdev->rs_plan_s.values);
3200 mdev->rs_plan_s.values = rs_plan_s;
3201 mdev->rs_plan_s.size = fifo_size;
3202 mdev->rs_planed = 0;
3204 spin_unlock(&mdev->peer_seq_lock);
3209 /* just for completeness: actually not needed,
3210 * as this is not reached if csums_tfm was ok. */
3211 crypto_free_hash(csums_tfm);
3212 /* but free the verify_tfm again, if csums_tfm did not work out */
3213 crypto_free_hash(verify_tfm);
3214 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3218 /* warn if the arguments differ by more than 12.5% */
3219 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3220 const char *s, sector_t a, sector_t b)
3223 if (a == 0 || b == 0)
3225 d = (a > b) ? (a - b) : (b - a);
3226 if (d > (a>>3) || d > (b>>3))
3227 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3228 (unsigned long long)a, (unsigned long long)b);
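/*
 * Worked example (not compiled) for the 12.5% rule above:
 * d > (x>>3) is d > x/8.
 */
#if 0
	warn_if_differ_considerably(mdev, "example", 1000, 900);
	/* d = 100, thresholds 125 and 112: stays silent */
	warn_if_differ_considerably(mdev, "example", 1000, 800);
	/* d = 200, thresholds 125 and 100: warns */
#endif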
3231 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3233 struct drbd_conf *mdev;
3234 struct p_sizes *p = pi->data;
3235 enum determine_dev_size dd = unchanged;
3236 sector_t p_size, p_usize, my_usize;
3237 int ldsc = 0; /* local disk size changed */
3238 enum dds_flags ddsf;
3240 mdev = vnr_to_mdev(tconn, pi->vnr);
3242 return config_unknown_volume(tconn, pi);
3244 p_size = be64_to_cpu(p->d_size);
3245 p_usize = be64_to_cpu(p->u_size);
3247 /* just store the peer's disk size for now.
3248 * we still need to figure out whether we accept that. */
3249 mdev->p_size = p_size;
3251 if (get_ldev(mdev)) {
3252 warn_if_differ_considerably(mdev, "lower level device sizes",
3253 p_size, drbd_get_max_capacity(mdev->ldev));
3254 warn_if_differ_considerably(mdev, "user requested size",
3255 p_usize, mdev->ldev->dc.disk_size);
3257 /* if this is the first connect, or an otherwise expected
3258 * param exchange, choose the minimum */
3259 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3260 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3263 my_usize = mdev->ldev->dc.disk_size;
3265 if (mdev->ldev->dc.disk_size != p_usize) {
3266 mdev->ldev->dc.disk_size = p_usize;
3267 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3268 (unsigned long)mdev->ldev->dc.disk_size);
3271 /* Never shrink a device with usable data during connect.
3272 But allow online shrinking if we are connected. */
3273 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3274 drbd_get_capacity(mdev->this_bdev) &&
3275 mdev->state.disk >= D_OUTDATED &&
3276 mdev->state.conn < C_CONNECTED) {
3277 dev_err(DEV, "The peer's disk size is too small!\n");
3278 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3279 mdev->ldev->dc.disk_size = my_usize;
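/*
 * Note on the min_not_zero() use above: a user requested size of 0
 * means "no explicit limit", so it must not win the minimum.
 * min_not_zero() comes from <linux/kernel.h>; illustrative values:
 */
#if 0
	min_not_zero((sector_t)0, (sector_t)500);	/* -> 500 */
	min_not_zero((sector_t)300, (sector_t)500);	/* -> 300 */
	min_not_zero((sector_t)300, (sector_t)0);	/* -> 300 */
#endif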
3286 ddsf = be16_to_cpu(p->dds_flags);
3287 if (get_ldev(mdev)) {
3288 dd = drbd_determine_dev_size(mdev, ddsf);
3290 if (dd == dev_size_error)
3294 /* I am diskless, need to accept the peer's size. */
3295 drbd_set_my_capacity(mdev, p_size);
3298 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3299 drbd_reconsider_max_bio_size(mdev);
3301 if (get_ldev(mdev)) {
3302 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3303 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3310 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3311 if (be64_to_cpu(p->c_size) !=
3312 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3313 /* we have different sizes, probably peer
3314 * needs to know my new size... */
3315 drbd_send_sizes(mdev, 0, ddsf);
3317 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3318 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3319 if (mdev->state.pdsk >= D_INCONSISTENT &&
3320 mdev->state.disk >= D_INCONSISTENT) {
3321 if (ddsf & DDSF_NO_RESYNC)
3322 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3324 resync_after_online_grow(mdev);
3326 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3333 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3335 struct drbd_conf *mdev;
3336 struct p_uuids *p = pi->data;
3338 int i, updated_uuids = 0;
3340 mdev = vnr_to_mdev(tconn, pi->vnr);
3342 return config_unknown_volume(tconn, pi);
3344 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3346 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3347 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3349 kfree(mdev->p_uuid);
3350 mdev->p_uuid = p_uuid;
3352 if (mdev->state.conn < C_CONNECTED &&
3353 mdev->state.disk < D_INCONSISTENT &&
3354 mdev->state.role == R_PRIMARY &&
3355 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3356 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3357 (unsigned long long)mdev->ed_uuid);
3358 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3362 if (get_ldev(mdev)) {
3363 int skip_initial_sync =
3364 mdev->state.conn == C_CONNECTED &&
3365 mdev->tconn->agreed_pro_version >= 90 &&
3366 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3367 (p_uuid[UI_FLAGS] & 8);
3368 if (skip_initial_sync) {
3369 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3370 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3371 "clear_n_write from receive_uuids",
3372 BM_LOCKED_TEST_ALLOWED);
3373 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3374 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3375 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3381 } else if (mdev->state.disk < D_INCONSISTENT &&
3382 mdev->state.role == R_PRIMARY) {
3383 /* I am a diskless primary, the peer just created a new current UUID
3385 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3388	/* Before we test the disk state we should wait until a possibly
3389	   ongoing cluster-wide state change has finished. That is important if
3390	   we are primary and are detaching from our disk: we need to see the
3391 new disk state... */
3392 mutex_lock(mdev->state_mutex);
3393 mutex_unlock(mdev->state_mutex);
3394 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3395 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3398 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3404 * convert_state() - Converts the peer's view of the cluster state to our point of view
3405 * @ps: The state as seen by the peer.
3407 static union drbd_state convert_state(union drbd_state ps)
3409 union drbd_state ms;
3411 static enum drbd_conns c_tab[] = {
3412 [C_CONNECTED] = C_CONNECTED,
3414 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3415 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3416 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3417 [C_VERIFY_S] = C_VERIFY_T,
3423 ms.conn = c_tab[ps.conn];
3428 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
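/*
 * Example (not compiled): if the peer requests C_STARTING_SYNC_S, it
 * intends to become sync source; seen from this node the very same
 * transition is C_STARTING_SYNC_T.  The mask/val pairs received below
 * are therefore mapped through convert_state() before being applied.
 */
#if 0
	union drbd_state ps, ms;
	ps.conn = C_STARTING_SYNC_S;
	ms = convert_state(ps);		/* ms.conn == C_STARTING_SYNC_T */
#endif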
3433 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3435 struct drbd_conf *mdev;
3436 struct p_req_state *p = pi->data;
3437 union drbd_state mask, val;
3438 enum drbd_state_rv rv;
3440 mdev = vnr_to_mdev(tconn, pi->vnr);
3444 mask.i = be32_to_cpu(p->mask);
3445 val.i = be32_to_cpu(p->val);
3447 if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3448 mutex_is_locked(mdev->state_mutex)) {
3449 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3453 mask = convert_state(mask);
3454 val = convert_state(val);
3456 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3457 drbd_send_sr_reply(mdev, rv);
3464 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3466 struct p_req_state *p = pi->data;
3467 union drbd_state mask, val;
3468 enum drbd_state_rv rv;
3470 mask.i = be32_to_cpu(p->mask);
3471 val.i = be32_to_cpu(p->val);
3473 if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3474 mutex_is_locked(&tconn->cstate_mutex)) {
3475 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3479 mask = convert_state(mask);
3480 val = convert_state(val);
3482 rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3483 conn_send_sr_reply(tconn, rv);
3488 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3490 struct drbd_conf *mdev;
3491 struct p_state *p = pi->data;
3492 union drbd_state os, ns, peer_state;
3493 enum drbd_disk_state real_peer_disk;
3494 enum chg_state_flags cs_flags;
3497 mdev = vnr_to_mdev(tconn, pi->vnr);
3499 return config_unknown_volume(tconn, pi);
3501 peer_state.i = be32_to_cpu(p->state);
3503 real_peer_disk = peer_state.disk;
3504 if (peer_state.disk == D_NEGOTIATING) {
3505 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3506 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3509 spin_lock_irq(&mdev->tconn->req_lock);
3511 os = ns = drbd_read_state(mdev);
3512 spin_unlock_irq(&mdev->tconn->req_lock);
3514 /* peer says his disk is uptodate, while we think it is inconsistent,
3515 * and this happens while we think we have a sync going on. */
3516 if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3517 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3518 /* If we are (becoming) SyncSource, but peer is still in sync
3519 * preparation, ignore its uptodate-ness to avoid flapping, it
3520 * will change to inconsistent once the peer reaches active
3522 * It may have changed syncer-paused flags, however, so we
3523 * cannot ignore this completely. */
3524 if (peer_state.conn > C_CONNECTED &&
3525 peer_state.conn < C_SYNC_SOURCE)
3526 real_peer_disk = D_INCONSISTENT;
3528 /* if peer_state changes to connected at the same time,
3529 * it explicitly notifies us that it finished resync.
3530 * Maybe we should finish it up, too? */
3531 else if (os.conn >= C_SYNC_SOURCE &&
3532 peer_state.conn == C_CONNECTED) {
3533 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3534 drbd_resync_finished(mdev);
3539 /* peer says his disk is inconsistent, while we think it is uptodate,
3540 * and this happens while the peer still thinks we have a sync going on,
3541 * but we think we are already done with the sync.
3542 * We ignore this to avoid flapping pdsk.
3543 * This should not happen, if the peer is a recent version of drbd. */
3544 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3545 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3546 real_peer_disk = D_UP_TO_DATE;
3548 if (ns.conn == C_WF_REPORT_PARAMS)
3549 ns.conn = C_CONNECTED;
3551 if (peer_state.conn == C_AHEAD)
3554 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3555 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3556 int cr; /* consider resync */
3558 /* if we established a new connection */
3559 cr = (os.conn < C_CONNECTED);
3560 /* if we had an established connection
3561 * and one of the nodes newly attaches a disk */
3562 cr |= (os.conn == C_CONNECTED &&
3563 (peer_state.disk == D_NEGOTIATING ||
3564 os.disk == D_NEGOTIATING));
3565 /* if we have both been inconsistent, and the peer has been
3566 * forced to be UpToDate with --overwrite-data */
3567 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3568 /* if we had been plain connected, and the admin requested to
3569 * start a sync by "invalidate" or "invalidate-remote" */
3570 cr |= (os.conn == C_CONNECTED &&
3571 (peer_state.conn >= C_STARTING_SYNC_S &&
3572 peer_state.conn <= C_WF_BITMAP_T));
3575 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3578 if (ns.conn == C_MASK) {
3579 ns.conn = C_CONNECTED;
3580 if (mdev->state.disk == D_NEGOTIATING) {
3581 drbd_force_state(mdev, NS(disk, D_FAILED));
3582 } else if (peer_state.disk == D_NEGOTIATING) {
3583 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3584 peer_state.disk = D_DISKLESS;
3585 real_peer_disk = D_DISKLESS;
3587 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3589 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3590 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3596 spin_lock_irq(&mdev->tconn->req_lock);
3597 if (os.i != drbd_read_state(mdev).i)
3599 clear_bit(CONSIDER_RESYNC, &mdev->flags);
3600 ns.peer = peer_state.role;
3601 ns.pdsk = real_peer_disk;
3602 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3603 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3604 ns.disk = mdev->new_state_tmp.disk;
3605 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3606 if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3607 test_bit(NEW_CUR_UUID, &mdev->flags)) {
3608 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3609		   for temporary network outages! */
3610 spin_unlock_irq(&mdev->tconn->req_lock);
3611		dev_err(DEV, "Aborting Connect, can not thaw IO with a peer that is only Consistent\n");
3612 tl_clear(mdev->tconn);
3613 drbd_uuid_new_current(mdev);
3614 clear_bit(NEW_CUR_UUID, &mdev->flags);
3615 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3618 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3619 ns = drbd_read_state(mdev);
3620 spin_unlock_irq(&mdev->tconn->req_lock);
3622 if (rv < SS_SUCCESS) {
3623 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3627 if (os.conn > C_WF_REPORT_PARAMS) {
3628 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3629 peer_state.disk != D_NEGOTIATING ) {
3630 /* we want resync, peer has not yet decided to sync... */
3631 /* Nowadays only used when forcing a node into primary role and
3632 setting its disk to UpToDate with that */
3633 drbd_send_uuids(mdev);
3634 drbd_send_state(mdev);
3638 mdev->tconn->net_conf->want_lose = 0;
3640 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3645 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3647 struct drbd_conf *mdev;
3648 struct p_rs_uuid *p = pi->data;
3650 mdev = vnr_to_mdev(tconn, pi->vnr);
3654 wait_event(mdev->misc_wait,
3655 mdev->state.conn == C_WF_SYNC_UUID ||
3656 mdev->state.conn == C_BEHIND ||
3657 mdev->state.conn < C_CONNECTED ||
3658 mdev->state.disk < D_NEGOTIATING);
3660 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3662 /* Here the _drbd_uuid_ functions are right, current should
3663 _not_ be rotated into the history */
3664 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3665 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3666 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3668 drbd_print_uuids(mdev, "updated sync uuid");
3669 drbd_start_resync(mdev, C_SYNC_TARGET);
3673 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3679 * receive_bitmap_plain
3681 * Return 0 when done, 1 when another iteration is needed, and a negative error
3682 * code upon failure.
3685 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3686 unsigned long *p, struct bm_xfer_ctx *c)
3688 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3689 drbd_header_size(mdev->tconn);
3690 unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3691 c->bm_words - c->word_offset);
3692 unsigned int want = num_words * sizeof(*p);
3696 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3701 err = drbd_recv_all(mdev->tconn, p, want);
3705 drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3707 c->word_offset += num_words;
3708 c->bit_offset = c->word_offset * BITS_PER_LONG;
3709 if (c->bit_offset > c->bm_bits)
3710 c->bit_offset = c->bm_bits;
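/*
 * Sizing example for the math above, with assumed values: a 4096 byte
 * socket buffer and a 16 byte header leave data_size = 4080 bytes,
 * i.e. at most 4080 / sizeof(long) = 510 words (on 64 bit) per
 * P_BITMAP packet; the final packet simply carries the remaining
 * c->bm_words - c->word_offset words.
 */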
3715 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3717 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3720 static int dcbp_get_start(struct p_compressed_bm *p)
3722 return (p->encoding & 0x80) != 0;
3725 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3727 return (p->encoding >> 4) & 0x7;
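/*
 * The three helpers above unpack the p_compressed_bm encoding byte:
 *
 *	bit  7    : whether the first run consists of set bits
 *	bits 4..6 : number of padding bits at the end of the stream
 *	bits 0..3 : drbd_bitmap_code, e.g. RLE_VLI_Bits
 *
 * Example: encoding = 0xa2 gives start = 1, pad_bits = 2, code = 2.
 */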
3733 * Return 0 when done, 1 when another iteration is needed, and a negative error
3734 * code upon failure.
3737 recv_bm_rle_bits(struct drbd_conf *mdev,
3738 struct p_compressed_bm *p,
3739 struct bm_xfer_ctx *c,
3742 struct bitstream bs;
3746 unsigned long s = c->bit_offset;
3748 int toggle = dcbp_get_start(p);
3752 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3754 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3758 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3759 bits = vli_decode_bits(&rl, look_ahead);
3765 if (e >= c->bm_bits) {
3766 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3769 _drbd_bm_set_bits(mdev, s, e);
3773 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3774 have, bits, look_ahead,
3775 (unsigned int)(bs.cur.b - p->code),
3776 (unsigned int)bs.buf_len);
3779 look_ahead >>= bits;
3782 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3785 look_ahead |= tmp << have;
3790 bm_xfer_ctx_bit_to_word_offset(c);
3792 return (s != c->bm_bits);
3798 * Return 0 when done, 1 when another iteration is needed, and a negative error
3799 * code upon failure.
3802 decode_bitmap_c(struct drbd_conf *mdev,
3803 struct p_compressed_bm *p,
3804 struct bm_xfer_ctx *c,
3807 if (dcbp_get_code(p) == RLE_VLI_Bits)
3808 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
3810 /* other variants had been implemented for evaluation,
3811 * but have been dropped as this one turned out to be "best"
3812 * during all our tests. */
3814 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3815 conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
3819 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3820 const char *direction, struct bm_xfer_ctx *c)
3822 /* what would it take to transfer it "plaintext" */
3823 unsigned int header_size = drbd_header_size(mdev->tconn);
3824 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
3825 unsigned int plain =
3826 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
3827 c->bm_words * sizeof(unsigned long);
3828 unsigned int total = c->bytes[0] + c->bytes[1];
3831	/* total cannot be zero, but just in case: */
3835 /* don't report if not compressed */
3839 /* total < plain. check for overflow, still */
3840 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3841 : (1000 * total / plain);
3847 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3848 "total %u; compression: %u.%u%%\n",
3850 c->bytes[1], c->packets[1],
3851 c->bytes[0], c->packets[0],
3852 total, r/10, r % 10);
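/*
 * Worked example (with made-up numbers) for the per-mille ratio
 * above: plain = 524304 bytes and total = 12345 bytes yield
 * 1000 * total / plain = 23, i.e. the actual transfer cost roughly
 * 2.3% of what an uncompressed bitmap exchange would have.
 */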
3855 /* Since we are processing the bitfield from lower addresses to higher,
3856    it does not matter whether we process it in 32 bit or 64 bit chunks,
3857    as long as it is little endian. (Think of it as a byte stream,
3858    beginning with the lowest byte...) If we used big endian, we would
3859    need to process it from the highest address to the lowest in order
3860    to be agnostic to the 32 vs 64 bit issue.
3862    returns 0 on success, or a negative error code otherwise. */
3863 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
3865 struct drbd_conf *mdev;
3866 struct bm_xfer_ctx c;
3869 mdev = vnr_to_mdev(tconn, pi->vnr);
3873 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3874 /* you are supposed to send additional out-of-sync information
3875 * if you actually set bits during this phase */
3877 c = (struct bm_xfer_ctx) {
3878 .bm_bits = drbd_bm_bits(mdev),
3879 .bm_words = drbd_bm_words(mdev),
3883 if (pi->cmd == P_BITMAP)
3884 err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
3885 else if (pi->cmd == P_COMPRESSED_BITMAP) {
3886 /* MAYBE: sanity check that we speak proto >= 90,
3887 * and the feature is enabled! */
3888 struct p_compressed_bm *p = pi->data;
3890 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
3891 dev_err(DEV, "ReportCBitmap packet too large\n");
3895 if (pi->size <= sizeof(*p)) {
3896 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
3900 err = drbd_recv_all(mdev->tconn, p, pi->size);
3903 err = decode_bitmap_c(mdev, p, &c, pi->size);
3905		dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
3910 c.packets[pi->cmd == P_BITMAP]++;
3911 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
3918 err = drbd_recv_header(mdev->tconn, pi);
3923 INFO_bm_xfer_stats(mdev, "receive", &c);
3925 if (mdev->state.conn == C_WF_BITMAP_T) {
3926 enum drbd_state_rv rv;
3928 err = drbd_send_bitmap(mdev);
3931 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3932 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3933 D_ASSERT(rv == SS_SUCCESS);
3934 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3935 /* admin may have requested C_DISCONNECTING,
3936 * other threads may have noticed network errors */
3937 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3938 drbd_conn_str(mdev->state.conn));
3943 drbd_bm_unlock(mdev);
3944 if (!err && mdev->state.conn == C_WF_BITMAP_S)
3945 drbd_start_resync(mdev, C_SYNC_SOURCE);
3949 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
3951 conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
3954 return ignore_remaining_packet(tconn, pi);
3957 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
3959 /* Make sure we've acked all the TCP data associated
3960 * with the data requests being unplugged */
3961 drbd_tcp_quickack(tconn->data.socket);
3966 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
3968 struct drbd_conf *mdev;
3969 struct p_block_desc *p = pi->data;
3971 mdev = vnr_to_mdev(tconn, pi->vnr);
3975 switch (mdev->state.conn) {
3976 case C_WF_SYNC_UUID:
3981 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3982 drbd_conn_str(mdev->state.conn));
3985 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3993 int (*fn)(struct drbd_tconn *, struct packet_info *);
3996 static struct data_cmd drbd_cmd_handler[] = {
3997 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
3998 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
3999 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4000 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4001 [P_BITMAP] = { 1, 0, receive_bitmap } ,
4002 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4003 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote },
4004 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4005 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4006 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam },
4007 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam },
4008 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
4009 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
4010 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
4011 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
4012 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
4013 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4014 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4015 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4016 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4017 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
4018 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4019 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4022 static void drbdd(struct drbd_tconn *tconn)
4024 struct packet_info pi;
4025 size_t shs; /* sub header size */
4028 while (get_t_state(&tconn->receiver) == RUNNING) {
4029 struct data_cmd *cmd;
4031 drbd_thread_current_set_cpu(&tconn->receiver);
4032 if (drbd_recv_header(tconn, &pi))
4035 cmd = &drbd_cmd_handler[pi.cmd];
4036 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4037 conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
4041 shs = cmd->pkt_size;
4042 if (pi.size > shs && !cmd->expect_payload) {
4043 conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
4048 err = drbd_recv_all_warn(tconn, pi.data, shs);
4054 err = cmd->fn(tconn, &pi);
4056 conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4057 cmdname(pi.cmd), err, pi.size);
4064 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4067 void conn_flush_workqueue(struct drbd_tconn *tconn)
4069 struct drbd_wq_barrier barr;
4071 barr.w.cb = w_prev_work_done;
4072 barr.w.tconn = tconn;
4073 init_completion(&barr.done);
4074 drbd_queue_work(&tconn->data.work, &barr.w);
4075 wait_for_completion(&barr.done);
4078 static void drbd_disconnect(struct drbd_tconn *tconn)
4081 int rv = SS_UNKNOWN_ERROR;
4083 if (tconn->cstate == C_STANDALONE)
4086 /* asender does not clean up anything. it must not interfere, either */
4087 drbd_thread_stop(&tconn->asender);
4088 drbd_free_sock(tconn);
4090 down_read(&drbd_cfg_rwsem);
4091 idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
4092 up_read(&drbd_cfg_rwsem);
4093 conn_info(tconn, "Connection closed\n");
4095 if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4096 conn_try_outdate_peer_async(tconn);
4098 spin_lock_irq(&tconn->req_lock);
4100 if (oc >= C_UNCONNECTED)
4101 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4103 spin_unlock_irq(&tconn->req_lock);
4105 if (oc == C_DISCONNECTING) {
4106 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
4108 crypto_free_hash(tconn->cram_hmac_tfm);
4109 tconn->cram_hmac_tfm = NULL;
4111 kfree(tconn->net_conf);
4112 tconn->net_conf = NULL;
4113 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
4117 static int drbd_disconnected(int vnr, void *p, void *data)
4119 struct drbd_conf *mdev = (struct drbd_conf *)p;
4120 enum drbd_fencing_p fp;
4123 /* wait for current activity to cease. */
4124 spin_lock_irq(&mdev->tconn->req_lock);
4125 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4126 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4127 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4128 spin_unlock_irq(&mdev->tconn->req_lock);
4130 /* We do not have data structures that would allow us to
4131 * get the rs_pending_cnt down to 0 again.
4132 * * On C_SYNC_TARGET we do not have any data structures describing
4133 * the pending RSDataRequest's we have sent.
4134 * * On C_SYNC_SOURCE there is no data structure that tracks
4135 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4136 * And no, it is not the sum of the reference counts in the
4137 * resync_LRU. The resync_LRU tracks the whole operation including
4138 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4140 drbd_rs_cancel_all(mdev);
4142 mdev->rs_failed = 0;
4143 atomic_set(&mdev->rs_pending_cnt, 0);
4144 wake_up(&mdev->misc_wait);
4146 del_timer(&mdev->request_timer);
4148 del_timer_sync(&mdev->resync_timer);
4149 resync_timer_fn((unsigned long)mdev);
4151 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4152 * w_make_resync_request etc. which may still be on the worker queue
4153 * to be "canceled" */
4154 drbd_flush_workqueue(mdev);
4156 drbd_finish_peer_reqs(mdev);
4158 kfree(mdev->p_uuid);
4159 mdev->p_uuid = NULL;
4161 if (!drbd_suspended(mdev))
4162 tl_clear(mdev->tconn);
4167 if (get_ldev(mdev)) {
4168 fp = mdev->ldev->dc.fencing;
4172 /* serialize with bitmap writeout triggered by the state change,
4174 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4176 /* tcp_close and release of sendpage pages can be deferred. I don't
4177 * want to use SO_LINGER, because apparently it can be deferred for
4178 * more than 20 seconds (longest time I checked).
4180	 * Actually we don't care exactly when the network stack does its
4181	 * put_page(); we just release our reference on these pages right here.
4183 i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4185 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4186 i = atomic_read(&mdev->pp_in_use_by_net);
4188 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4189 i = atomic_read(&mdev->pp_in_use);
4191 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4193 D_ASSERT(list_empty(&mdev->read_ee));
4194 D_ASSERT(list_empty(&mdev->active_ee));
4195 D_ASSERT(list_empty(&mdev->sync_ee));
4196 D_ASSERT(list_empty(&mdev->done_ee));
4198 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4199 atomic_set(&mdev->current_epoch->epoch_size, 0);
4200 D_ASSERT(list_empty(&mdev->current_epoch->list));
4206 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4207 * we can agree on is stored in agreed_pro_version.
4209 * feature flags and the reserved array should be enough room for future
4210 * enhancements of the handshake protocol, and possible plugins...
4212 * for now, they are expected to be zero, but ignored.
4214 static int drbd_send_features(struct drbd_tconn *tconn)
4216 struct drbd_socket *sock;
4217 struct p_connection_features *p;
4219 sock = &tconn->data;
4220 p = conn_prepare_command(tconn, sock);
4223 memset(p, 0, sizeof(*p));
4224 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4225 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4226 return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4231 * 1 yes, we have a valid connection
4232 * 0 oops, did not work out, please try again
4233 * -1 peer talks different language,
4234 * no point in trying again, please go standalone.
4236 static int drbd_do_features(struct drbd_tconn *tconn)
4238 /* ASSERT current == tconn->receiver ... */
4239 struct p_connection_features *p;
4240 const int expect = sizeof(struct p_connection_features);
4241 struct packet_info pi;
4244 err = drbd_send_features(tconn);
4248 err = drbd_recv_header(tconn, &pi);
4252 if (pi.cmd != P_CONNECTION_FEATURES) {
4253 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4254 cmdname(pi.cmd), pi.cmd);
4258 if (pi.size != expect) {
4259 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4265 err = drbd_recv_all_warn(tconn, p, expect);
4269 p->protocol_min = be32_to_cpu(p->protocol_min);
4270 p->protocol_max = be32_to_cpu(p->protocol_max);
4271 if (p->protocol_max == 0)
4272 p->protocol_max = p->protocol_min;
4274 if (PRO_VERSION_MAX < p->protocol_min ||
4275 PRO_VERSION_MIN > p->protocol_max)
4278 tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4280 conn_info(tconn, "Handshake successful: "
4281 "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4286 conn_err(tconn, "incompatible DRBD dialects: "
4287 "I support %d-%d, peer supports %d-%d\n",
4288 PRO_VERSION_MIN, PRO_VERSION_MAX,
4289 p->protocol_min, p->protocol_max);
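/*
 * Stand-alone sketch (not driver code) of the negotiation above: the
 * two [min, max] ranges must overlap, and the agreed version is the
 * highest protocol version both sides support.
 */
static int pick_protocol_sketch(int my_min, int my_max,
				int peer_min, int peer_max)
{
	if (my_max < peer_min || my_min > peer_max)
		return -1;	/* incompatible dialects */
	return my_max < peer_max ? my_max : peer_max;
}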
4293 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4294 static int drbd_do_auth(struct drbd_tconn *tconn)
4296	conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4297	conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4301 #define CHALLENGE_LEN 64
4305 0 - failed, try again (network error),
4306 -1 - auth failed, don't try again.
4309 static int drbd_do_auth(struct drbd_tconn *tconn)
4311 struct drbd_socket *sock;
4312 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4313 struct scatterlist sg;
4314 char *response = NULL;
4315 char *right_response = NULL;
4316 char *peers_ch = NULL;
4317 unsigned int key_len = strlen(tconn->net_conf->shared_secret);
4318 unsigned int resp_size;
4319 struct hash_desc desc;
4320 struct packet_info pi;
4323 /* FIXME: Put the challenge/response into the preallocated socket buffer. */
4325 desc.tfm = tconn->cram_hmac_tfm;
4328 rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4329 (u8 *)tconn->net_conf->shared_secret, key_len);
4331 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4336 get_random_bytes(my_challenge, CHALLENGE_LEN);
4338 sock = &tconn->data;
4339 if (!conn_prepare_command(tconn, sock)) {
4343 rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4344 my_challenge, CHALLENGE_LEN);
4348 err = drbd_recv_header(tconn, &pi);
4354 if (pi.cmd != P_AUTH_CHALLENGE) {
4355 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4356 cmdname(pi.cmd), pi.cmd);
4361 if (pi.size > CHALLENGE_LEN * 2) {
4362		conn_err(tconn, "AuthChallenge payload too big.\n");
4367 peers_ch = kmalloc(pi.size, GFP_NOIO);
4368 if (peers_ch == NULL) {
4369 conn_err(tconn, "kmalloc of peers_ch failed\n");
4374 err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4380 resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4381 response = kmalloc(resp_size, GFP_NOIO);
4382 if (response == NULL) {
4383 conn_err(tconn, "kmalloc of response failed\n");
4388 sg_init_table(&sg, 1);
4389 sg_set_buf(&sg, peers_ch, pi.size);
4391 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4393 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4398 if (!conn_prepare_command(tconn, sock)) {
4402 rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4403 response, resp_size);
4407 err = drbd_recv_header(tconn, &pi);
4413 if (pi.cmd != P_AUTH_RESPONSE) {
4414 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4415 cmdname(pi.cmd), pi.cmd);
4420 if (pi.size != resp_size) {
4421		conn_err(tconn, "AuthResponse payload of unexpected size\n");
4426	err = drbd_recv_all_warn(tconn, response, resp_size);
4432 right_response = kmalloc(resp_size, GFP_NOIO);
4433 if (right_response == NULL) {
4434 conn_err(tconn, "kmalloc of right_response failed\n");
4439 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4441 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4443 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4448 rv = !memcmp(response, right_response, resp_size);
4451 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4452 resp_size, tconn->net_conf->cram_hmac_alg);
4459 kfree(right_response);
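/*
 * Summary sketch (not compiled) of the symmetric CRAM-HMAC exchange
 * above; hmac() stands in for crypto_hash_digest() keyed with the
 * shared secret:
 *
 *	send(P_AUTH_CHALLENGE, my_challenge);	   64 random bytes
 *	recv(P_AUTH_CHALLENGE, peers_ch);
 *	send(P_AUTH_RESPONSE, hmac(secret, peers_ch));
 *	recv(P_AUTH_RESPONSE, response);
 *	rv = memcmp-equal(response, hmac(secret, my_challenge));
 */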
4465 int drbdd_init(struct drbd_thread *thi)
4467 struct drbd_tconn *tconn = thi->tconn;
4470 conn_info(tconn, "receiver (re)started\n");
4473 h = drbd_connect(tconn);
4475 drbd_disconnect(tconn);
4476 schedule_timeout_interruptible(HZ);
4479 conn_warn(tconn, "Discarding network configuration.\n");
4480 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4485 if (get_net_conf(tconn)) {
4487 put_net_conf(tconn);
4491 drbd_disconnect(tconn);
4493 conn_info(tconn, "receiver terminated\n");
4497 /* ********* acknowledge sender ******** */
static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct p_req_state_reply *p = pi->data;
	int retcode = be32_to_cpu(p->retcode);

	if (retcode >= SS_SUCCESS) {
		set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
	} else {
		set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
		conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
			 drbd_set_st_err_str(retcode), retcode);
	}
	wake_up(&tconn->ping_wait);

	return 0;
}
static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	struct p_req_state_reply *p = pi->data;
	int retcode = be32_to_cpu(p->retcode);

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return -EIO;

	if (retcode >= SS_SUCCESS) {
		set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
	} else {
		set_bit(CL_ST_CHG_FAIL, &mdev->flags);
		dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
			drbd_set_st_err_str(retcode), retcode);
	}
	wake_up(&mdev->state_wait);

	return 0;
}
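/* Two flavours of state-change replies: got_conn_RqSReply() answers a
 * connection-wide request and signals the waiter through tconn->flags
 * and tconn->ping_wait, while got_RqSReply() answers a per-volume
 * request (looked up via pi->vnr) through mdev->flags and
 * mdev->state_wait. */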
static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
{
	return drbd_send_ping_ack(tconn);
}

static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
{
	/* restore idle timeout */
	tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
	if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
		wake_up(&tconn->ping_wait);

	return 0;
}
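/* Keepalive over the meta socket: drbd_asender() sends P_PING whenever
 * SEND_PING is set and shortens sk_rcvtimeo to the ping-timeout; the
 * peer replies with P_PING_ACK, and got_PingAck() above restores the
 * longer ping-int idle timeout. A PingAck that misses the ping-timeout
 * window makes the asender declare the peer dead and reconnect. */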
static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return -EIO;

	D_ASSERT(mdev->tconn->agreed_pro_version >= 89);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, sector);
		drbd_set_in_sync(mdev, sector, blksize);
		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
		mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
		put_ldev(mdev);
	}
	dec_rs_pending(mdev);
	atomic_add(blksize >> 9, &mdev->rs_sect_in);

	return 0;
}
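/* Good case of checksum-based resync (protocol version 89 and up): the
 * peer computed an identical checksum for this block, so it is marked
 * in sync without its data ever crossing the wire. blksize >> 9
 * converts bytes to 512-byte sectors for the rs_sect_in bookkeeping
 * used by resync rate control. */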
static int
validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
			      struct rb_root *root, const char *func,
			      enum drbd_req_event what, bool missing_ok)
{
	struct drbd_request *req;
	struct bio_and_error m;

	spin_lock_irq(&mdev->tconn->req_lock);
	req = find_request(mdev, root, id, sector, missing_ok, func);
	if (unlikely(!req)) {
		spin_unlock_irq(&mdev->tconn->req_lock);
		return -EIO;
	}
	__req_mod(req, what, &m);
	spin_unlock_irq(&mdev->tconn->req_lock);

	if (m.bio)
		complete_master_bio(mdev, &m);
	return 0;
}
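/* Lookup and state transition are atomic under req_lock; any master bio
 * completion that the transition triggers is only recorded in 'm' by
 * __req_mod() and carried out by complete_master_bio() after the lock
 * has been dropped, keeping bio_endio() out of the spinlock section. */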
static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);
	enum drbd_req_event what;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return -EIO;

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		drbd_set_in_sync(mdev, sector, blksize);
		dec_rs_pending(mdev);
		return 0;
	}
	switch (pi->cmd) {
	case P_RS_WRITE_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = WRITE_ACKED_BY_PEER_AND_SIS;
		break;
	case P_WRITE_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = WRITE_ACKED_BY_PEER;
		break;
	case P_RECV_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
		what = RECV_ACKED_BY_PEER;
		break;
	case P_DISCARD_WRITE:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = DISCARD_WRITE;
		break;
	case P_RETRY_WRITE:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = POSTPONE_WRITE;
		break;
	default:
		BUG();
	}

	return validate_req_change_req_state(mdev, p->block_id, sector,
					     &mdev->write_requests, __func__,
					     what, false);
}
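/* Mapping of ack packet to request event, by wire protocol:
 *	P_RECV_ACK	protocol B: peer has received the data
 *	P_WRITE_ACK	protocol C: peer has written the data
 *	P_RS_WRITE_ACK	protocol C: resync write, additionally set in sync
 *	P_DISCARD_WRITE, P_RETRY_WRITE: write conflict resolution
 * Acks with block_id == ID_SYNCER belong to resync requests and never
 * touch the write_requests tree. */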
static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int size = be32_to_cpu(p->blksize);
	bool missing_ok = tconn->net_conf->wire_protocol == DRBD_PROT_A ||
			  tconn->net_conf->wire_protocol == DRBD_PROT_B;
	int err;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return -EIO;

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		dec_rs_pending(mdev);
		drbd_rs_failed_io(mdev, sector, size);
		return 0;
	}

	err = validate_req_change_req_state(mdev, p->block_id, sector,
					    &mdev->write_requests, __func__,
					    NEG_ACKED, missing_ok);
	if (err) {
		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
		   The master bio might already be completed, therefore the
		   request is no longer in the collision hash. */
		/* In Protocol B we might already have got a P_RECV_ACK
		   but then get a P_NEG_ACK afterwards. */
		if (!missing_ok)
			return err;
		drbd_set_out_of_sync(mdev, sector, size);
	}
	return 0;
}
static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return -EIO;

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
		(unsigned long long)sector, be32_to_cpu(p->blksize));

	return validate_req_change_req_state(mdev, p->block_id, sector,
					     &mdev->read_requests, __func__,
					     NEG_ACKED, false);
}
static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	sector_t sector;
	int size;
	struct p_block_ack *p = pi->data;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return -EIO;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	dec_rs_pending(mdev);

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, sector);
		switch (pi->cmd) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(mdev, sector, size);
			/* fall through: a cancelled resync request needs
			 * no further accounting */
		case P_RS_CANCEL:
			break;
		default:
			BUG();
		}
		put_ldev(mdev);
	}

	return 0;
}
static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	struct p_barrier_ack *p = pi->data;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return -EIO;

	tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));

	if (mdev->state.conn == C_AHEAD &&
	    atomic_read(&mdev->ap_in_flight) == 0 &&
	    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
		mdev->start_resync_timer.expires = jiffies + HZ;
		add_timer(&mdev->start_resync_timer);
	}

	return 0;
}
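/* In Ahead/Behind mode the primary stopped shipping data and only keeps
 * the transfer log barriers flowing. Once the last in-flight application
 * write has been acknowledged (ap_in_flight == 0), arm
 * start_resync_timer to switch to SyncSource about a second later;
 * test_and_set_bit() ensures the timer is armed at most once per epoch. */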
static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	struct p_block_ack *p = pi->data;
	struct drbd_work *w;
	sector_t sector;
	int size;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return -EIO;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
		drbd_ov_out_of_sync_found(mdev, sector, size);
	else
		ov_out_of_sync_print(mdev);

	if (!get_ldev(mdev))
		return 0;

	drbd_rs_complete_io(mdev, sector);
	dec_rs_pending(mdev);

	--mdev->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((mdev->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(mdev, mdev->ov_left);

	if (mdev->ov_left == 0) {
		w = kmalloc(sizeof(*w), GFP_NOIO);
		if (w) {
			w->cb = w_ov_finished;
			w->mdev = mdev;
			drbd_queue_work_front(&mdev->tconn->data.work, w);
		} else {
			dev_err(DEV, "kmalloc(w) failed.");
			ov_out_of_sync_print(mdev);
			drbd_resync_finished(mdev);
		}
	}
	put_ldev(mdev);
	return 0;
}
static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
{
	return 0;
}
static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
{
	struct drbd_conf *mdev;
	int i, not_empty = 0;

	do {
		clear_bit(SIGNAL_ASENDER, &tconn->flags);
		flush_signals(current);
		down_read(&drbd_cfg_rwsem);
		idr_for_each_entry(&tconn->volumes, mdev, i) {
			if (drbd_finish_peer_reqs(mdev)) {
				up_read(&drbd_cfg_rwsem);
				return 1; /* error */
			}
		}
		up_read(&drbd_cfg_rwsem);
		set_bit(SIGNAL_ASENDER, &tconn->flags);

		spin_lock_irq(&tconn->req_lock);
		idr_for_each_entry(&tconn->volumes, mdev, i) {
			not_empty = !list_empty(&mdev->done_ee);
			if (not_empty)
				break;
		}
		spin_unlock_irq(&tconn->req_lock);
	} while (not_empty);

	return 0;
}
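/* The done_ee lists are refilled by the receiver thread while we drain
 * them here, so after processing we re-check every volume for new
 * entries under req_lock and loop until all lists are empty.
 * SIGNAL_ASENDER is cleared (and pending signals flushed) while we
 * work, then set again, so other threads can interrupt the asender's
 * next blocking receive without a stale signal aborting this drain. */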
struct asender_cmd {
	size_t pkt_size;
	int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
};

static struct asender_cmd asender_tbl[] = {
	[P_PING]	    = { 0, got_Ping },
	[P_PING_ACK]	    = { 0, got_PingAck },
	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
	[P_DELAY_PROBE]	    = { sizeof(struct p_delay_probe93), got_skip },
	[P_RS_CANCEL]	    = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
};
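/* Table-driven dispatch for the meta socket: designated initializers
 * make asender_tbl[] indexable by packet type, drbd_asender() rejects
 * any pi.cmd that is out of range or has a NULL fn, and pkt_size states
 * the exact payload expected after the header (0 for the payload-less
 * ping packets), so packet sizes are validated before a handler runs. */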
int drbd_asender(struct drbd_thread *thi)
{
	struct drbd_tconn *tconn = thi->tconn;
	struct asender_cmd *cmd = NULL;
	struct packet_info pi;
	int rv;
	void *buf    = tconn->meta.rbuf;
	int received = 0;
	unsigned int header_size = drbd_header_size(tconn);
	int expect   = header_size;
	int ping_timeout_active = 0;

	current->policy = SCHED_RR;  /* Make this a realtime task! */
	current->rt_priority = 2;    /* more important than all other tasks */

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);
		if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
			if (drbd_send_ping(tconn)) {
				conn_err(tconn, "drbd_send_ping has failed\n");
				goto reconnect;
			}
			tconn->meta.socket->sk->sk_rcvtimeo =
				tconn->net_conf->ping_timeo*HZ/10;
			ping_timeout_active = 1;
		}

		/* TODO: conditionally cork; it may hurt latency if we cork without
		   much to send */
		if (!tconn->net_conf->no_cork)
			drbd_tcp_cork(tconn->meta.socket);
		if (tconn_finish_peer_reqs(tconn)) {
			conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
			goto reconnect;
		}
		/* but unconditionally uncork unless disabled */
		if (!tconn->net_conf->no_cork)
			drbd_tcp_uncork(tconn->meta.socket);

		/* short circuit, recv_msg would return EINTR anyways. */
		if (signal_pending(current))
			continue;

		rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
		clear_bit(SIGNAL_ASENDER, &tconn->flags);

		flush_signals(current);

		/* Note:
		 * -EINTR	 (on meta) we got a signal
		 * -EAGAIN	 (on meta) rcvtimeo expired
		 * -ECONNRESET	 other side closed the connection
		 * -ERESTARTSYS  (on data) we got a signal
		 * rv <  0	 other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv <  expected: "woken" by signal during receive
		 * rv == 0	 : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			received += rv;
			buf	 += rv;
		} else if (rv == 0) {
			conn_err(tconn, "meta connection shut down by peer.\n");
			goto reconnect;
		} else if (rv == -EAGAIN) {
			/* If the data socket received something meanwhile,
			 * that is good enough: peer is still alive. */
			if (time_after(tconn->last_received,
				jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
				continue;
			if (ping_timeout_active) {
				conn_err(tconn, "PingAck did not arrive in time.\n");
				goto reconnect;
			}
			set_bit(SEND_PING, &tconn->flags);
			continue;
		} else if (rv == -EINTR) {
			continue;
		} else {
			conn_err(tconn, "sock_recvmsg returned %d\n", rv);
			goto reconnect;
		}

		if (received == expect && cmd == NULL) {
			if (decode_header(tconn, tconn->meta.rbuf, &pi))
				goto reconnect;
			/* validate pi.cmd before indexing the table */
			if (pi.cmd >= ARRAY_SIZE(asender_tbl) ||
			    !asender_tbl[pi.cmd].fn) {
				conn_err(tconn, "unknown command %d on meta (l: %d)\n",
					 pi.cmd, pi.size);
				goto disconnect;
			}
			cmd = &asender_tbl[pi.cmd];
			expect = header_size + cmd->pkt_size;
			if (pi.size != expect - header_size) {
				conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
					 pi.cmd, pi.size);
				goto reconnect;
			}
		}
		if (received == expect) {
			int err;

			err = cmd->fn(tconn, &pi);
			if (err) {
				conn_err(tconn, "%pf failed\n", cmd->fn);
				goto reconnect;
			}

			tconn->last_received = jiffies;

			/* the idle_timeout (ping-int)
			 * has been restored in got_PingAck() */
			if (cmd == &asender_tbl[P_PING_ACK])
				ping_timeout_active = 0;

			buf	 = tconn->meta.rbuf;
			received = 0;
			expect	 = header_size;
			cmd	 = NULL;
		}
	}

	if (0) {
reconnect:
		conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
	}
	if (0) {
disconnect:
		conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
	}
	clear_bit(SIGNAL_ASENDER, &tconn->flags);

	conn_info(tconn, "asender terminated\n");

	return 0;
}