4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 #include <linux/module.h>
28 #include <asm/uaccess.h>
31 #include <linux/drbd.h>
33 #include <linux/file.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
47 #include "drbd_protocol.h"
51 #define PRO_FEATURES (FF_TRIM)
66 static int drbd_do_features(struct drbd_connection *connection);
67 static int drbd_do_auth(struct drbd_connection *connection);
68 static int drbd_disconnected(struct drbd_peer_device *);
69 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
70 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
71 static int e_end_block(struct drbd_work *, int);
74 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
77 * some helper functions to deal with single linked page lists,
78 * page->private being our "next" pointer.
81 /* If at least n pages are linked at head, get n pages off.
82 * Otherwise, don't modify head, and return NULL.
83 * Locking is the responsibility of the caller.
/* Detach the first n pages from *head and return them as a private chain.
 * NOTE(review): interior lines of this function are elided in this view. */
85 static struct page *page_chain_del(struct page **head, int n)
99 tmp = page_chain_next(page);
101 break; /* found sufficient pages */
103 /* insufficient pages, don't use any of them. */
108 /* add end of list marker for the returned list */
109 set_page_private(page, 0); /* page->private == 0 terminates the chain */
110 /* actual return value, and adjustment of head */
116 /* may be used outside of locks to find the tail of a (usually short)
117 * "private" page chain, before adding it back to a global chain head
118 * with page_chain_add() under a spinlock. */
/* Walk a private page chain to its last page.
 * Presumably reports the chain length via *len when non-NULL
 * (callers pass &i at drbd_free_pages) — body elided, confirm. */
119 static struct page *page_chain_tail(struct page *page, int *len)
123 while ((tmp = page_chain_next(page)))
/* Give every page of the chain back to the system allocator.
 * Return value semantics elided from this view — presumably a page count. */
130 static int page_chain_free(struct page *page)
134 page_chain_for_each_safe(page, tmp) {
/* Splice the chain [chain_first..chain_last] onto the front of *head.
 * Caller is responsible for locking (see page_chain_tail comment above). */
141 static void page_chain_add(struct page **head,
142 struct page *chain_first, struct page *chain_last)
146 tmp = page_chain_tail(chain_first, NULL);
147 BUG_ON(tmp != chain_last); /* caller must pass the true tail of the chain */
150 /* add chain to head */
151 set_page_private(chain_last, (unsigned long)*head);
/* One allocation attempt: first try to take 'number' pages off the global
 * drbd_pp_pool, else fall back to alloc_page(GFP_TRY) per page.
 * On partial failure the partial chain is returned to the pool (elided
 * lines apparently return NULL in that case — confirm against full source). */
155 static struct page *__drbd_alloc_pages(struct drbd_device *device,
158 struct page *page = NULL;
159 struct page *tmp = NULL;
162 /* Yes, testing drbd_pp_vacant outside the lock is racy.
163 * So what. It saves a spin_lock. */
164 if (drbd_pp_vacant >= number) {
165 spin_lock(&drbd_pp_lock);
166 page = page_chain_del(&drbd_pp_pool, number);
168 drbd_pp_vacant -= number;
169 spin_unlock(&drbd_pp_lock);
174 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
175 * "criss-cross" setup, that might cause write-out on some other DRBD,
176 * which in turn might block on the other node at this very place. */
177 for (i = 0; i < number; i++) {
178 tmp = alloc_page(GFP_TRY);
181 set_page_private(tmp, (unsigned long)page); /* prepend to private chain */
188 /* Not enough pages immediately available this time.
189 * No need to jump around here, drbd_alloc_pages will retry this
190 * function "soon". */
192 tmp = page_chain_tail(page, NULL);
193 spin_lock(&drbd_pp_lock);
194 page_chain_add(&drbd_pp_pool, page, tmp);
196 spin_unlock(&drbd_pp_lock);
/* Move finished entries from device->net_ee onto to_be_freed.
 * Caller must hold the req_lock (list is also touched under it elsewhere
 * in this file — see drbd_kick_lo_and_reclaim_net). */
201 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
202 struct list_head *to_be_freed)
204 struct drbd_peer_request *peer_req, *tmp;
206 /* The EEs are always appended to the end of the list. Since
207 they are sent in order over the wire, they have to finish
208 in order. As soon as we see the first not finished we can
209 stop to examine the list... */
211 list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
212 if (drbd_peer_req_has_active_page(peer_req))
214 list_move(&peer_req->w.list, to_be_freed);
/* Collect finished net_ee entries under the req_lock, then free them
 * outside the lock (drbd_free_net_peer_req may sleep/take other locks). */
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
220 LIST_HEAD(reclaimed);
221 struct drbd_peer_request *peer_req, *t;
223 spin_lock_irq(&device->resource->req_lock);
224 reclaim_finished_net_peer_reqs(device, &reclaimed);
225 spin_unlock_irq(&device->resource->req_lock);
227 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228 drbd_free_net_peer_req(device, peer_req);
232 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233 * @device: DRBD device.
234 * @number: number of pages requested
235 * @retry: whether to retry, if not enough pages are available right now
237 * Tries to allocate number pages, first from our own page pool, then from
239 * Possibly retry until DRBD frees sufficient pages somewhere else.
241 * If this allocation would exceed the max_buffers setting, we throttle
242 * allocation (schedule_timeout) to give the system some room to breathe.
244 * We do not use max-buffers as hard limit, because it could lead to
245 * congestion and further to a distributed deadlock during online-verify or
246 * (checksum based) resync, if the max-buffers, socket buffer sizes and
247 * resync-rate settings are mis-configured.
249 * Returns a page chain linked via page->private.
/* See kernel-doc above: allocate 'number' pages as a private chain,
 * throttled by net_conf->max_buffers, retrying until signalled.
 * NOTE(review): several interior lines (retry flag handling, wait-queue
 * bookkeeping) are elided in this view. */
251 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
254 struct drbd_device *device = peer_device->device;
255 struct page *page = NULL;
261 nc = rcu_dereference(peer_device->connection->net_conf);
262 mxb = nc ? nc->max_buffers : 1000000; /* no net_conf: effectively unlimited */
265 if (atomic_read(&device->pp_in_use) < mxb)
266 page = __drbd_alloc_pages(device, number);
268 while (page == NULL) {
269 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
271 drbd_kick_lo_and_reclaim_net(device); /* try to reclaim before sleeping */
273 if (atomic_read(&device->pp_in_use) < mxb) {
274 page = __drbd_alloc_pages(device, number);
282 if (signal_pending(current)) {
283 drbd_warn(device, "drbd_alloc_pages interrupted!\n");
287 if (schedule_timeout(HZ/10) == 0) /* breathe, then retry */
290 finish_wait(&drbd_pp_wait, &wait);
293 atomic_add(number, &device->pp_in_use); /* account even on throttled path */
297 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
298 * Is also used from inside an other spin_lock_irq(&resource->req_lock);
299 * Either links the page chain back to the global pool,
300 * or returns all pages to the system. */
/* Return a page chain either to the global drbd_pp_pool or (when the pool
 * already holds plenty) to the system, and adjust the in-use accounting.
 * Must not be called from irq context (see comment above). */
301 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
303 atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
309 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
310 i = page_chain_free(page); /* pool is full enough: give back to system */
313 tmp = page_chain_tail(page, &i); /* i receives the chain length */
314 spin_lock(&drbd_pp_lock);
315 page_chain_add(&drbd_pp_pool, page, tmp);
317 spin_unlock(&drbd_pp_lock);
319 i = atomic_sub_return(i, a);
321 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
322 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
323 wake_up(&drbd_pp_wait); /* waiters in drbd_alloc_pages may proceed now */
327 You need to hold the req_lock:
328 _drbd_wait_ee_list_empty()
330 You must not have the req_lock:
332 drbd_alloc_peer_req()
333 drbd_free_peer_reqs()
335 drbd_finish_peer_reqs()
337 drbd_wait_ee_list_empty()
/* Allocate a peer request from drbd_ee_mempool, plus a page chain for the
 * payload when has_payload && data_size. Returns NULL on failure (fault
 * injection, mempool exhaustion, or page allocation failure — the error
 * unwinding lines are partly elided in this view). */
340 struct drbd_peer_request *
341 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
342 unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
344 struct drbd_device *device = peer_device->device;
345 struct drbd_peer_request *peer_req;
346 struct page *page = NULL;
347 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT; /* round up */
349 if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
352 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
354 if (!(gfp_mask & __GFP_NOWARN))
355 drbd_err(device, "%s: allocation failed\n", __func__);
359 if (has_payload && data_size) {
360 page = drbd_alloc_pages(peer_device, nr_pages, (gfp_mask & __GFP_WAIT));
365 drbd_clear_interval(&peer_req->i);
366 peer_req->i.size = data_size;
367 peer_req->i.sector = sector;
368 peer_req->i.local = false;
369 peer_req->i.waiting = false;
371 peer_req->epoch = NULL;
372 peer_req->peer_device = peer_device;
373 peer_req->pages = page;
374 atomic_set(&peer_req->pending_bios, 0);
377 * The block_id is opaque to the receiver. It is not endianness
378 * converted, and sent back to the sender unchanged.
380 peer_req->block_id = id;
385 mempool_free(peer_req, drbd_ee_mempool); /* error path: undo the alloc */
/* Free a peer request: optional digest, its page chain (with net/non-net
 * accounting selected by is_net), then the mempool object itself.
 * The D_ASSERTs document the required state: no bios in flight, and the
 * request already removed from the interval tree. */
389 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
392 if (peer_req->flags & EE_HAS_DIGEST)
393 kfree(peer_req->digest);
394 drbd_free_pages(device, peer_req->pages, is_net);
395 D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
396 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
397 mempool_free(peer_req, drbd_ee_mempool);
/* Splice the given list away under the req_lock, then free each entry
 * outside the lock. is_net is derived from the list identity so callers
 * can pass either net_ee or other ee lists. */
400 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
402 LIST_HEAD(work_list);
403 struct drbd_peer_request *peer_req, *t;
405 int is_net = list == &device->net_ee;
407 spin_lock_irq(&device->resource->req_lock);
408 list_splice_init(list, &work_list);
409 spin_unlock_irq(&device->resource->req_lock);
411 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
412 __drbd_free_peer_req(device, peer_req, is_net);
419 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
/* Reclaim finished net_ee entries and run the completion callback (w.cb)
 * for everything on done_ee, then wake ee_wait waiters.
 * NOTE(review): the error-accumulation lines between the callback and the
 * free are elided here — err/err2 combining not fully visible. */
421 static int drbd_finish_peer_reqs(struct drbd_device *device)
423 LIST_HEAD(work_list);
424 LIST_HEAD(reclaimed);
425 struct drbd_peer_request *peer_req, *t;
428 spin_lock_irq(&device->resource->req_lock);
429 reclaim_finished_net_peer_reqs(device, &reclaimed);
430 list_splice_init(&device->done_ee, &work_list);
431 spin_unlock_irq(&device->resource->req_lock);
433 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
434 drbd_free_net_peer_req(device, peer_req);
436 /* possible callbacks here:
437 * e_end_block, and e_end_resync_block, e_send_superseded.
438 * all ignore the last argument.
440 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
443 /* list_del not necessary, next/prev members not touched */
444 err2 = peer_req->w.cb(&peer_req->w, !!err);
447 drbd_free_peer_req(device, peer_req);
449 wake_up(&device->ee_wait);
/* Wait (uninterruptible) until the given ee list is empty.
 * Caller must hold req_lock; it is dropped while sleeping and retaken
 * before rechecking the list (see the "You need to hold" block above). */
454 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
455 struct list_head *head)
459 /* avoids spin_lock/unlock
460 * and calling prepare_to_wait in the fast path */
461 while (!list_empty(head)) {
462 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
463 spin_unlock_irq(&device->resource->req_lock);
465 finish_wait(&device->ee_wait, &wait);
466 spin_lock_irq(&device->resource->req_lock);
/* Locked wrapper around _drbd_wait_ee_list_empty() for callers that do
 * not already hold the req_lock. */
470 static void drbd_wait_ee_list_empty(struct drbd_device *device,
471 struct list_head *head)
473 spin_lock_irq(&device->resource->req_lock);
474 _drbd_wait_ee_list_empty(device, head);
475 spin_unlock_irq(&device->resource->req_lock);
/* Thin kernel_recvmsg() wrapper; flags == 0 means "block until the whole
 * 'size' arrived" (MSG_WAITALL) and suppress SIGPIPE (MSG_NOSIGNAL). */
478 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
484 struct msghdr msg = {
485 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
487 return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
/* Receive on the data socket, logging the common failure modes and moving
 * the connection to C_BROKEN_PIPE on error/EOF. The rv==0 branch waits
 * for an expected shutdown after DISCONNECT_SENT (some lines elided). */
490 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
494 rv = drbd_recv_short(connection->data.socket, buf, size, 0);
497 if (rv == -ECONNRESET)
498 drbd_info(connection, "sock was reset by peer\n");
499 else if (rv != -ERESTARTSYS)
500 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
501 } else if (rv == 0) {
502 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
505 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
508 t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
513 drbd_info(connection, "sock was shut down by peer\n");
517 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
/* Like drbd_recv(), but (per the elided lines) normalizes short reads
 * into an error code — callers get 0 only on a complete read. */
523 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
527 err = drbd_recv(connection, buf, size);
/* drbd_recv_all() plus a warning on failure, unless the failure was
 * caused by a pending signal (expected during teardown). */
536 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
540 err = drbd_recv_all(connection, buf, size);
541 if (err && !signal_pending(current))
542 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
547 * On individual connections, the socket buffer size must be set prior to the
548 * listen(2) or connect(2) calls in order to have it take effect.
549 * This is our wrapper to do so.
/* Apply configured send/receive buffer sizes directly on the sk, locking
 * them against autotuning (SOCK_*BUF_LOCK). A value of 0 presumably means
 * "leave kernel default" — the guarding if()s are elided here; confirm. */
551 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
554 /* open coded SO_SNDBUF, SO_RCVBUF */
556 sock->sk->sk_sndbuf = snd;
557 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
560 sock->sk->sk_rcvbuf = rcv;
561 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
/* Active side of connection setup: create a TCP socket, bind it to the
 * configured local address (port 0), and connect to the peer. Returns the
 * socket or NULL; only unexpected errors move the connection to
 * C_DISCONNECTING (disconnect_on_error). Several lines elided. */
565 static struct socket *drbd_try_connect(struct drbd_connection *connection)
569 struct sockaddr_in6 src_in6;
570 struct sockaddr_in6 peer_in6;
572 int err, peer_addr_len, my_addr_len;
573 int sndbuf_size, rcvbuf_size, connect_int;
574 int disconnect_on_error = 1;
577 nc = rcu_dereference(connection->net_conf);
582 sndbuf_size = nc->sndbuf_size;
583 rcvbuf_size = nc->rcvbuf_size;
584 connect_int = nc->connect_int;
587 my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
588 memcpy(&src_in6, &connection->my_addr, my_addr_len);
590 if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
591 src_in6.sin6_port = 0;
593 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
595 peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
596 memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
598 what = "sock_create_kern";
599 err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
600 SOCK_STREAM, IPPROTO_TCP, &sock);
606 sock->sk->sk_rcvtimeo =
607 sock->sk->sk_sndtimeo = connect_int * HZ;
608 drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
610 /* explicitly bind to the configured IP as source IP
611 * for the outgoing connections.
612 * This is needed for multihomed hosts and to be
613 * able to use lo: interfaces for drbd.
614 * Make sure to use 0 as port number, so linux selects
615 * a free one dynamically.
617 what = "bind before connect";
618 err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
622 /* connect may fail, peer not yet available.
623 * stay C_WF_CONNECTION, don't go Disconnecting! */
624 disconnect_on_error = 0;
626 err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
635 /* timeout, busy, signal pending */
636 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
637 case EINTR: case ERESTARTSYS:
638 /* peer not (yet) available, network problem */
639 case ECONNREFUSED: case ENETUNREACH:
640 case EHOSTDOWN: case EHOSTUNREACH:
641 disconnect_on_error = 0;
644 drbd_err(connection, "%s failed, err = %d\n", what, err);
646 if (disconnect_on_error)
647 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
/* Per-attempt context for the passive (listen/accept) side: the listen
 * socket, a completion rung by drbd_incoming_connection(), and the saved
 * sk_state_change callback so it can be restored later. */
653 struct accept_wait_data {
654 struct drbd_connection *connection;
655 struct socket *s_listen;
656 struct completion door_bell;
657 void (*original_sk_state_change)(struct sock *sk);
/* sk_state_change hook installed on the listen socket: ring the door_bell
 * when a connection is established, then (elided) chain to the original
 * callback saved in accept_wait_data. */
661 static void drbd_incoming_connection(struct sock *sk)
663 struct accept_wait_data *ad = sk->sk_user_data;
664 void (*state_change)(struct sock *sk);
666 state_change = ad->original_sk_state_change;
667 if (sk->sk_state == TCP_ESTABLISHED)
668 complete(&ad->door_bell);
/* Create, configure, bind, and listen() on the local address, installing
 * drbd_incoming_connection as the state-change hook under the callback
 * lock. Error unwinding (sock_release + state change) partly elided. */
672 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
674 int err, sndbuf_size, rcvbuf_size, my_addr_len;
675 struct sockaddr_in6 my_addr;
676 struct socket *s_listen;
681 nc = rcu_dereference(connection->net_conf);
686 sndbuf_size = nc->sndbuf_size;
687 rcvbuf_size = nc->rcvbuf_size;
690 my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
691 memcpy(&my_addr, &connection->my_addr, my_addr_len);
693 what = "sock_create_kern";
694 err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
695 SOCK_STREAM, IPPROTO_TCP, &s_listen);
701 s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
702 drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
704 what = "bind before listen";
705 err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
709 ad->s_listen = s_listen;
710 write_lock_bh(&s_listen->sk->sk_callback_lock);
711 ad->original_sk_state_change = s_listen->sk->sk_state_change;
712 s_listen->sk->sk_state_change = drbd_incoming_connection;
713 s_listen->sk->sk_user_data = ad;
714 write_unlock_bh(&s_listen->sk->sk_callback_lock);
717 err = s_listen->ops->listen(s_listen, 5); /* backlog of 5 */
724 sock_release(s_listen);
726 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
727 drbd_err(connection, "%s failed, err = %d\n", what, err);
728 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
/* Undo the hook installed by prepare_listen_socket(): restore the saved
 * sk_state_change and clear sk_user_data, under the callback lock. */
735 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
737 write_lock_bh(&sk->sk_callback_lock);
738 sk->sk_state_change = ad->original_sk_state_change;
739 sk->sk_user_data = NULL;
740 write_unlock_bh(&sk->sk_callback_lock);
/* Passive side: wait (with +/-~14% random jitter on connect_int) for the
 * door_bell, then kernel_accept() the incoming connection. Returns the
 * accepted socket or NULL; unexpected accept errors force C_DISCONNECTING. */
743 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
745 int timeo, connect_int, err = 0;
746 struct socket *s_estab = NULL;
750 nc = rcu_dereference(connection->net_conf);
755 connect_int = nc->connect_int;
758 timeo = connect_int * HZ;
759 /* 28.5% random jitter */
760 timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
762 err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
766 err = kernel_accept(ad->s_listen, &s_estab, 0);
768 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
769 drbd_err(connection, "accept failed, err = %d\n", err);
770 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
775 unregister_state_change(s_estab->sk, ad);
780 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
/* Send the zero-payload handshake packet (P_INITIAL_DATA / P_INITIAL_META)
 * that identifies which of the two sockets this is. */
782 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
783 enum drbd_packet cmd)
785 if (!conn_prepare_command(connection, sock))
787 return conn_send_command(connection, sock, cmd, 0, NULL, 0);
/* Read and decode the peer's first packet on a freshly accepted socket;
 * the elided tail presumably returns pi.cmd (or an error) to the caller
 * in conn_connect() — confirm against the full source. */
790 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
792 unsigned int header_size = drbd_header_size(connection);
793 struct packet_info pi;
796 err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
797 if (err != header_size) {
802 err = decode_header(connection, connection->data.rbuf, &pi);
809 * drbd_socket_okay() - Free the socket if its connection is not okay
810 * @sock: pointer to the pointer to the socket.
/* Probe socket health with a non-blocking 4-byte MSG_PEEK; rr > 0 or
 * -EAGAIN means the connection is still alive (other outcomes, elided,
 * free the socket per the kernel-doc above). */
812 static int drbd_socket_okay(struct socket **sock)
820 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
822 if (rr > 0 || rr == -EAGAIN) {
830 /* Gets called if a connection is established, or if a new minor gets created
/* Per-volume post-connect initialization: reset sequence counters, select
 * the state mutex (shared cstate_mutex for pre-100 peers), and send the
 * initial sync-param/sizes/uuids/state packets. Error propagation between
 * the sends is elided in this view. */
832 int drbd_connected(struct drbd_peer_device *peer_device)
834 struct drbd_device *device = peer_device->device;
837 atomic_set(&device->packet_seq, 0);
838 device->peer_seq = 0;
840 device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
841 &peer_device->connection->cstate_mutex :
842 &device->own_state_mutex;
844 err = drbd_send_sync_param(peer_device);
846 err = drbd_send_sizes(peer_device, 0, 0);
848 err = drbd_send_uuids(peer_device);
850 err = drbd_send_current_state(peer_device);
851 clear_bit(USE_DEGR_WFC_T, &device->flags);
852 clear_bit(RESIZE_PENDING, &device->flags);
853 atomic_set(&device->ap_in_flight, 0);
854 mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
860 * 1 yes, we have a valid connection
861 * 0 oops, did not work out, please try again
862 * -1 peer talks different language,
863 * no point in trying again, please go standalone.
864 * -2 We do not have a network config...
/* Full connection handshake: establish BOTH sockets (data "sock" and meta
 * "msock") by racing active connect against passive accept, resolve
 * crossed initial packets, negotiate features and (optionally) auth, then
 * bring each volume up via drbd_connected(). Return codes are documented
 * in the comment block above. Substantial interior lines are elided. */
866 static int conn_connect(struct drbd_connection *connection)
868 struct drbd_socket sock, msock;
869 struct drbd_peer_device *peer_device;
871 int vnr, timeout, h, ok;
872 bool discard_my_data;
873 enum drbd_state_rv rv;
874 struct accept_wait_data ad = {
875 .connection = connection,
876 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
879 clear_bit(DISCONNECT_SENT, &connection->flags);
880 if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
883 mutex_init(&sock.mutex);
884 sock.sbuf = connection->data.sbuf;
885 sock.rbuf = connection->data.rbuf;
887 mutex_init(&msock.mutex);
888 msock.sbuf = connection->meta.sbuf;
889 msock.rbuf = connection->meta.rbuf;
892 /* Assume that the peer only understands protocol 80 until we know better. */
893 connection->agreed_pro_version = 80;
895 if (prepare_listen_socket(connection, &ad))
901 s = drbd_try_connect(connection);
905 send_first_packet(connection, &sock, P_INITIAL_DATA);
906 } else if (!msock.socket) {
907 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
909 send_first_packet(connection, &msock, P_INITIAL_META);
911 drbd_err(connection, "Logic error in conn_connect()\n");
912 goto out_release_sockets;
916 if (sock.socket && msock.socket) {
918 nc = rcu_dereference(connection->net_conf);
919 timeout = nc->ping_timeo * HZ / 10;
921 schedule_timeout_interruptible(timeout);
922 ok = drbd_socket_okay(&sock.socket);
923 ok = drbd_socket_okay(&msock.socket) && ok;
929 s = drbd_wait_for_connect(connection, &ad);
931 int fp = receive_first_packet(connection, s);
932 drbd_socket_okay(&sock.socket);
933 drbd_socket_okay(&msock.socket);
937 drbd_warn(connection, "initial packet S crossed\n");
938 sock_release(sock.socket); /* both sides connected: drop the duplicate */
945 set_bit(RESOLVE_CONFLICTS, &connection->flags);
947 drbd_warn(connection, "initial packet M crossed\n");
948 sock_release(msock.socket);
955 drbd_warn(connection, "Error receiving initial packet\n");
958 if (prandom_u32() & 1) /* randomize retry to break symmetry */
963 if (connection->cstate <= C_DISCONNECTING)
964 goto out_release_sockets;
965 if (signal_pending(current)) {
966 flush_signals(current);
968 if (get_t_state(&connection->receiver) == EXITING)
969 goto out_release_sockets;
972 ok = drbd_socket_okay(&sock.socket);
973 ok = drbd_socket_okay(&msock.socket) && ok;
977 sock_release(ad.s_listen); /* both sockets established; stop listening */
979 sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
980 msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
982 sock.socket->sk->sk_allocation = GFP_NOIO;
983 msock.socket->sk->sk_allocation = GFP_NOIO;
985 sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
986 msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
989 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
990 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
991 * first set it to the P_CONNECTION_FEATURES timeout,
992 * which we set to 4x the configured ping_timeout. */
994 nc = rcu_dereference(connection->net_conf);
996 sock.socket->sk->sk_sndtimeo =
997 sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
999 msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1000 timeout = nc->timeout * HZ / 10;
1001 discard_my_data = nc->discard_my_data;
1004 msock.socket->sk->sk_sndtimeo = timeout;
1006 /* we don't want delays.
1007 * we use TCP_CORK where appropriate, though */
1008 drbd_tcp_nodelay(sock.socket);
1009 drbd_tcp_nodelay(msock.socket);
1011 connection->data.socket = sock.socket;
1012 connection->meta.socket = msock.socket;
1013 connection->last_received = jiffies;
1015 h = drbd_do_features(connection); /* protocol version negotiation */
1019 if (connection->cram_hmac_tfm) { /* shared-secret auth configured */
1020 /* drbd_request_state(device, NS(conn, WFAuth)); */
1021 switch (drbd_do_auth(connection)) {
1023 drbd_err(connection, "Authentication of peer failed\n");
1026 drbd_err(connection, "Authentication of peer failed, trying again.\n");
1031 connection->data.socket->sk->sk_sndtimeo = timeout;
1032 connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1034 if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1037 /* Prevent a race between resync-handshake and
1038 * being promoted to Primary.
1040 * Grab and release the state mutex, so we know that any current
1041 * drbd_set_role() is finished, and any incoming drbd_set_role
1042 * will see the STATE_SENT flag, and wait for it to be cleared.
1044 idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1045 mutex_lock(peer_device->device->state_mutex);
1047 set_bit(STATE_SENT, &connection->flags);
1049 idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1050 mutex_unlock(peer_device->device->state_mutex);
1053 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1054 struct drbd_device *device = peer_device->device;
1055 kref_get(&device->kref); /* keep device alive across drbd_connected() */
1058 if (discard_my_data)
1059 set_bit(DISCARD_MY_DATA, &device->flags);
1061 clear_bit(DISCARD_MY_DATA, &device->flags);
1063 drbd_connected(peer_device);
1064 kref_put(&device->kref, drbd_destroy_device);
1069 rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1070 if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1071 clear_bit(STATE_SENT, &connection->flags);
1075 drbd_thread_start(&connection->asender);
1077 mutex_lock(&connection->resource->conf_update);
1078 /* The discard_my_data flag is a single-shot modifier to the next
1079 * connection attempt, the handshake of which is now well underway.
1080 * No need for rcu style copying of the whole struct
1081 * just to clear a single value. */
1082 connection->net_conf->discard_my_data = 0;
1083 mutex_unlock(&connection->resource->conf_update);
1087 out_release_sockets:
1089 sock_release(ad.s_listen);
1091 sock_release(sock.socket);
1093 sock_release(msock.socket);
/* Decode a wire header into packet_info, dispatching on the negotiated
 * header size/magic: h100 (with volume number), h95 (32-bit length), or
 * h80 (16-bit length). pi->data points just past the header. Unknown
 * magic is logged with the agreed protocol version. */
1097 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1099 unsigned int header_size = drbd_header_size(connection);
1101 if (header_size == sizeof(struct p_header100) &&
1102 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1103 struct p_header100 *h = header;
1105 drbd_err(connection, "Header padding is not zero\n");
1108 pi->vnr = be16_to_cpu(h->volume);
1109 pi->cmd = be16_to_cpu(h->command);
1110 pi->size = be32_to_cpu(h->length);
1111 } else if (header_size == sizeof(struct p_header95) &&
1112 *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1113 struct p_header95 *h = header;
1114 pi->cmd = be16_to_cpu(h->command);
1115 pi->size = be32_to_cpu(h->length);
1117 } else if (header_size == sizeof(struct p_header80) &&
1118 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1119 struct p_header80 *h = header;
1120 pi->cmd = be16_to_cpu(h->command);
1121 pi->size = be16_to_cpu(h->length);
1124 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1125 be32_to_cpu(*(__be32 *)header),
1126 connection->agreed_pro_version);
1129 pi->data = header + header_size;
/* Receive one full header into the data rbuf, decode it, and stamp
 * last_received for the connection liveness checks. */
1133 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1135 void *buffer = connection->data.rbuf;
1138 err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1142 err = decode_header(connection, buffer, pi);
1143 connection->last_received = jiffies;
/* If write ordering is at least WO_bdev_flush, issue a disk flush on every
 * attached volume; any flush failure permanently downgrades the connection
 * to WO_drain_io (EOPNOTSUPP cannot be detected reliably — see comment). */
1148 static void drbd_flush(struct drbd_connection *connection)
1151 struct drbd_peer_device *peer_device;
1154 if (connection->write_ordering >= WO_bdev_flush) {
1156 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1157 struct drbd_device *device = peer_device->device;
1159 if (!get_ldev(device))
1161 kref_get(&device->kref);
1164 rv = blkdev_issue_flush(device->ldev->backing_bdev,
1167 drbd_info(device, "local disk flush failed with status %d\n", rv);
1168 /* would rather check on EOPNOTSUPP, but that is not reliable.
1169 * don't try again for ANY return value != 0
1170 * if (rv == -EOPNOTSUPP) */
1171 drbd_bump_write_ordering(connection, WO_drain_io);
1174 kref_put(&device->kref, drbd_destroy_device);
1185 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1186 * @device: DRBD device.
1187 * @epoch: Epoch object.
/* Apply an epoch event under epoch_lock and finish the epoch once it is
 * complete (size != 0, no active writes, barrier number seen or cleanup):
 * send the barrier ack (unless EV_CLEANUP), drop unacked, and either
 * unlink the epoch or recycle it as the current one. Interior lines
 * (loop structure, rv updates) are elided in this view. */
1190 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1191 struct drbd_epoch *epoch,
1192 enum epoch_event ev)
1195 struct drbd_epoch *next_epoch;
1196 enum finish_epoch rv = FE_STILL_LIVE;
1198 spin_lock(&connection->epoch_lock);
1202 epoch_size = atomic_read(&epoch->epoch_size);
1204 switch (ev & ~EV_CLEANUP) {
1206 atomic_dec(&epoch->active);
1208 case EV_GOT_BARRIER_NR:
1209 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1211 case EV_BECAME_LAST:
1216 if (epoch_size != 0 &&
1217 atomic_read(&epoch->active) == 0 &&
1218 (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1219 if (!(ev & EV_CLEANUP)) {
1220 spin_unlock(&connection->epoch_lock); /* drop lock to send on the wire */
1221 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1222 spin_lock(&connection->epoch_lock);
1225 /* FIXME: dec unacked on connection, once we have
1226 * something to count pending connection packets in. */
1227 if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1228 dec_unacked(epoch->connection);
1231 if (connection->current_epoch != epoch) {
1232 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1233 list_del(&epoch->list);
1234 ev = EV_BECAME_LAST | (ev & EV_CLEANUP); /* propagate to the next epoch */
1235 connection->epochs--;
1238 if (rv == FE_STILL_LIVE)
1242 atomic_set(&epoch->epoch_size, 0); /* recycle the current epoch */
1243 /* atomic_set(&epoch->active, 0); is already zero */
1244 if (rv == FE_STILL_LIVE)
1255 spin_unlock(&connection->epoch_lock);
1261 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1262 * @connection: DRBD connection.
1263 * @wo: Write ordering method to try.
/* Lower (never raise) the connection's write ordering method to 'wo',
 * further constrained by each attached disk's disk_flushes/disk_drain
 * settings; log only when the effective method changed (or flush retried). */
1265 void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo)
1267 struct disk_conf *dc;
1268 struct drbd_peer_device *peer_device;
1269 enum write_ordering_e pwo;
1271 static char *write_ordering_str[] = {
1273 [WO_drain_io] = "drain",
1274 [WO_bdev_flush] = "flush",
1277 pwo = connection->write_ordering;
1280 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1281 struct drbd_device *device = peer_device->device;
1283 if (!get_ldev_if_state(device, D_ATTACHING))
1285 dc = rcu_dereference(device->ldev->disk_conf);
1287 if (wo == WO_bdev_flush && !dc->disk_flushes)
1289 if (wo == WO_drain_io && !dc->disk_drain)
1294 connection->write_ordering = wo;
1295 if (pwo != connection->write_ordering || wo == WO_bdev_flush)
1296 drbd_info(connection, "Method to ensure write ordering: %s\n", write_ordering_str[connection->write_ordering]);
1300 * drbd_submit_peer_request()
1301 * @device: DRBD device.
1302 * @peer_req: peer request
1303 * @rw: flag field, see bio->bi_rw
1305 * May spread the pages to multiple bios,
1306 * depending on bio_add_page restrictions.
1308 * Returns 0 if all bios have been submitted,
1309 * -ENOMEM if we could not allocate enough bios,
1310 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1311 * single page to an empty bio (which should never happen and likely indicates
1312 * that the lower level IO stack is in some way broken). This has been observed
1313 * on certain Xen deployments.
1315 /* TODO allocate from our own bio_set. */
/* Submit a peer request to the local backing device, possibly split over
 * multiple bios (see kernel-doc above for return codes). Handles the
 * zero-out and discard special cases before the normal page-chain path.
 * NOTE(review): bio chaining/submission loop lines are partly elided. */
1316 int drbd_submit_peer_request(struct drbd_device *device,
1317 struct drbd_peer_request *peer_req,
1318 const unsigned rw, const int fault_type)
1320 struct bio *bios = NULL;
1322 struct page *page = peer_req->pages;
1323 sector_t sector = peer_req->i.sector;
1324 unsigned ds = peer_req->i.size;
1325 unsigned n_bios = 0;
1326 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1329 if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
1330 /* wait for all pending IO completions, before we start
1331 * zeroing things out. */
1332 conn_wait_active_ee_empty(first_peer_device(device)->connection);
1333 if (blkdev_issue_zeroout(device->ldev->backing_bdev,
1334 sector, ds >> 9, GFP_NOIO))
1335 peer_req->flags |= EE_WAS_ERROR;
1336 drbd_endio_write_sec_final(peer_req); /* complete synchronously */
1340 if (peer_req->flags & EE_IS_TRIM)
1341 nr_pages = 0; /* discards don't have any payload. */
1343 /* In most cases, we will only need one bio. But in case the lower
1344 * level restrictions happen to be different at this offset on this
1345 * side than those of the sending peer, we may need to submit the
1346 * request in more than one bio.
1348 * Plain bio_alloc is good enough here, this is no DRBD internally
1349 * generated bio, but a bio allocated on behalf of the peer.
1352 bio = bio_alloc(GFP_NOIO, nr_pages);
1354 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1357 /* > peer_req->i.sector, unless this is the first bio */
1358 bio->bi_iter.bi_sector = sector;
1359 bio->bi_bdev = device->ldev->backing_bdev;
1361 bio->bi_private = peer_req;
1362 bio->bi_end_io = drbd_peer_request_endio;
1364 bio->bi_next = bios; /* prepend to the local chain of built bios */
1368 if (rw & REQ_DISCARD) {
1369 bio->bi_iter.bi_size = ds; /* discard: size only, no pages */
1373 page_chain_for_each(page) {
1374 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1375 if (!bio_add_page(bio, page, len, 0)) {
1376 /* A single page must always be possible!
1377 * But in case it fails anyways,
1378 * we deal with it, and complain (below). */
1379 if (bio->bi_vcnt == 0) {
1381 "bio_add_page failed for len=%u, "
1382 "bi_vcnt=0 (bi_sector=%llu)\n",
1383 len, (uint64_t)bio->bi_iter.bi_sector);
1393 D_ASSERT(device, ds == 0); /* all payload bytes consumed */
1395 D_ASSERT(device, page == NULL); /* entire page chain consumed */
1397 atomic_set(&peer_req->pending_bios, n_bios);
1400 bios = bios->bi_next;
1401 bio->bi_next = NULL;
1403 drbd_generic_make_request(device, fault_type, bio);
1410 bios = bios->bi_next; /* error path: walk and free remaining bios */
/*
 * Drop a peer request's interval from the write_requests conflict tree and
 * wake waiters.  Caller holds the resource req_lock (see callers).
 * NOTE(review): a guard condition before the wake_up appears elided in this
 * view — confirm against the full file.
 */
1416 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1417 struct drbd_peer_request *peer_req)
1419 struct drbd_interval *i = &peer_req->i;
1421 drbd_remove_interval(&device->write_requests, i);
1422 drbd_clear_interval(i);
1424 /* Wake up any processes waiting for this peer request to complete. */
1426 wake_up(&device->misc_wait);
/*
 * Block until the active_ee list (writes in flight to the backing device)
 * of every volume on this connection has drained.  Takes a device kref
 * around the wait so the device cannot vanish meanwhile.
 */
1429 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1431 struct drbd_peer_device *peer_device;
1435 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1436 struct drbd_device *device = peer_device->device;
1438 kref_get(&device->kref);
1440 drbd_wait_ee_list_empty(device, &device->active_ee);
1441 kref_put(&device->kref, drbd_destroy_device);
/* Look up the peer device for @volume_number on @connection;
 * returns NULL if no such volume exists. */
1447 static struct drbd_peer_device *
1448 conn_peer_device(struct drbd_connection *connection, int volume_number)
1450 return idr_find(&connection->peer_devices, volume_number);
/*
 * Handle an incoming P_BARRIER packet: close the current epoch and,
 * depending on the connection's write ordering method, either wait for
 * in-flight writes / flush synchronously, or install a freshly allocated
 * epoch object so the barrier ack can be sent once the epoch drains.
 * NOTE(review): the switch case labels and several branches are elided in
 * this view — the WO_* case structure must be confirmed in the full file.
 */
1453 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1456 struct p_barrier *p = pi->data;
1457 struct drbd_epoch *epoch;
1459 /* FIXME these are unacked on connection,
1460 * not a specific (peer)device.
1462 connection->current_epoch->barrier_nr = p->barrier;
1463 connection->current_epoch->connection = connection;
1464 rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1466 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1467 * the activity log, which means it would not be resynced in case the
1468 * R_PRIMARY crashes now.
1469 * Therefore we must send the barrier_ack after the barrier request was
1471 switch (connection->write_ordering) {
1473 if (rv == FE_RECYCLED)
1476 /* receiver context, in the writeout path of the other node.
1477 * avoid potential distributed deadlock */
1478 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO)
1482 drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
/* Fallback: degrade to synchronous behaviour — drain and flush here. */
1487 conn_wait_active_ee_empty(connection);
1488 drbd_flush(connection);
1490 if (atomic_read(&connection->current_epoch->epoch_size)) {
1491 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1498 drbd_err(connection, "Strangeness in connection->write_ordering %d\n", connection->write_ordering);
/* Initialize the new epoch before publishing it. */
1503 atomic_set(&epoch->epoch_size, 0);
1504 atomic_set(&epoch->active, 0);
1506 spin_lock(&connection->epoch_lock);
1507 if (atomic_read(&connection->current_epoch->epoch_size)) {
1508 list_add(&epoch->list, &connection->current_epoch->list);
1509 connection->current_epoch = epoch;
1510 connection->epochs++;
1512 /* The current_epoch got recycled while we allocated this one... */
1515 spin_unlock(&connection->epoch_lock);
1520 /* used from receive_RSDataReply (recv_resync_read)
1521 * and from receive_Data */
/*
 * Receive the payload of a data packet (or the size of a P_TRIM) into a
 * newly allocated peer request.  Validates alignment/size/capacity bounds,
 * optionally receives and verifies the peer integrity digest, and accounts
 * received sectors.  Returns the peer request, or NULL on any error
 * (elided error labels in this view free the request / bail out).
 */
1522 static struct drbd_peer_request *
1523 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1524 struct packet_info *pi) __must_hold(local)
1526 struct drbd_device *device = peer_device->device;
1527 const sector_t capacity = drbd_get_capacity(device->this_bdev);
1528 struct drbd_peer_request *peer_req;
1531 int data_size = pi->size;
1532 void *dig_in = peer_device->connection->int_dig_in;
1533 void *dig_vv = peer_device->connection->int_dig_vv;
1534 unsigned long *data;
1535 struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
/* Integrity digest precedes the payload on the wire (not for trims). */
1538 if (!trim && peer_device->connection->peer_integrity_tfm) {
1539 dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1541 * FIXME: Receive the incoming digest into the receive buffer
1542 * here, together with its struct p_data?
1544 err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
/* Trims carry their size in the packet body, not as payload bytes. */
1551 D_ASSERT(peer_device, data_size == 0);
1552 data_size = be32_to_cpu(trim->size);
1555 if (!expect(IS_ALIGNED(data_size, 512)))
1557 /* prepare for larger trim requests. */
1558 if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
1561 /* even though we trust out peer,
1562 * we sometimes have to double check. */
1563 if (sector + (data_size>>9) > capacity) {
1564 drbd_err(device, "request from peer beyond end of local disk: "
1565 "capacity: %llus < sector: %llus + size: %u\n",
1566 (unsigned long long)capacity,
1567 (unsigned long long)sector, data_size);
1571 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1572 * "criss-cross" setup, that might cause write-out on some other DRBD,
1573 * which in turn might block on the other node at this very place. */
1574 peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
/* Receive payload page by page into the request's page chain. */
1582 page = peer_req->pages;
1583 page_chain_for_each(page) {
1584 unsigned len = min_t(int, ds, PAGE_SIZE);
1586 err = drbd_recv_all_warn(peer_device->connection, data, len);
1587 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1588 drbd_err(device, "Fault injection: Corrupting data on receive\n");
1589 data[0] = data[0] ^ (unsigned long)-1;
1593 drbd_free_peer_req(device, peer_req);
/* Verify payload against the digest the peer sent. */
1600 drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1601 if (memcmp(dig_in, dig_vv, dgs)) {
1602 drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1603 (unsigned long long)sector, data_size);
1604 drbd_free_peer_req(device, peer_req);
1608 device->recv_cnt += data_size>>9;
1612 /* drbd_drain_block() just takes a data block
1613 * out of the socket input buffer, and discards it.
/* Used to keep the stream in sync when a payload must be thrown away
 * (e.g. no local disk).  Receives into one scratch page repeatedly.
 * NOTE(review): the loop construct around the recv is elided in this view. */
1615 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1624 page = drbd_alloc_pages(peer_device, 1, 1);
1628 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1630 err = drbd_recv_all_warn(peer_device->connection, data, len);
1636 drbd_free_pages(peer_device->device, page, 0);
/*
 * Receive a "diskless read" reply: copy the payload of a P_DATA_REPLY
 * directly into the pages of the original request's master bio, verifying
 * the optional integrity digest.  Returns 0 on success, negative on error.
 */
1640 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1641 sector_t sector, int data_size)
1643 struct bio_vec bvec;
1644 struct bvec_iter iter;
1646 int dgs, err, expect;
1647 void *dig_in = peer_device->connection->int_dig_in;
1648 void *dig_vv = peer_device->connection->int_dig_vv;
/* Digest, if negotiated, arrives before the payload. */
1651 if (peer_device->connection->peer_integrity_tfm) {
1652 dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1653 err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
1659 /* optimistically update recv_cnt. if receiving fails below,
1660 * we disconnect anyways, and counters will be reset. */
1661 peer_device->device->recv_cnt += data_size>>9;
1663 bio = req->master_bio;
1664 D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
/* Fill each bio segment straight from the socket. */
1666 bio_for_each_segment(bvec, bio, iter) {
1667 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1668 expect = min_t(int, data_size, bvec.bv_len);
1669 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1670 kunmap(bvec.bv_page);
1673 data_size -= expect;
/* Verify the received bio content against the peer's digest. */
1677 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1678 if (memcmp(dig_in, dig_vv, dgs)) {
1679 drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1684 D_ASSERT(peer_device->device, data_size == 0);
1689 * e_end_resync_block() is called in asender context via
1690 * drbd_finish_peer_reqs().
/* Completion callback for a resync write: on success mark the range in
 * sync and ack with P_RS_WRITE_ACK; on I/O error record the resync
 * failure and send P_NEG_ACK.  Balances the inc_unacked() done by
 * recv_resync_read(). */
1692 static int e_end_resync_block(struct drbd_work *w, int unused)
1694 struct drbd_peer_request *peer_req =
1695 container_of(w, struct drbd_peer_request, w);
1696 struct drbd_peer_device *peer_device = peer_req->peer_device;
1697 struct drbd_device *device = peer_device->device;
1698 sector_t sector = peer_req->i.sector;
1701 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1703 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1704 drbd_set_in_sync(device, sector, peer_req->i.size);
1705 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1707 /* Record failure to sync */
1708 drbd_rs_failed_io(device, sector, peer_req->i.size);
1710 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1712 dec_unacked(device);
/*
 * Receive one resync data block and submit it to the local disk.
 * Queues the peer request on sync_ee so e_end_resync_block() can ack it
 * on completion.  On submit failure the request is unhooked and freed
 * (the error forces a reconnect at the caller).
 */
1717 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1718 struct packet_info *pi) __releases(local)
1720 struct drbd_device *device = peer_device->device;
1721 struct drbd_peer_request *peer_req;
1723 peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1727 dec_rs_pending(device);
1729 inc_unacked(device);
1730 /* corresponding dec_unacked() in e_end_resync_block()
1731 * respective _drbd_clear_done_ee */
1733 peer_req->w.cb = e_end_resync_block;
1735 spin_lock_irq(&device->resource->req_lock);
1736 list_add(&peer_req->w.list, &device->sync_ee);
1737 spin_unlock_irq(&device->resource->req_lock);
/* Account incoming resync traffic for the throttling heuristics. */
1739 atomic_add(pi->size >> 9, &device->rs_sect_ev);
1740 if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1743 /* don't care for the reason here */
1744 drbd_err(device, "submit failed, triggering re-connect\n");
1745 spin_lock_irq(&device->resource->req_lock);
1746 list_del(&peer_req->w.list);
1747 spin_unlock_irq(&device->resource->req_lock);
1749 drbd_free_peer_req(device, peer_req);
/*
 * Map a block_id echoed back by the peer to our request object.  The id
 * *is* the request pointer we sent; drbd_contains_interval() validates it
 * against @root before it is trusted any further.  Logs and returns NULL
 * (elided in this view) when lookup fails and !missing_ok.
 */
1755 static struct drbd_request *
1756 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1757 sector_t sector, bool missing_ok, const char *func)
1759 struct drbd_request *req;
1761 /* Request object according to our peer */
1762 req = (struct drbd_request *)(unsigned long)id;
1763 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1766 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1767 (unsigned long)id, (unsigned long long)sector);
/*
 * Handle P_DATA_REPLY: the peer answered one of our diskless reads.
 * Locate the original request by its echoed block_id and copy the payload
 * into its master bio via recv_dless_read().
 */
1772 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1774 struct drbd_peer_device *peer_device;
1775 struct drbd_device *device;
1776 struct drbd_request *req;
1779 struct p_data *p = pi->data;
1781 peer_device = conn_peer_device(connection, pi->vnr);
1784 device = peer_device->device;
1786 sector = be64_to_cpu(p->sector);
1788 spin_lock_irq(&device->resource->req_lock);
1789 req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1790 spin_unlock_irq(&device->resource->req_lock);
1794 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1795 * special casing it there for the various failure cases.
1796 * still no race with drbd_fail_pending_reads */
1797 err = recv_dless_read(peer_device, req, sector, pi->size);
1799 req_mod(req, DATA_RECEIVED);
1800 /* else: nothing. handled from drbd_disconnect...
1801 * I don't think we may complete this just yet
1802 * in case we are "on-disconnect: freeze" */
/*
 * Handle P_RS_DATA_REPLY: resync data we requested from the sync source.
 * With a usable local disk the block is written via recv_resync_read();
 * otherwise the payload is drained and a P_NEG_ACK sent.  Either way the
 * received sectors are accounted in rs_sect_in.
 */
1807 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1809 struct drbd_peer_device *peer_device;
1810 struct drbd_device *device;
1813 struct p_data *p = pi->data;
1815 peer_device = conn_peer_device(connection, pi->vnr);
1818 device = peer_device->device;
1820 sector = be64_to_cpu(p->sector);
1821 D_ASSERT(device, p->block_id == ID_SYNCER);
1823 if (get_ldev(device)) {
1824 /* data is submitted to disk within recv_resync_read.
1825 * corresponding put_ldev done below on error,
1826 * or in drbd_peer_request_endio. */
1827 err = recv_resync_read(peer_device, sector, pi);
1829 if (__ratelimit(&drbd_ratelimit_state))
1830 drbd_err(device, "Can not write resync data to local disk.\n");
1832 err = drbd_drain_block(peer_device, pi->size);
1834 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1837 atomic_add(pi->size >> 9, &device->rs_sect_in);
/*
 * Requeue local writes that were postponed because they conflicted with a
 * peer write that has now completed.  Skips requests still pending local
 * I/O or not marked RQ_POSTPONED.  Caller holds the resource req_lock
 * (called from e_end_block under the lock).
 */
1842 static void restart_conflicting_writes(struct drbd_device *device,
1843 sector_t sector, int size)
1845 struct drbd_interval *i;
1846 struct drbd_request *req;
1848 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1851 req = container_of(i, struct drbd_request, i);
1852 if (req->rq_state & RQ_LOCAL_PENDING ||
1853 !(req->rq_state & RQ_POSTPONED))
1855 /* as it is RQ_POSTPONED, this will cause it to
1856 * be queued on the retry workqueue. */
1857 __req_mod(req, CONFLICT_RESOLVED, NULL);
1862 * e_end_block() is called in asender context via drbd_finish_peer_reqs().
/* Completion callback for a replicated (peer) write: send the appropriate
 * ack (P_WRITE_ACK / P_RS_WRITE_ACK / P_NEG_ACK) if protocol C-style acks
 * were requested, then remove the request's conflict-tree interval and
 * restart postponed conflicting writes, and finally drop the epoch ref. */
1864 static int e_end_block(struct drbd_work *w, int cancel)
1866 struct drbd_peer_request *peer_req =
1867 container_of(w, struct drbd_peer_request, w);
1868 struct drbd_peer_device *peer_device = peer_req->peer_device;
1869 struct drbd_device *device = peer_device->device;
1870 sector_t sector = peer_req->i.sector;
1873 if (peer_req->flags & EE_SEND_WRITE_ACK) {
1874 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
/* During resync states, a successfully applied write that may set
 * in-sync is acked as P_RS_WRITE_ACK and marked in sync locally. */
1875 pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1876 device->state.conn <= C_PAUSED_SYNC_T &&
1877 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1878 P_RS_WRITE_ACK : P_WRITE_ACK;
1879 err = drbd_send_ack(peer_device, pcmd, peer_req);
1880 if (pcmd == P_RS_WRITE_ACK)
1881 drbd_set_in_sync(device, sector, peer_req->i.size);
1883 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1884 /* we expect it to be marked out of sync anyways...
1885 * maybe assert this? */
1887 dec_unacked(device);
1889 /* we delete from the conflict detection hash _after_ we sent out the
1890 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1891 if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1892 spin_lock_irq(&device->resource->req_lock);
1893 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1894 drbd_remove_epoch_entry_interval(device, peer_req);
1895 if (peer_req->flags & EE_RESTART_REQUESTS)
1896 restart_conflicting_writes(device, sector, peer_req->i.size);
1897 spin_unlock_irq(&device->resource->req_lock);
1899 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1901 drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
/* Send @ack for the peer request wrapped in @w and drop the matching
 * unacked count.  Shared helper for the superseded/retry callbacks below. */
1906 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1908 struct drbd_peer_request *peer_req =
1909 container_of(w, struct drbd_peer_request, w);
1910 struct drbd_peer_device *peer_device = peer_req->peer_device;
1913 err = drbd_send_ack(peer_device, ack, peer_req);
1914 dec_unacked(peer_device->device);
/* Work callback: ack a conflicting peer write as superseded. */
1919 static int e_send_superseded(struct drbd_work *w, int unused)
1921 return e_send_ack(w, P_SUPERSEDED);
/* Work callback: tell the peer to retry its write.  Peers older than
 * protocol 100 do not know P_RETRY_WRITE, so P_SUPERSEDED is sent then. */
1924 static int e_send_retry_write(struct drbd_work *w, int unused)
1926 struct drbd_peer_request *peer_req =
1927 container_of(w, struct drbd_peer_request, w);
1928 struct drbd_connection *connection = peer_req->peer_device->connection;
1930 return e_send_ack(w, connection->agreed_pro_version >= 100 ?
1931 P_RETRY_WRITE : P_SUPERSEDED);
/* Wrap-around-safe "a is newer than b" for 32-bit sequence numbers.
 * NOTE(review): the signed subtraction relies on wrap-around semantics;
 * fine under the kernel's -fno-strict-overflow, formally UB in plain C. */
1934 static bool seq_greater(u32 a, u32 b)
1937 * We assume 32-bit wrap-around here.
1938 * For 24-bit wrap-around, we would have to shift:
1941 return (s32)a - (s32)b > 0;
/* Newer of two sequence numbers under 32-bit wrap-around (see seq_greater). */
1944 static u32 seq_max(u32 a, u32 b)
1946 return seq_greater(a, b) ? a : b;
/*
 * Advance device->peer_seq to @peer_seq if it is newer (wrap-safe), and
 * wake waiters in wait_for_and_update_peer_seq() only when our value was
 * the one that actually moved the sequence forward.  Only relevant when
 * conflict resolution (two primaries) is enabled.
 */
1949 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
1951 struct drbd_device *device = peer_device->device;
1952 unsigned int newest_peer_seq;
1954 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
1955 spin_lock(&device->peer_seq_lock);
1956 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
1957 device->peer_seq = newest_peer_seq;
1958 spin_unlock(&device->peer_seq_lock);
1959 /* wake up only if we actually changed device->peer_seq */
1960 if (peer_seq == newest_peer_seq)
1961 wake_up(&device->seq_wait);
/* True if [s1, s1+l1) and [s2, s2+l2) intersect; lengths are in bytes,
 * sectors are 512-byte units (hence the >>9). */
1965 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1967 return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1970 /* maybe change sync_ee into interval trees as well? */
/* Linear scan of sync_ee: does any in-flight resync write overlap
 * @peer_req?  Used by receive_Data() to serialize application writes
 * against resync writes on a sync target. */
1971 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
1973 struct drbd_peer_request *rs_req;
1976 spin_lock_irq(&device->resource->req_lock);
1977 list_for_each_entry(rs_req, &device->sync_ee, w.list) {
1978 if (overlaps(peer_req->i.sector, peer_req->i.size,
1979 rs_req->i.sector, rs_req->i.size)) {
1984 spin_unlock_irq(&device->resource->req_lock);
1989 /* Called from receive_Data.
1990 * Synchronize packets on sock with packets on msock.
1992 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1993 * packet traveling on msock, they are still processed in the order they have
1996 * Note: we don't care for Ack packets overtaking P_DATA packets.
1998 * In case packet_seq is larger than device->peer_seq number, there are
1999 * outstanding packets on the msock. We wait for them to arrive.
2000 * In case we are the logically next packet, we update device->peer_seq
2001 * ourselves. Correctly handles 32bit wrap around.
2003 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2004 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2005 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2006 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2008 * returns 0 if we may process the packet,
2009 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2010 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2012 struct drbd_device *device = peer_device->device;
/* No conflict resolution configured: nothing to order against. */
2017 if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2020 spin_lock(&device->peer_seq_lock);
/* We are the logically next packet (or older): record and proceed. */
2022 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2023 device->peer_seq = seq_max(device->peer_seq, peer_seq);
2027 if (signal_pending(current)) {
2033 tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
2039 /* Only need to wait if two_primaries is enabled */
2040 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2041 spin_unlock(&device->peer_seq_lock);
/* Bounded wait: give up after the configured ping timeout. */
2043 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2045 timeout = schedule_timeout(timeout);
2046 spin_lock(&device->peer_seq_lock);
2049 drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2053 spin_unlock(&device->peer_seq_lock);
2054 finish_wait(&device->seq_wait, &wait);
2058 /* see also bio_flags_to_wire()
2059 * DRBD_REQ_*, because we need to semantically map the flags to data packet
2060 * flags and back. We may replicate to other kernel versions. */
/* Translate on-the-wire DP_* data packet flags into local bio REQ_* flags. */
2061 static unsigned long wire_flags_to_bio(u32 dpf)
2063 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2064 (dpf & DP_FUA ? REQ_FUA : 0) |
2065 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2066 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
/*
 * Fail every postponed local write overlapping [sector, sector+size) with
 * NEG_ACKED.  Drops and re-takes the req_lock around completing the master
 * bio, so the overlap walk restarts after each completion (restart label
 * elided in this view).  Called on conflict-resolution timeouts.
 */
2069 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2072 struct drbd_interval *i;
2075 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2076 struct drbd_request *req;
2077 struct bio_and_error m;
2081 req = container_of(i, struct drbd_request, i);
2082 if (!(req->rq_state & RQ_POSTPONED))
2084 req->rq_state &= ~RQ_POSTPONED;
2085 __req_mod(req, NEG_ACKED, &m);
/* Must not complete the master bio under the req_lock. */
2086 spin_unlock_irq(&device->resource->req_lock);
2088 complete_master_bio(device, &m);
2089 spin_lock_irq(&device->resource->req_lock);
/*
 * Two-primary write conflict handling for an incoming peer write.
 * Inserts the peer request's interval into write_requests, then resolves
 * each overlap: conflicting *peer* requests are waited for; conflicting
 * *local* requests are either superseded/retried (when this node has the
 * RESOLVE_CONFLICTS/discard flag) or waited for and marked for restart.
 * Called with the resource req_lock held; drbd_wait_misc() drops it.
 * Returns 0 to proceed, nonzero error otherwise (error labels elided in
 * this view remove the interval again before returning).
 */
2094 static int handle_write_conflicts(struct drbd_device *device,
2095 struct drbd_peer_request *peer_req)
2097 struct drbd_connection *connection = peer_req->peer_device->connection;
2098 bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2099 sector_t sector = peer_req->i.sector;
2100 const unsigned int size = peer_req->i.size;
2101 struct drbd_interval *i;
2106 * Inserting the peer request into the write_requests tree will prevent
2107 * new conflicting local requests from being added.
2109 drbd_insert_interval(&device->write_requests, &peer_req->i);
2112 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2113 if (i == &peer_req->i)
2118 * Our peer has sent a conflicting remote request; this
2119 * should not happen in a two-node setup. Wait for the
2120 * earlier peer request to complete.
2122 err = drbd_wait_misc(device, i);
/* From here on the overlap is a local application write. */
2128 equal = i->sector == sector && i->size == size;
2129 if (resolve_conflicts) {
2131 * If the peer request is fully contained within the
2132 * overlapping request, it can be considered overwritten
2133 * and thus superseded; otherwise, it will be retried
2134 * once all overlapping requests have completed.
2136 bool superseded = i->sector <= sector && i->sector +
2137 (i->size >> 9) >= sector + (size >> 9);
2140 drbd_alert(device, "Concurrent writes detected: "
2141 "local=%llus +%u, remote=%llus +%u, "
2142 "assuming %s came first\n",
2143 (unsigned long long)i->sector, i->size,
2144 (unsigned long long)sector, size,
2145 superseded ? "local" : "remote");
/* Ack asynchronously from the asender via done_ee. */
2147 inc_unacked(device);
2148 peer_req->w.cb = superseded ? e_send_superseded :
2150 list_add_tail(&peer_req->w.list, &device->done_ee);
2151 wake_asender(connection);
2156 struct drbd_request *req =
2157 container_of(i, struct drbd_request, i);
2160 drbd_alert(device, "Concurrent writes detected: "
2161 "local=%llus +%u, remote=%llus +%u\n",
2162 (unsigned long long)i->sector, i->size,
2163 (unsigned long long)sector, size);
2165 if (req->rq_state & RQ_LOCAL_PENDING ||
2166 !(req->rq_state & RQ_POSTPONED)) {
2168 * Wait for the node with the discard flag to
2169 * decide if this request has been superseded
2170 * or needs to be retried.
2171 * Requests that have been superseded will
2172 * disappear from the write_requests tree.
2174 * In addition, wait for the conflicting
2175 * request to finish locally before submitting
2176 * the conflicting peer request.
2178 err = drbd_wait_misc(device, &req->i);
/* Wait timed out / interrupted: force a reconnect and fail
 * all postponed writes in the conflicting range. */
2180 _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2181 fail_postponed_requests(device, sector, size);
2187 * Remember to restart the conflicting requests after
2188 * the new peer request has completed.
2190 peer_req->flags |= EE_RESTART_REQUESTS;
/* Error path: take our interval back out of the tree. */
2197 drbd_remove_epoch_entry_interval(device, peer_req);
2201 /* mirrored write */
/*
 * Handle an incoming P_DATA / P_TRIM (a write mirrored from the peer):
 * receive the payload into a peer request, attach it to the current epoch,
 * order it against the peer sequence number and resolve two-primary write
 * conflicts, choose the ack mode from dp_flags / protocol, and submit it
 * to the backing device.  Error paths (elided labels in this view) unhook
 * the request and free it, forcing a reconnect.
 */
2202 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2204 struct drbd_peer_device *peer_device;
2205 struct drbd_device *device;
2207 struct drbd_peer_request *peer_req;
2208 struct p_data *p = pi->data;
2209 u32 peer_seq = be32_to_cpu(p->seq_num);
2214 peer_device = conn_peer_device(connection, pi->vnr);
2217 device = peer_device->device;
/* No local disk: still consume the payload and NEG_ACK it, keeping the
 * peer sequence and epoch accounting consistent. */
2219 if (!get_ldev(device)) {
2222 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2223 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2224 atomic_inc(&connection->current_epoch->epoch_size);
2225 err2 = drbd_drain_block(peer_device, pi->size);
2232 * Corresponding put_ldev done either below (on various errors), or in
2233 * drbd_peer_request_endio, if we successfully submit the data at the
2234 * end of this function.
2237 sector = be64_to_cpu(p->sector);
2238 peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2244 peer_req->w.cb = e_end_block;
2246 dp_flags = be32_to_cpu(p->dp_flags);
2247 rw |= wire_flags_to_bio(dp_flags);
2248 if (pi->cmd == P_TRIM) {
2249 struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
2250 peer_req->flags |= EE_IS_TRIM;
/* Backend cannot discard: fall back to zeroing out in submit. */
2251 if (!blk_queue_discard(q))
2252 peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
2253 D_ASSERT(peer_device, peer_req->i.size > 0);
2254 D_ASSERT(peer_device, rw & REQ_DISCARD);
2255 D_ASSERT(peer_device, peer_req->pages == NULL);
2256 } else if (peer_req->pages == NULL) {
2257 D_ASSERT(device, peer_req->i.size == 0);
2258 D_ASSERT(device, dp_flags & DP_FLUSH);
2261 if (dp_flags & DP_MAY_SET_IN_SYNC)
2262 peer_req->flags |= EE_MAY_SET_IN_SYNC;
/* Join the current epoch; refs dropped in e_end_block / error path. */
2264 spin_lock(&connection->epoch_lock);
2265 peer_req->epoch = connection->current_epoch;
2266 atomic_inc(&peer_req->epoch->epoch_size);
2267 atomic_inc(&peer_req->epoch->active);
2268 spin_unlock(&connection->epoch_lock);
2271 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
/* Two primaries: order against peer_seq and resolve write conflicts. */
2274 peer_req->flags |= EE_IN_INTERVAL_TREE;
2275 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2277 goto out_interrupted;
2278 spin_lock_irq(&device->resource->req_lock);
2279 err = handle_write_conflicts(device, peer_req);
2281 spin_unlock_irq(&device->resource->req_lock);
2282 if (err == -ENOENT) {
2286 goto out_interrupted;
2289 update_peer_seq(peer_device, peer_seq);
2290 spin_lock_irq(&device->resource->req_lock);
2292 /* if we use the zeroout fallback code, we process synchronously
2293 * and we wait for all pending requests, respectively wait for
2294 * active_ee to become empty in drbd_submit_peer_request();
2295 * better not add ourselves here. */
2296 if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
2297 list_add(&peer_req->w.list, &device->active_ee);
2298 spin_unlock_irq(&device->resource->req_lock);
/* On a sync target, do not let application writes race resync writes. */
2300 if (device->state.conn == C_SYNC_TARGET)
2301 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
/* Pre-100 peers do not send ack-mode flags; derive them from protocol. */
2303 if (peer_device->connection->agreed_pro_version < 100) {
2305 switch (rcu_dereference(peer_device->connection->net_conf)->wire_protocol) {
2307 dp_flags |= DP_SEND_WRITE_ACK;
2310 dp_flags |= DP_SEND_RECEIVE_ACK;
2316 if (dp_flags & DP_SEND_WRITE_ACK) {
2317 peer_req->flags |= EE_SEND_WRITE_ACK;
2318 inc_unacked(device);
2319 /* corresponding dec_unacked() in e_end_block()
2320 * respective _drbd_clear_done_ee */
2323 if (dp_flags & DP_SEND_RECEIVE_ACK) {
2324 /* I really don't like it that the receiver thread
2325 * sends on the msock, but anyways */
2326 drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
2329 if (device->state.pdsk < D_INCONSISTENT) {
2330 /* In case we have the only disk of the cluster, */
2331 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2332 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2333 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2334 drbd_al_begin_io(device, &peer_req->i, true);
2337 err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2341 /* don't care for the reason here */
2342 drbd_err(device, "submit failed, triggering re-connect\n");
2343 spin_lock_irq(&device->resource->req_lock);
2344 list_del(&peer_req->w.list);
2345 drbd_remove_epoch_entry_interval(device, peer_req);
2346 spin_unlock_irq(&device->resource->req_lock);
2347 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2348 drbd_al_complete_io(device, &peer_req->i);
/* Interrupted/error: release the epoch reference taken above. */
2351 drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2353 drbd_free_peer_req(device, peer_req);
2357 /* We may throttle resync, if the lower device seems to be busy,
2358 * and current sync rate is above c_min_rate.
2360 * To decide whether or not the lower device is busy, we use a scheme similar
2361 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2362 * (more than 64 sectors) of activity we cannot account for with our own resync
2363 * activity, it obviously is "busy".
2365 * The current sync rate used here uses only the most recent two step marks,
2366 * to have a short time average so we can react faster.
/* Returns true if resync for @sector should be throttled: the rate check
 * says we are above c_min_rate AND no application I/O already waits on the
 * extent (BME_PRIORITY overrides throttling). */
2368 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
2370 struct lc_element *tmp;
2371 bool throttle = true;
2373 if (!drbd_rs_c_min_rate_throttle(device))
2376 spin_lock_irq(&device->al_lock);
2377 tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2379 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2380 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2382 /* Do not slow down if app IO is already waiting for this extent */
2384 spin_unlock_irq(&device->al_lock);
/*
 * Rate half of the throttle decision: true when the backing disk shows
 * unaccounted activity and the short-term resync rate (last two sync
 * marks) exceeds the configured c_min_rate.  c_min_rate == 0 disables
 * throttling entirely.
 */
2389 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2391 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2392 unsigned long db, dt, dbdt;
2393 unsigned int c_min_rate;
2397 c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2400 /* feature disabled? */
2401 if (c_min_rate == 0)
/* Disk sectors moved (read + write) minus our own resync traffic:
 * anything left over is foreign (application) activity. */
2404 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2405 (int)part_stat_read(&disk->part0, sectors[1]) -
2406 atomic_read(&device->rs_sect_ev);
2407 if (!device->rs_last_events || curr_events - device->rs_last_events > 64) {
2408 unsigned long rs_left;
2411 device->rs_last_events = curr_events;
2413 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2415 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2417 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2418 rs_left = device->ov_left;
2420 rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2422 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2425 db = device->rs_mark_left[i] - rs_left;
2426 dbdt = Bit2KB(db/dt); /* recent sync rate in KB/s */
2428 if (dbdt > c_min_rate)
/*
 * receive_DataRequest() - service a read request from the peer.
 * Handles application reads (P_DATA_REQUEST) as well as resync, checksum
 * and online-verify reads (P_RS_DATA_REQUEST, P_CSUM_RS_REQUEST,
 * P_OV_REQUEST, P_OV_REPLY): validate sector/size, allocate a peer
 * request, pick the completion callback per packet type, optionally
 * throttle resync traffic, and submit the local READ.
 * Without a local up-to-date disk, a negative ack is sent and the payload
 * is drained instead.  A failed submit triggers a re-connect.
 * NOTE(review): this chunk is a lossy dump; braces and some declarations
 * (sector, size, capacity, verb, i) sit on lines not visible here.
 */
2434 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2436 struct drbd_peer_device *peer_device;
2437 struct drbd_device *device;
2440 struct drbd_peer_request *peer_req;
2441 struct digest_info *di = NULL;
2443 unsigned int fault_type;
2444 struct p_block_req *p = pi->data;
2446 peer_device = conn_peer_device(connection, pi->vnr);
2449 device = peer_device->device;
2450 capacity = drbd_get_capacity(device->this_bdev);
/* sector and length arrive in big-endian (network) byte order */
2452 sector = be64_to_cpu(p->sector);
2453 size = be32_to_cpu(p->blksize);
/* reject sizes that are non-positive, not 512-byte aligned, or too big */
2455 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2456 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2457 (unsigned long long)sector, size);
/* reject reads that would run past the end of the device */
2460 if (sector + (size>>9) > capacity) {
2461 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2462 (unsigned long long)sector, size);
/* No usable local data: answer with the negative ack matching the
 * request type, then drain whatever payload may still be in flight. */
2466 if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2469 case P_DATA_REQUEST:
2470 drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2472 case P_RS_DATA_REQUEST:
2473 case P_CSUM_RS_REQUEST:
2475 drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2479 dec_rs_pending(device);
2480 drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2485 if (verb && __ratelimit(&drbd_ratelimit_state))
2486 drbd_err(device, "Can not satisfy peer's read request, "
2487 "no local data.\n");
2489 /* drain possibly remaining payload */
2490 return drbd_drain_block(peer_device, pi->size);
2493 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2494 * "criss-cross" setup, that might cause write-out on some other DRBD,
2495 * which in turn might block on the other node at this very place. */
2496 peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2497 true /* has real payload */, GFP_NOIO);
/* select completion callback and fault-injection class per packet type */
2504 case P_DATA_REQUEST:
2505 peer_req->w.cb = w_e_end_data_req;
2506 fault_type = DRBD_FAULT_DT_RD;
2507 /* application IO, don't drbd_rs_begin_io */
2510 case P_RS_DATA_REQUEST:
2511 peer_req->w.cb = w_e_end_rsdata_req;
2512 fault_type = DRBD_FAULT_RS_RD;
2513 /* used in the sector offset progress display */
2514 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
/* P_CSUM_RS_REQUEST / P_OV_REPLY carry a digest right after the header */
2518 case P_CSUM_RS_REQUEST:
2519 fault_type = DRBD_FAULT_RS_RD;
2520 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2524 di->digest_size = pi->size;
/* digest data lives immediately behind the digest_info header */
2525 di->digest = (((char *)di)+sizeof(struct digest_info));
2527 peer_req->digest = di;
2528 peer_req->flags |= EE_HAS_DIGEST;
2530 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2533 if (pi->cmd == P_CSUM_RS_REQUEST) {
2534 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2535 peer_req->w.cb = w_e_end_csum_rs_req;
2536 /* used in the sector offset progress display */
2537 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2538 } else if (pi->cmd == P_OV_REPLY) {
2539 /* track progress, we may need to throttle */
2540 atomic_add(size >> 9, &device->rs_sect_in);
2541 peer_req->w.cb = w_e_end_ov_reply;
2542 dec_rs_pending(device);
2543 /* drbd_rs_begin_io done when we sent this request,
2544 * but accounting still needs to be done. */
2545 goto submit_for_resync;
/* first P_OV_REQUEST on a proto >= 90 link: record the verify start
 * position and reset the rate-sampling marks */
2550 if (device->ov_start_sector == ~(sector_t)0 &&
2551 peer_device->connection->agreed_pro_version >= 90) {
2552 unsigned long now = jiffies;
2554 device->ov_start_sector = sector;
2555 device->ov_position = sector;
2556 device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2557 device->rs_total = device->ov_left;
2558 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2559 device->rs_mark_left[i] = device->ov_left;
2560 device->rs_mark_time[i] = now;
2562 drbd_info(device, "Online Verify start sector: %llu\n",
2563 (unsigned long long)sector);
2565 peer_req->w.cb = w_e_end_ov_req;
2566 fault_type = DRBD_FAULT_RS_RD;
2573 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2574 * wrt the receiver, but it is not as straightforward as it may seem.
2575 * Various places in the resync start and stop logic assume resync
2576 * requests are processed in order, requeuing this on the worker thread
2577 * introduces a bunch of new code for synchronization between threads.
2579 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2580 * "forever", throttling after drbd_rs_begin_io will lock that extent
2581 * for application writes for the same time. For now, just throttle
2582 * here, where the rest of the code expects the receiver to sleep for
2586 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2587 * this defers syncer requests for some time, before letting at least
2588 * one request through. The resync controller on the receiving side
2589 * will adapt to the incoming rate accordingly.
2591 * We cannot throttle here if remote is Primary/SyncTarget:
2592 * we would also throttle its application reads.
2593 * In that case, throttling is done on the SyncTarget only.
2595 if (device->state.peer != R_PRIMARY && drbd_rs_should_slow_down(device, sector))
2596 schedule_timeout_uninterruptible(HZ/10);
2597 if (drbd_rs_begin_io(device, sector))
/* account the sectors this resync read will produce */
2601 atomic_add(size >> 9, &device->rs_sect_ev);
2604 inc_unacked(device);
/* queue on read_ee under the resource request lock before submitting */
2605 spin_lock_irq(&device->resource->req_lock);
2606 list_add_tail(&peer_req->w.list, &device->read_ee);
2607 spin_unlock_irq(&device->resource->req_lock);
2609 if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2612 /* don't care for the reason here */
2613 drbd_err(device, "submit failed, triggering re-connect\n");
/* undo the read_ee queueing; the connection teardown cleans up the rest */
2614 spin_lock_irq(&device->resource->req_lock);
2615 list_del(&peer_req->w.list);
2616 spin_unlock_irq(&device->resource->req_lock);
2617 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2621 drbd_free_peer_req(device, peer_req);
/*
 * drbd_asb_recover_0p() - pick a split-brain victim when no node remained
 * primary, according to the configured after-sb-0pri policy.
 * Return convention: 1 discards the peer's modifications (we keep ours),
 * -1 discards our modifications, -100 means no automatic decision.
 * NOTE(review): the rcu read-side lock/unlock around the net_conf access
 * is on lines not visible in this chunk -- confirm against full source.
 */
2626 * drbd_asb_recover_0p - Recover after split-brain with no remaining primaries
2628 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2630 struct drbd_device *device = peer_device->device;
2631 int self, peer, rv = -100;
2632 unsigned long ch_self, ch_peer;
2633 enum drbd_after_sb_p after_sb_0p;
/* bit 0 of the bitmap UUID: "was primary" marker on each side
 * (assumption based on the younger/older-primary tests below) */
2635 self = device->ldev->md.uuid[UI_BITMAP] & 1;
2636 peer = device->p_uuid[UI_BITMAP] & 1;
/* change counts: peer reports its count in p_uuid[UI_SIZE] */
2638 ch_peer = device->p_uuid[UI_SIZE];
2639 ch_self = device->comm_bm_set;
2642 after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2644 switch (after_sb_0p) {
/* these policies only make sense with at least one primary left */
2646 case ASB_DISCARD_SECONDARY:
2647 case ASB_CALL_HELPER:
2649 drbd_err(device, "Configuration error.\n");
2651 case ASB_DISCONNECT:
2653 case ASB_DISCARD_YOUNGER_PRI:
2654 if (self == 0 && peer == 1) {
2658 if (self == 1 && peer == 0) {
2662 /* Else fall through to one of the other strategies... */
2663 case ASB_DISCARD_OLDER_PRI:
2664 if (self == 0 && peer == 1) {
2668 if (self == 1 && peer == 0) {
2672 /* Else fall through to one of the other strategies... */
2673 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2674 "Using discard-least-changes instead\n");
2675 case ASB_DISCARD_ZERO_CHG:
/* both sides unchanged: use the RESOLVE_CONFLICTS tie-breaker */
2676 if (ch_peer == 0 && ch_self == 0) {
2677 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2681 if (ch_peer == 0) { rv = 1; break; }
2682 if (ch_self == 0) { rv = -1; break; }
2684 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2686 case ASB_DISCARD_LEAST_CHG:
2687 if (ch_self < ch_peer)
2689 else if (ch_self > ch_peer)
2691 else /* ( ch_self == ch_peer ) */
2692 /* Well, then use something else. */
2693 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2696 case ASB_DISCARD_LOCAL:
2699 case ASB_DISCARD_REMOTE:
/*
 * drbd_asb_recover_1p() - split-brain recovery when exactly one node is
 * still primary, per the after-sb-1pri policy.  Most strategies delegate
 * to drbd_asb_recover_0p() and then bias the result by the local role;
 * ASB_CALL_HELPER additionally tries to demote a losing primary and runs
 * the "pri-lost-after-sb" helper if demotion fails.
 * Same return convention as drbd_asb_recover_0p().
 */
2707 * drbd_asb_recover_1p - Recover after split-brain with one remaining primary
2709 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2711 struct drbd_device *device = peer_device->device;
2713 enum drbd_after_sb_p after_sb_1p;
2716 after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2718 switch (after_sb_1p) {
/* 0-primary-only strategies are invalid here */
2719 case ASB_DISCARD_YOUNGER_PRI:
2720 case ASB_DISCARD_OLDER_PRI:
2721 case ASB_DISCARD_LEAST_CHG:
2722 case ASB_DISCARD_LOCAL:
2723 case ASB_DISCARD_REMOTE:
2724 case ASB_DISCARD_ZERO_CHG:
2725 drbd_err(device, "Configuration error.\n");
2727 case ASB_DISCONNECT:
/* "consensus": accept 0p's verdict only if it matches our role */
2730 hg = drbd_asb_recover_0p(peer_device);
2731 if (hg == -1 && device->state.role == R_SECONDARY)
2733 if (hg == 1 && device->state.role == R_PRIMARY)
2737 rv = drbd_asb_recover_0p(peer_device);
2739 case ASB_DISCARD_SECONDARY:
2740 return device->state.role == R_PRIMARY ? 1 : -1;
2741 case ASB_CALL_HELPER:
2742 hg = drbd_asb_recover_0p(peer_device);
2743 if (hg == -1 && device->state.role == R_PRIMARY) {
2744 enum drbd_state_rv rv2;
2746 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2747 * we might be here in C_WF_REPORT_PARAMS which is transient.
2748 * we do not need to wait for the after state change work either. */
2749 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2750 if (rv2 != SS_SUCCESS) {
2751 drbd_khelper(device, "pri-lost-after-sb");
2753 drbd_warn(device, "Successfully gave up primary role.\n");
/*
 * drbd_asb_recover_2p() - split-brain recovery when both nodes are still
 * primary, per the after-sb-2pri policy.  Only disconnect or the
 * "call helper" strategy (demote ourselves, else run pri-lost-after-sb)
 * are valid; everything else is a configuration error.
 * Same return convention as drbd_asb_recover_0p().
 */
2764 * drbd_asb_recover_2p - Recover after split-brain with two remaining primaries
2766 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2768 struct drbd_device *device = peer_device->device;
2770 enum drbd_after_sb_p after_sb_2p;
2773 after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2775 switch (after_sb_2p) {
/* none of these can be honored with two primaries */
2776 case ASB_DISCARD_YOUNGER_PRI:
2777 case ASB_DISCARD_OLDER_PRI:
2778 case ASB_DISCARD_LEAST_CHG:
2779 case ASB_DISCARD_LOCAL:
2780 case ASB_DISCARD_REMOTE:
2782 case ASB_DISCARD_SECONDARY:
2783 case ASB_DISCARD_ZERO_CHG:
2784 drbd_err(device, "Configuration error.\n");
2787 rv = drbd_asb_recover_0p(peer_device);
2789 case ASB_DISCONNECT:
2791 case ASB_CALL_HELPER:
2792 hg = drbd_asb_recover_0p(peer_device);
2794 enum drbd_state_rv rv2;
2796 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2797 * we might be here in C_WF_REPORT_PARAMS which is transient.
2798 * we do not need to wait for the after state change work either. */
2799 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2800 if (rv2 != SS_SUCCESS) {
2801 drbd_khelper(device, "pri-lost-after-sb");
2803 drbd_warn(device, "Successfully gave up primary role.\n");
/*
 * drbd_uuid_dump() - log one UUID set (current:bitmap:history...) plus the
 * associated bit count and flags, labeled with @text ("self" or "peer").
 * Logs a notice instead if the uuid array is unavailable (the NULL check
 * sits on a line not visible in this chunk).
 */
2813 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2814 u64 bits, u64 flags)
2817 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2820 drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2822 (unsigned long long)uuid[UI_CURRENT],
2823 (unsigned long long)uuid[UI_BITMAP],
2824 (unsigned long long)uuid[UI_HISTORY_START],
2825 (unsigned long long)uuid[UI_HISTORY_END],
2826 (unsigned long long)bits,
2827 (unsigned long long)flags);
/*
 * Return values of drbd_uuid_compare() (see legend below); *rule_nr is set
 * to the matching comparison rule for diagnostics.  Values -1091/-1096
 * signal that the required DRBD protocol version is missing.
 */
2831 100 after split brain try auto recover
2832 2 C_SYNC_SOURCE set BitMap
2833 1 C_SYNC_SOURCE use BitMap
2835 -1 C_SYNC_TARGET use BitMap
2836 -2 C_SYNC_TARGET set BitMap
2837 -100 after split brain, disconnect
2838 -1000 unrelated data
2839 -1091 requires proto 91
2840 -1096 requires proto 96
/*
 * Compare our on-disk UUID set against the peer's (p_uuid) to decide the
 * resync direction.  The low bit of each UUID is a role flag and is
 * masked off before comparison.  Along the way this also repairs UUID
 * sets when a P_SYNC_UUID packet or the resync-finished event was lost.
 */
2842 static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local)
2847 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2848 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
/* both just created: fresh pair, nothing to sync */
2851 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2855 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2856 peer != UUID_JUST_CREATED)
2860 if (self != UUID_JUST_CREATED &&
2861 (peer == UUID_JUST_CREATED || peer == (u64)0))
2865 int rct, dc; /* roles at crash time */
/* peer lost its bitmap UUID but we still carry ours: we were
 * SyncSource and missed the resync-finished event */
2867 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2869 if (first_peer_device(device)->connection->agreed_pro_version < 91)
2872 if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2873 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2874 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2875 drbd_uuid_move_history(device);
2876 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2877 device->ldev->md.uuid[UI_BITMAP] = 0;
2879 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2880 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2883 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
/* mirror case: we lost our bitmap UUID, peer kept its -- the peer
 * (as SyncSource) missed the resync-finished event; fix its view */
2890 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
2892 if (first_peer_device(device)->connection->agreed_pro_version < 91)
2895 if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2896 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2897 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2899 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
2900 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
2901 device->p_uuid[UI_BITMAP] = 0UL;
2903 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2906 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
2913 /* Common power [off|failure] */
2914 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
2915 (device->p_uuid[UI_FLAGS] & 2);
2916 /* lowest bit is set when we were primary,
2917 * next bit (weight 2) is set when peer was primary */
2921 case 0: /* !self_pri && !peer_pri */ return 0;
2922 case 1: /* self_pri && !peer_pri */ return 1;
2923 case 2: /* !self_pri && peer_pri */ return -1;
2924 case 3: /* self_pri && peer_pri */
/* both crashed as primary: use the connection tie-breaker flag */
2925 dc = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags);
2931 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
2936 peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
/* proto < 96 detects a lost P_SYNC_UUID via history equality; >= 96
 * uses the UUID_NEW_BM_OFFSET relation on the peer's bitmap UUID */
2938 if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2939 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2940 (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2941 peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
2942 /* The last P_SYNC_UUID did not get through. Undo the last start of
2943 resync as sync source modifications of the peer's UUIDs. */
2945 if (first_peer_device(device)->connection->agreed_pro_version < 91)
2948 device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
2949 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
2951 drbd_info(device, "Lost last syncUUID packet, corrected:\n");
2952 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
/* our current UUID found in the peer's history? peer is ahead */
2959 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2960 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2961 peer = device->p_uuid[i] & ~((u64)1);
2967 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2968 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2973 self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
/* symmetric lost-syncUUID detection for our own UUID set */
2975 if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2976 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2977 (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2978 self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2979 /* The last P_SYNC_UUID did not get through. Undo the last start of
2980 resync as sync source modifications of our UUIDs. */
2982 if (first_peer_device(device)->connection->agreed_pro_version < 91)
2985 __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
2986 __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
2988 drbd_info(device, "Last syncUUID did not get through, corrected:\n");
2989 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2990 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
/* peer's current UUID found in our history? we are ahead */
2998 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2999 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3000 self = device->ldev->md.uuid[i] & ~((u64)1);
/* equal non-zero bitmap UUIDs: split brain with common ancestor */
3006 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3007 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3008 if (self == peer && self != ((u64)0))
/* last resort: any common UUID in both histories? */
3012 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3013 self = device->ldev->md.uuid[i] & ~((u64)1);
3014 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3015 peer = device->p_uuid[j] & ~((u64)1);
3024 /* drbd_sync_handshake() returns the new conn state on success, or
3025 CONN_MASK (-1) on failure.
/*
 * Decide the post-connect state: compare UUID sets (drbd_uuid_compare),
 * possibly run the after-split-brain auto-recovery handlers, honor
 * discard-my-data, handle the rr-conflict policy and dry-run mode, and
 * set up a full-sync bitmap when |hg| >= 2.
 * Called with the local disk reference held (__must_hold(local)).
 */
3027 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3028 enum drbd_role peer_role,
3029 enum drbd_disk_state peer_disk) __must_hold(local)
3031 struct drbd_device *device = peer_device->device;
3032 enum drbd_conns rv = C_MASK;
3033 enum drbd_disk_state mydisk;
3034 struct net_conf *nc;
3035 int hg, rule_nr, rr_conflict, tentative;
3037 mydisk = device->state.disk;
/* while negotiating, the disk state we will take is in new_state_tmp */
3038 if (mydisk == D_NEGOTIATING)
3039 mydisk = device->new_state_tmp.disk;
3041 drbd_info(device, "drbd_sync_handshake:\n");
/* uuid_lock guards both the comparison and the dump output */
3043 spin_lock_irq(&device->ldev->md.uuid_lock);
3044 drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3045 drbd_uuid_dump(device, "peer", device->p_uuid,
3046 device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3048 hg = drbd_uuid_compare(device, &rule_nr);
3049 spin_unlock_irq(&device->ldev->md.uuid_lock);
3051 drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3054 drbd_alert(device, "Unrelated data, aborting!\n");
/* hg of -1091/-1096 encodes the minimum protocol version needed */
3058 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
/* exactly one side inconsistent: disk states dictate the direction */
3062 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3063 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
3064 int f = (hg == -100) || abs(hg) == 2;
3065 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3068 drbd_info(device, "Becoming sync %s due to disk states.\n",
3069 hg > 0 ? "source" : "target");
3073 drbd_khelper(device, "initial-split-brain");
3076 nc = rcu_dereference(peer_device->connection->net_conf);
/* split brain (hg == +/-100): try the automatic recovery strategy
 * matching the number of primaries */
3078 if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3079 int pcount = (device->state.role == R_PRIMARY)
3080 + (peer_role == R_PRIMARY);
3081 int forced = (hg == -100);
3085 hg = drbd_asb_recover_0p(peer_device);
3088 hg = drbd_asb_recover_1p(peer_device);
3091 hg = drbd_asb_recover_2p(peer_device);
3094 if (abs(hg) < 100) {
3095 drbd_warn(device, "Split-Brain detected, %d primaries, "
3096 "automatically solved. Sync from %s node\n",
3097 pcount, (hg < 0) ? "peer" : "this");
3099 drbd_warn(device, "Doing a full sync, since"
3100 " UUIDs where ambiguous.\n");
/* manual resolution via the discard-my-data flag on either side;
 * p_uuid[UI_FLAGS] bit 0 mirrors the peer's flag */
3107 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3109 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3113 drbd_warn(device, "Split-Brain detected, manually solved. "
3114 "Sync from %s node\n",
3115 (hg < 0) ? "peer" : "this");
/* copy out of the rcu-protected net_conf before leaving the section */
3117 rr_conflict = nc->rr_conflict;
3118 tentative = nc->tentative;
3122 /* FIXME this log message is not correct if we end up here
3123 * after an attempted attach on a diskless node.
3124 * We just refuse to attach -- well, we drop the "connection"
3125 * to that disk, in a way... */
3126 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n")
3127 drbd_khelper(device, "split-brain");
3131 if (hg > 0 && mydisk <= D_INCONSISTENT) {
3132 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
/* we would lose data as primary: apply the rr-conflict policy */
3136 if (hg < 0 && /* by intention we do not use mydisk here. */
3137 device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3138 switch (rr_conflict) {
3139 case ASB_CALL_HELPER:
3140 drbd_khelper(device, "pri-lost");
3142 case ASB_DISCONNECT:
3143 drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3146 drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3151 if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3153 drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3155 drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3156 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3157 abs(hg) >= 2 ? "full" : "bit-map based");
/* |hg| >= 2: set all bitmap bits so a full sync follows */
3162 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3163 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3164 BM_LOCKED_SET_ALLOWED))
3168 if (hg > 0) { /* become sync source. */
3170 } else if (hg < 0) { /* become sync target */
3174 if (drbd_bm_total_weight(device)) {
3175 drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3176 drbd_bm_total_weight(device));
3183 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3185 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3186 if (peer == ASB_DISCARD_REMOTE)
3187 return ASB_DISCARD_LOCAL;
3189 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3190 if (peer == ASB_DISCARD_LOCAL)
3191 return ASB_DISCARD_REMOTE;
3193 /* everything else is valid if they are equal on both sides. */
/*
 * receive_protocol() - handle P_PROTOCOL / P_PROTOCOL_UPDATE.
 * Cross-checks the peer's wire protocol, after-sb policies, two-primaries
 * and discard-my-data settings against our net_conf (disconnecting on any
 * mismatch), then installs the peer's data-integrity algorithm and a
 * refreshed net_conf via RCU.
 */
3197 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3199 struct p_protocol *p = pi->data;
3200 enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3201 int p_proto, p_discard_my_data, p_two_primaries, cf;
3202 struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3203 char integrity_alg[SHARED_SECRET_MAX] = "";
3204 struct crypto_hash *peer_integrity_tfm = NULL;
3205 void *int_dig_in = NULL, *int_dig_vv = NULL;
/* decode the peer's settings from network byte order */
3207 p_proto = be32_to_cpu(p->protocol);
3208 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
3209 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
3210 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
3211 p_two_primaries = be32_to_cpu(p->two_primaries);
3212 cf = be32_to_cpu(p->conn_flags);
3213 p_discard_my_data = cf & CF_DISCARD_MY_DATA;
/* proto >= 87 appends the integrity algorithm name to the packet */
3215 if (connection->agreed_pro_version >= 87) {
3218 if (pi->size > sizeof(integrity_alg))
3220 err = drbd_recv_all(connection, integrity_alg, pi->size);
/* force NUL termination of the received name */
3223 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3226 if (pi->cmd != P_PROTOCOL_UPDATE) {
3227 clear_bit(CONN_DRY_RUN, &connection->flags);
3229 if (cf & CF_DRY_RUN)
3230 set_bit(CONN_DRY_RUN, &connection->flags);
3233 nc = rcu_dereference(connection->net_conf);
/* each mismatch below is fatal for the connection */
3235 if (p_proto != nc->wire_protocol) {
3236 drbd_err(connection, "incompatible %s settings\n", "protocol");
3237 goto disconnect_rcu_unlock;
/* after-sb values are mirrored through convert_after_sb() because
 * "discard local/remote" swaps meaning between the two nodes */
3240 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3241 drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3242 goto disconnect_rcu_unlock;
3245 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3246 drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3247 goto disconnect_rcu_unlock;
3250 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3251 drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3252 goto disconnect_rcu_unlock;
/* both sides discarding their data cannot work */
3255 if (p_discard_my_data && nc->discard_my_data) {
3256 drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3257 goto disconnect_rcu_unlock;
3260 if (p_two_primaries != nc->two_primaries) {
3261 drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3262 goto disconnect_rcu_unlock;
3265 if (strcmp(integrity_alg, nc->integrity_alg)) {
3266 drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3267 goto disconnect_rcu_unlock;
3273 if (integrity_alg[0]) {
3277 * We can only change the peer data integrity algorithm
3278 * here. Changing our own data integrity algorithm
3279 * requires that we send a P_PROTOCOL_UPDATE packet at
3280 * the same time; otherwise, the peer has no way to
3281 * tell between which packets the algorithm should
3285 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3286 if (!peer_integrity_tfm) {
3287 drbd_err(connection, "peer data-integrity-alg %s not supported\n",
/* per-connection digest scratch buffers, sized for this hash */
3292 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3293 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3294 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3295 if (!(int_dig_in && int_dig_vv)) {
3296 drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3301 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3302 if (!new_net_conf) {
3303 drbd_err(connection, "Allocation of new net_conf failed\n");
/* publish a modified copy of net_conf under both locks, RCU-style */
3307 mutex_lock(&connection->data.mutex);
3308 mutex_lock(&connection->resource->conf_update);
3309 old_net_conf = connection->net_conf;
3310 *new_net_conf = *old_net_conf;
3312 new_net_conf->wire_protocol = p_proto;
3313 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3314 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3315 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3316 new_net_conf->two_primaries = p_two_primaries;
3318 rcu_assign_pointer(connection->net_conf, new_net_conf);
3319 mutex_unlock(&connection->resource->conf_update);
3320 mutex_unlock(&connection->data.mutex);
/* swap in the new integrity tfm and buffers, releasing the old ones */
3322 crypto_free_hash(connection->peer_integrity_tfm);
3323 kfree(connection->int_dig_in);
3324 kfree(connection->int_dig_vv);
3325 connection->peer_integrity_tfm = peer_integrity_tfm;
3326 connection->int_dig_in = int_dig_in;
3327 connection->int_dig_vv = int_dig_vv;
3329 if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3330 drbd_info(connection, "peer data-integrity-alg: %s\n",
3331 integrity_alg[0] ? integrity_alg : "(none)");
3334 kfree(old_net_conf);
3337 disconnect_rcu_unlock:
/* error path: release the (possibly allocated) tfm, then disconnect */
3340 crypto_free_hash(peer_integrity_tfm);
3343 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
/*
 * drbd_crypto_alloc_digest_safe() - allocate a crypto hash by name,
 * logging the feature name on failure instead of crashing the caller.
 */
3348 * input: alg name, feature name
3349 * return: NULL (alg name was "")
3350 * ERR_PTR(error) if something goes wrong
3351 * or the crypto hash ptr, if it worked out ok. */
3353 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3354 const char *alg, const char *name)
3356 struct crypto_hash *tfm;
3361 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
/* tfm is an ERR_PTR here; report which feature wanted this algorithm */
3363 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3364 alg, name, PTR_ERR(tfm));
/*
 * ignore_remaining_packet() - consume and discard pi->size bytes of a
 * packet we cannot or will not process, reading in receive-buffer-sized
 * chunks so the stream stays in sync for the next packet.
 */
3370 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3372 void *buffer = connection->data.rbuf;
3373 int size = pi->size;
/* never read more than the socket receive buffer at once */
3376 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3377 s = drbd_recv(connection, buffer, s);
3391 * config_unknown_volume - device configuration command for unknown volume
3393 * When a device is added to an existing connection, the node on which the
3394 * device is added first will send configuration commands to its peer but the
3395 * peer will not know about the device yet. It will warn and ignore these
3396 * commands. Once the device is added on the second node, the second node will
3397 * send the same device configuration commands, but in the other direction.
3399 * (We can also end up here if drbd is misconfigured.)
/* Log the unexpected packet, then drain its payload so the connection
 * stream stays parseable. */
3401 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3403 drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3404 cmdname(pi->cmd), pi->vnr);
3405 return ignore_remaining_packet(connection, pi);
/*
 * receive_SyncParam() - handle P_SYNC_PARAM (and its protocol-versioned
 * variants).  The packet layout grew over time: base p_rs_param, then
 * verify/csums algorithm names (apv >= 88/89), then resync-controller
 * parameters (apv >= 95).  Installs new disk_conf, net_conf and resync
 * fifo plan copies under conf_update, published via RCU; any mismatching
 * algorithm during C_WF_REPORT_PARAMS forces a disconnect.
 */
3408 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3410 struct drbd_peer_device *peer_device;
3411 struct drbd_device *device;
3412 struct p_rs_param_95 *p;
3413 unsigned int header_size, data_size, exp_max_sz;
3414 struct crypto_hash *verify_tfm = NULL;
3415 struct crypto_hash *csums_tfm = NULL;
3416 struct net_conf *old_net_conf, *new_net_conf = NULL;
3417 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3418 const int apv = connection->agreed_pro_version;
3419 struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3423 peer_device = conn_peer_device(connection, pi->vnr);
3425 return config_unknown_volume(connection, pi);
3426 device = peer_device->device;
/* maximum packet size depends on the agreed protocol version */
3428 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3429 : apv == 88 ? sizeof(struct p_rs_param)
3431 : apv <= 94 ? sizeof(struct p_rs_param_89)
3432 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3434 if (pi->size > exp_max_sz) {
3435 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3436 pi->size, exp_max_sz);
/* split the packet into fixed header and trailing algorithm-name data;
 * only apv == 88 may carry extra data after the base header */
3441 header_size = sizeof(struct p_rs_param);
3442 data_size = pi->size - header_size;
3443 } else if (apv <= 94) {
3444 header_size = sizeof(struct p_rs_param_89);
3445 data_size = pi->size - header_size;
3446 D_ASSERT(device, data_size == 0);
3448 header_size = sizeof(struct p_rs_param_95);
3449 data_size = pi->size - header_size;
3450 D_ASSERT(device, data_size == 0);
3453 /* initialize verify_alg and csums_alg */
3455 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3457 err = drbd_recv_all(peer_device->connection, p, header_size);
3461 mutex_lock(&connection->resource->conf_update);
3462 old_net_conf = peer_device->connection->net_conf;
/* with a local disk, prepare a disk_conf copy for the new resync rate */
3463 if (get_ldev(device)) {
3464 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3465 if (!new_disk_conf) {
3467 mutex_unlock(&connection->resource->conf_update);
3468 drbd_err(device, "Allocation of new disk_conf failed\n");
3472 old_disk_conf = device->ldev->disk_conf;
3473 *new_disk_conf = *old_disk_conf;
3475 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
/* apv == 88: the verify-alg name follows the header as raw data */
3480 if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3481 drbd_err(device, "verify-alg of wrong size, "
3482 "peer wants %u, accepting only up to %u byte\n",
3483 data_size, SHARED_SECRET_MAX);
3488 err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3491 /* we expect NUL terminated string */
3492 /* but just in case someone tries to be evil */
3493 D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3494 p->verify_alg[data_size-1] = 0;
3496 } else /* apv >= 89 */ {
3497 /* we still expect NUL terminated strings */
3498 /* but just in case someone tries to be evil */
3499 D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3500 D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3501 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3502 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
/* a changed verify-alg is only acceptable outside the initial
 * parameter exchange (C_WF_REPORT_PARAMS) */
3505 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3506 if (device->state.conn == C_WF_REPORT_PARAMS) {
3507 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3508 old_net_conf->verify_alg, p->verify_alg);
3511 verify_tfm = drbd_crypto_alloc_digest_safe(device,
3512 p->verify_alg, "verify-alg");
3513 if (IS_ERR(verify_tfm)) {
3519 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3520 if (device->state.conn == C_WF_REPORT_PARAMS) {
3521 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3522 old_net_conf->csums_alg, p->csums_alg);
3525 csums_tfm = drbd_crypto_alloc_digest_safe(device,
3526 p->csums_alg, "csums-alg");
3527 if (IS_ERR(csums_tfm)) {
/* apv >= 95: resync-controller parameters; resize the fifo plan if
 * the plan-ahead window changed */
3533 if (apv > 94 && new_disk_conf) {
3534 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3535 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3536 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3537 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3539 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3540 if (fifo_size != device->rs_plan_s->size) {
3541 new_plan = fifo_alloc(fifo_size);
3543 drbd_err(device, "kmalloc of fifo_buffer failed");
/* any new tfm requires publishing an updated net_conf copy */
3550 if (verify_tfm || csums_tfm) {
3551 new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3552 if (!new_net_conf) {
3553 drbd_err(device, "Allocation of new net_conf failed\n");
3557 *new_net_conf = *old_net_conf;
3560 strcpy(new_net_conf->verify_alg, p->verify_alg);
3561 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3562 crypto_free_hash(peer_device->connection->verify_tfm);
3563 peer_device->connection->verify_tfm = verify_tfm;
3564 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3567 strcpy(new_net_conf->csums_alg, p->csums_alg);
3568 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3569 crypto_free_hash(peer_device->connection->csums_tfm);
3570 peer_device->connection->csums_tfm = csums_tfm;
3571 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3573 rcu_assign_pointer(connection->net_conf, new_net_conf);
3577 if (new_disk_conf) {
3578 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3583 old_plan = device->rs_plan_s;
3584 rcu_assign_pointer(device->rs_plan_s, new_plan);
3587 mutex_unlock(&connection->resource->conf_update);
/* old copies are freed after the RCU grace period (see full source) */
3590 kfree(old_net_conf);
3591 kfree(old_disk_conf);
/* error unwinding: drop the unpublished disk_conf copy ... */
3597 if (new_disk_conf) {
3599 kfree(new_disk_conf);
3601 mutex_unlock(&connection->resource->conf_update);
3606 if (new_disk_conf) {
3608 kfree(new_disk_conf);
3610 mutex_unlock(&connection->resource->conf_update);
3611 /* just for completeness: actually not needed,
3612 * as this is not reached if csums_tfm was ok. */
3613 crypto_free_hash(csums_tfm);
3614 /* but free the verify_tfm again, if csums_tfm did not work out */
3615 crypto_free_hash(verify_tfm);
3616 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3620 /* warn if the arguments differ by more than 12.5% */
3621 static void warn_if_differ_considerably(struct drbd_device *device,
3622 const char *s, sector_t a, sector_t b)
3625 if (a == 0 || b == 0)
3627 d = (a > b) ? (a - b) : (b - a);
3628 if (d > (a>>3) || d > (b>>3))
3629 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3630 (unsigned long long)a, (unsigned long long)b);
/*
 * receive_sizes() - handle a P_SIZES packet announcing the peer's disk sizes.
 *
 * Stores the peer's backing device size, negotiates the user-requested size
 * (minimum of both sides on first connect), refuses a peer whose disk is too
 * small to hold our data, updates max_bio_size, resizes the local device if
 * needed, and notifies the peer when our own size differs from its view.
 *
 * NOTE(review): this extracted view is missing several structural lines of
 * the original (braces, put_ldev()/goto paths, some call arguments); the
 * visible tokens are preserved as-is — verify against the upstream source.
 */
static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_sizes *p = pi->data;
	enum determine_dev_size dd = DS_UNCHANGED;
	sector_t p_size, p_usize, my_usize;
	int ldsc = 0; /* local disk size changed */
	enum dds_flags ddsf;

	peer_device = conn_peer_device(connection, pi->vnr);
		return config_unknown_volume(connection, pi);
	device = peer_device->device;

	/* sizes arrive in network byte order */
	p_size = be64_to_cpu(p->d_size);
	p_usize = be64_to_cpu(p->u_size);

	/* just store the peer's disk size for now.
	 * we still need to figure out whether we accept that. */
	device->p_size = p_size;

	if (get_ldev(device)) {
		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;

		warn_if_differ_considerably(device, "lower level device sizes",
			   p_size, drbd_get_max_capacity(device->ldev));
		warn_if_differ_considerably(device, "user requested size",

		/* if this is the first connect, or an otherwise expected
		 * param exchange, choose the minimum */
		if (device->state.conn == C_WF_REPORT_PARAMS)
			p_usize = min_not_zero(my_usize, p_usize);

		/* Never shrink a device with usable data during connect.
		   But allow online shrinking if we are connected. */
		if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
		    drbd_get_capacity(device->this_bdev) &&
		    device->state.disk >= D_OUTDATED &&
		    device->state.conn < C_CONNECTED) {
			drbd_err(device, "The peer's disk size is too small!\n");
			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);

		if (my_usize != p_usize) {
			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;

			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
			if (!new_disk_conf) {
				drbd_err(device, "Allocation of new disk_conf failed\n");

			/* publish the updated disk_conf via RCU, under conf_update */
			mutex_lock(&connection->resource->conf_update);
			old_disk_conf = device->ldev->disk_conf;
			*new_disk_conf = *old_disk_conf;
			new_disk_conf->disk_size = p_usize;

			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
			mutex_unlock(&connection->resource->conf_update);

			kfree(old_disk_conf);

			drbd_info(device, "Peer sets u_size to %lu sectors\n",
				 (unsigned long)my_usize);

	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
	drbd_reconsider_max_bio_size(device);
	/* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
	   drbd_reconsider_max_bio_size(), we can be sure that after
	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */

	ddsf = be16_to_cpu(p->dds_flags);
	if (get_ldev(device)) {
		dd = drbd_determine_dev_size(device, ddsf, NULL);
		drbd_md_sync(device);
		/* I am diskless, need to accept the peer's size. */
		drbd_set_my_capacity(device, p_size);

	if (get_ldev(device)) {
		/* remember the current backing device size to detect later changes */
		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);

	if (device->state.conn > C_WF_REPORT_PARAMS) {
		if (be64_to_cpu(p->c_size) !=
		    drbd_get_capacity(device->this_bdev) || ldsc) {
			/* we have different sizes, probably peer
			 * needs to know my new size... */
			drbd_send_sizes(peer_device, 0, ddsf);
		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
			if (device->state.pdsk >= D_INCONSISTENT &&
			    device->state.disk >= D_INCONSISTENT) {
				if (ddsf & DDSF_NO_RESYNC)
					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
					resync_after_online_grow(device);
				set_bit(RESYNC_AFTER_NEG, &device->flags);
/*
 * receive_uuids() - handle a P_UUIDS packet carrying the peer's data
 * generation UUIDs.
 *
 * Copies the peer's UUID set into device->p_uuid, refuses a connection from
 * a primary without usable disk when the current UUIDs do not match, and
 * handles the "skip initial sync" shortcut (both sides freshly created,
 * proto >= 90, peer set the skip flag in UI_FLAGS).
 *
 * NOTE(review): structural lines (braces, error returns, `u64 *p_uuid;`
 * declaration) are missing from this extracted view; tokens kept as-is.
 */
static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_uuids *p = pi->data;
	int i, updated_uuids = 0;

	peer_device = conn_peer_device(connection, pi->vnr);
		return config_unknown_volume(connection, pi);
	device = peer_device->device;

	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
		drbd_err(device, "kmalloc of p_uuid failed\n");

	/* convert the whole UUID array from network byte order */
	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
		p_uuid[i] = be64_to_cpu(p->uuid[i]);

	/* replace any previously stored peer UUID set */
	kfree(device->p_uuid);
	device->p_uuid = p_uuid;

	/* a diskless primary may only connect to data matching its
	 * exposed-data UUID (low bit masked off on both sides) */
	if (device->state.conn < C_CONNECTED &&
	    device->state.disk < D_INCONSISTENT &&
	    device->state.role == R_PRIMARY &&
	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
		    (unsigned long long)device->ed_uuid);
		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);

	if (get_ldev(device)) {
		int skip_initial_sync =
			device->state.conn == C_CONNECTED &&
			peer_device->connection->agreed_pro_version >= 90 &&
			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
			(p_uuid[UI_FLAGS] & 8);
		if (skip_initial_sync) {
			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
					"clear_n_write from receive_uuids",
					BM_LOCKED_TEST_ALLOWED);
			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
			_drbd_uuid_set(device, UI_BITMAP, 0);
			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
			drbd_md_sync(device);
	} else if (device->state.disk < D_INCONSISTENT &&
		   device->state.role == R_PRIMARY) {
		/* I am a diskless primary, the peer just created a new current UUID
		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);

	/* Before we test for the disk state, we should wait until an eventually
	   ongoing cluster wide state change is finished. That is important if
	   we are primary and are detaching from our disk. We need to see the
	   new disk state... */
	mutex_lock(device->state_mutex);
	mutex_unlock(device->state_mutex);
	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);

	drbd_print_uuids(device, "receiver updated UUIDs to");
/**
 * convert_state() - Converts the peer's view of the cluster state to our point of view
 * @ps: The state as seen by the peer.
 *
 * Mirrors the asymmetric state fields: connection states that name a
 * direction (SyncSource/SyncTarget style pairs) are swapped, Disconnecting
 * becomes TearDown on our side, and the peer's interrupt-sync flags are
 * folded into our peer_isp.
 *
 * NOTE(review): some c_tab entries and the ms role/disk field swaps appear
 * to be missing from this extracted view; verify against upstream.
 */
static union drbd_state convert_state(union drbd_state ps)
	union drbd_state ms;

	static enum drbd_conns c_tab[] = {
		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
		[C_CONNECTED] = C_CONNECTED,

		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
		[C_VERIFY_S] = C_VERIFY_T,

	ms.conn = c_tab[ps.conn];
	/* either side pausing sync looks like "peer paused" from here */
	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
/*
 * receive_req_state() - handle a P_STATE_CHG_REQ: peer asks us to perform
 * a (device-level) state change on its behalf.
 *
 * Rejects the request with SS_CONCURRENT_ST_CHG when we are the tie-break
 * node (RESOLVE_CONFLICTS) and a local state change is already in flight;
 * otherwise converts the peer's view of mask/val into our own, applies the
 * change, and sends the result back.
 */
static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_req_state *p = pi->data;
	union drbd_state mask, val;
	enum drbd_state_rv rv;

	peer_device = conn_peer_device(connection, pi->vnr);
	device = peer_device->device;

	mask.i = be32_to_cpu(p->mask);
	val.i = be32_to_cpu(p->val);

	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
	    mutex_is_locked(device->state_mutex)) {
		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);

	/* the request is phrased from the peer's point of view; mirror it */
	mask = convert_state(mask);
	val = convert_state(val);

	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
	drbd_send_sr_reply(peer_device, rv);

	drbd_md_sync(device);
/*
 * receive_req_conn_state() - handle a P_CONN_ST_CHG_REQ: connection-wide
 * counterpart of receive_req_state().
 *
 * Same conflict resolution as the per-device variant, but guarded by the
 * connection's cstate_mutex and applied via conn_request_state() with
 * CS_LOCAL_ONLY (the peer initiated the cluster-wide part already).
 */
static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
	struct p_req_state *p = pi->data;
	union drbd_state mask, val;
	enum drbd_state_rv rv;

	mask.i = be32_to_cpu(p->mask);
	val.i = be32_to_cpu(p->val);

	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
	    mutex_is_locked(&connection->cstate_mutex)) {
		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);

	/* mirror the peer's view of the requested change into ours */
	mask = convert_state(mask);
	val = convert_state(val);

	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
	conn_send_sr_reply(connection, rv);
/*
 * receive_state() - handle a P_STATE packet: the peer reports its current
 * device state.
 *
 * Derives the "real" peer disk state (a negotiating peer is mapped to
 * Inconsistent/Consistent based on its UUID flags), detects end-of-resync
 * and end-of-verify conditions, optionally runs the sync handshake when a
 * resync decision is due, and finally commits the merged state under the
 * request lock.
 *
 * NOTE(review): this extracted view is missing structural lines (braces,
 * early returns/gotos, `int rv;` declaration, put_ldev paths); the visible
 * tokens are preserved as-is — verify against upstream before relying on
 * control flow shown here.
 */
static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_state *p = pi->data;
	union drbd_state os, ns, peer_state;
	enum drbd_disk_state real_peer_disk;
	enum chg_state_flags cs_flags;

	peer_device = conn_peer_device(connection, pi->vnr);
		return config_unknown_volume(connection, pi);
	device = peer_device->device;

	peer_state.i = be32_to_cpu(p->state);

	real_peer_disk = peer_state.disk;
	if (peer_state.disk == D_NEGOTIATING) {
		/* UI_FLAGS bit 2 set means the peer's data is inconsistent */
		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));

	spin_lock_irq(&device->resource->req_lock);
	os = ns = drbd_read_state(device);
	spin_unlock_irq(&device->resource->req_lock);

	/* If some other part of the code (asender thread, timeout)
	 * already decided to close the connection again,
	 * we must not "re-establish" it here. */
	if (os.conn <= C_TEAR_DOWN)

	/* If this is the "end of sync" confirmation, usually the peer disk
	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
	 * set) resync started in PausedSyncT, or if the timing of pause-/
	 * unpause-sync events has been "just right", the peer disk may
	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
	 */
	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
	    real_peer_disk == D_UP_TO_DATE &&
	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
		/* If we are (becoming) SyncSource, but peer is still in sync
		 * preparation, ignore its uptodate-ness to avoid flapping, it
		 * will change to inconsistent once the peer reaches active
		 * It may have changed syncer-paused flags, however, so we
		 * cannot ignore this completely. */
		if (peer_state.conn > C_CONNECTED &&
		    peer_state.conn < C_SYNC_SOURCE)
			real_peer_disk = D_INCONSISTENT;

		/* if peer_state changes to connected at the same time,
		 * it explicitly notifies us that it finished resync.
		 * Maybe we should finish it up, too? */
		else if (os.conn >= C_SYNC_SOURCE &&
			 peer_state.conn == C_CONNECTED) {
			if (drbd_bm_total_weight(device) <= device->rs_failed)
				drbd_resync_finished(device);

	/* explicit verify finished notification, stop sector reached. */
	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
		ov_out_of_sync_print(device);
		drbd_resync_finished(device);

	/* peer says his disk is inconsistent, while we think it is uptodate,
	 * and this happens while the peer still thinks we have a sync going on,
	 * but we think we are already done with the sync.
	 * We ignore this to avoid flapping pdsk.
	 * This should not happen, if the peer is a recent version of drbd. */
	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
		real_peer_disk = D_UP_TO_DATE;

	if (ns.conn == C_WF_REPORT_PARAMS)
		ns.conn = C_CONNECTED;

	if (peer_state.conn == C_AHEAD)

	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
	    get_ldev_if_state(device, D_NEGOTIATING)) {
		int cr; /* consider resync */

		/* if we established a new connection */
		cr = (os.conn < C_CONNECTED);
		/* if we had an established connection
		 * and one of the nodes newly attaches a disk */
		cr |= (os.conn == C_CONNECTED &&
		       (peer_state.disk == D_NEGOTIATING ||
			os.disk == D_NEGOTIATING));
		/* if we have both been inconsistent, and the peer has been
		 * forced to be UpToDate with --overwrite-data */
		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
		/* if we had been plain connected, and the admin requested to
		 * start a sync by "invalidate" or "invalidate-remote" */
		cr |= (os.conn == C_CONNECTED &&
		       (peer_state.conn >= C_STARTING_SYNC_S &&
			peer_state.conn <= C_WF_BITMAP_T));

			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);

	if (ns.conn == C_MASK) {
		/* handshake could not agree on a resync direction */
		ns.conn = C_CONNECTED;
		if (device->state.disk == D_NEGOTIATING) {
			drbd_force_state(device, NS(disk, D_FAILED));
		} else if (peer_state.disk == D_NEGOTIATING) {
			drbd_err(device, "Disk attach process on the peer node was aborted.\n");
			peer_state.disk = D_DISKLESS;
			real_peer_disk = D_DISKLESS;
			if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
			D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);

	spin_lock_irq(&device->resource->req_lock);
	/* if the local state changed while we were unlocked, retry */
	if (os.i != drbd_read_state(device).i)
	clear_bit(CONSIDER_RESYNC, &device->flags);
	ns.peer = peer_state.role;
	ns.pdsk = real_peer_disk;
	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
		ns.disk = device->new_state_tmp.disk;
	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
	    test_bit(NEW_CUR_UUID, &device->flags)) {
		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
		   for temporal network outages! */
		spin_unlock_irq(&device->resource->req_lock);
		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
		tl_clear(peer_device->connection);
		drbd_uuid_new_current(device);
		clear_bit(NEW_CUR_UUID, &device->flags);
		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
	rv = _drbd_set_state(device, ns, cs_flags, NULL);
	ns = drbd_read_state(device);
	spin_unlock_irq(&device->resource->req_lock);

	if (rv < SS_SUCCESS) {
		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);

	if (os.conn > C_WF_REPORT_PARAMS) {
		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
		    peer_state.disk != D_NEGOTIATING ) {
			/* we want resync, peer has not yet decided to sync... */
			/* Nowadays only used when forcing a node into primary role and
			   setting its disk to UpToDate with that */
			drbd_send_uuids(peer_device);
			drbd_send_current_state(peer_device);

	clear_bit(DISCARD_MY_DATA, &device->flags);

	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
/*
 * receive_sync_uuid() - handle a P_SYNC_UUID packet: the peer starts a
 * resync and tells us the new sync UUID.
 *
 * Waits until we reach a state in which the packet makes sense, then stores
 * the new current UUID (without rotating it into history) and starts the
 * resync as SyncTarget.
 */
static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_rs_uuid *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	device = peer_device->device;

	/* packet may race with our own state change; wait for a stable state */
	wait_event(device->misc_wait,
		   device->state.conn == C_WF_SYNC_UUID ||
		   device->state.conn == C_BEHIND ||
		   device->state.conn < C_CONNECTED ||
		   device->state.disk < D_NEGOTIATING);

	/* D_ASSERT(device, device->state.conn == C_WF_SYNC_UUID ); */

	/* Here the _drbd_uuid_ functions are right, current should
	   _not_ be rotated into the history */
	if (get_ldev_if_state(device, D_NEGOTIATING)) {
		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
		_drbd_uuid_set(device, UI_BITMAP, 0UL);

		drbd_print_uuids(device, "updated sync uuid");
		drbd_start_resync(device, C_SYNC_TARGET);

		drbd_err(device, "Ignoring SyncUUID packet!\n");
/*
 * receive_bitmap_plain - receive one uncompressed chunk of the peer's bitmap
 * and merge it (bitwise OR) into our own.
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
		unsigned long *p, struct bm_xfer_ctx *c)
	/* payload capacity of one packet after the DRBD header */
	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
				 drbd_header_size(peer_device->connection);
	/* words still missing, capped to what fits in one packet */
	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
				       c->bm_words - c->word_offset);
	unsigned int want = num_words * sizeof(*p);

		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
	err = drbd_recv_all(peer_device->connection, p, want);

	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);

	/* advance the transfer context; clamp the bit offset at the end */
	c->word_offset += num_words;
	c->bit_offset = c->word_offset * BITS_PER_LONG;
	if (c->bit_offset > c->bm_bits)
		c->bit_offset = c->bm_bits;
4174 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4176 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4179 static int dcbp_get_start(struct p_compressed_bm *p)
4181 return (p->encoding & 0x80) != 0;
4184 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4186 return (p->encoding >> 4) & 0x7;
/*
 * recv_bm_rle_bits - decode one VLI/RLE-compressed bitmap packet and set the
 * corresponding bits in our bitmap.
 *
 * Runs alternate between "set" and "unset" (starting polarity comes from the
 * packet header); only the "set" runs touch the bitmap.
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 *
 * NOTE(review): several lines of the original decoder (declarations of
 * bits/look_ahead/have/rl/e/tmp, refill loop structure, error returns) are
 * missing from this extracted view; tokens kept as-is.
 */
recv_bm_rle_bits(struct drbd_peer_device *peer_device,
		struct p_compressed_bm *p,
		struct bm_xfer_ctx *c,
	struct bitstream bs;
	unsigned long s = c->bit_offset;
	int toggle = dcbp_get_start(p);

	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));

	/* prime the 64-bit look-ahead window */
	bits = bitstream_get_bits(&bs, &look_ahead, 64);

	for (have = bits; have > 0; s += rl, toggle = !toggle) {
		bits = vli_decode_bits(&rl, look_ahead);

		if (e >= c->bm_bits) {
			drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
			_drbd_bm_set_bits(peer_device->device, s, e);

		drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
			have, bits, look_ahead,
			(unsigned int)(bs.cur.b - p->code),
			(unsigned int)bs.buf_len);

		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
		if (likely(bits < 64))
			look_ahead >>= bits;
		/* refill the look-ahead window from the bitstream */
		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
		look_ahead |= tmp << have;

	bm_xfer_ctx_bit_to_word_offset(c);

	/* non-zero (another iteration needed) until the whole bitmap arrived */
	return (s != c->bm_bits);
/*
 * decode_bitmap_c - dispatch a compressed bitmap packet to its decoder.
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
decode_bitmap_c(struct drbd_peer_device *peer_device,
		struct p_compressed_bm *p,
		struct bm_xfer_ctx *c,
	if (dcbp_get_code(p) == RLE_VLI_Bits)
		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));

	/* other variants had been implemented for evaluation,
	 * but have been dropped as this one turned out to be "best"
	 * during all our tests. */

	/* unknown codec: protocol violation, drop the connection */
	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
/*
 * INFO_bm_xfer_stats - log how well the bitmap transfer compressed,
 * comparing actual bytes on the wire against the plain-text equivalent.
 */
void INFO_bm_xfer_stats(struct drbd_device *device,
		const char *direction, struct bm_xfer_ctx *c)
	/* what would it take to transfer it "plaintext" */
	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
	unsigned int plain =
		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
		c->bm_words * sizeof(unsigned long);
	unsigned int total = c->bytes[0] + c->bytes[1];

	/* total can not be zero. but just in case: */

	/* don't report if not compressed */

	/* total < plain. check for overflow, still */
	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
				    : (1000 * total / plain);

	/* r is compression ratio in tenths of a percent */
	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
		 "total %u; compression: %u.%u%%\n",
		 c->bytes[1], c->packets[1],
		 c->bytes[0], c->packets[0],
		 total, r/10, r % 10);
/* Since we are processing the bitfield from lower addresses to higher,
   it does not matter if the process it in 32 bit chunks or 64 bit
   chunks as long as it is little endian. (Understand it as byte stream,
   beginning with the lowest byte...) If we would use big endian
   we would need to process it from the highest address to the lowest,
   in order to be agnostic to the 32 vs 64 bits issue.

   returns 0 on failure, 1 if we successfully received it. */
/*
 * receive_bitmap() - receive the peer's dirty bitmap, plain or compressed,
 * looping over packets until the whole bitmap has arrived; then either send
 * our own bitmap back (WFBitMapT) or start the resync (WFBitMapS).
 *
 * NOTE(review): loop structure, error gotos and `int err;` are missing from
 * this extracted view; tokens kept as-is.
 */
static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct bm_xfer_ctx c;

	peer_device = conn_peer_device(connection, pi->vnr);
	device = peer_device->device;

	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
	/* you are supposed to send additional out-of-sync information
	 * if you actually set bits during this phase */

	c = (struct bm_xfer_ctx) {
		.bm_bits = drbd_bm_bits(device),
		.bm_words = drbd_bm_words(device),

	if (pi->cmd == P_BITMAP)
		err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
	else if (pi->cmd == P_COMPRESSED_BITMAP) {
		/* MAYBE: sanity check that we speak proto >= 90,
		 * and the feature is enabled! */
		struct p_compressed_bm *p = pi->data;

		/* validate the untrusted size field before reading the payload */
		if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
			drbd_err(device, "ReportCBitmap packet too large\n");
		if (pi->size <= sizeof(*p)) {
			drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
		err = drbd_recv_all(peer_device->connection, p, pi->size);
		err = decode_bitmap_c(peer_device, p, &c, pi->size);
		drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);

	/* account bytes/packets per encoding for the compression statistics */
	c.packets[pi->cmd == P_BITMAP]++;
	c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;

	err = drbd_recv_header(peer_device->connection, pi);

	INFO_bm_xfer_stats(device, "receive", &c);

	if (device->state.conn == C_WF_BITMAP_T) {
		enum drbd_state_rv rv;

		err = drbd_send_bitmap(device);
		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
		D_ASSERT(device, rv == SS_SUCCESS);
	} else if (device->state.conn != C_WF_BITMAP_S) {
		/* admin may have requested C_DISCONNECTING,
		 * other threads may have noticed network errors */
		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
			  drbd_conn_str(device->state.conn));

	drbd_bm_unlock(device);
	if (!err && device->state.conn == C_WF_BITMAP_S)
		drbd_start_resync(device, C_SYNC_SOURCE);
4414 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4416 drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4419 return ignore_remaining_packet(connection, pi);
4422 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4424 /* Make sure we've acked all the TCP data associated
4425 * with the data requests being unplugged */
4426 drbd_tcp_quickack(connection->data.socket);
/*
 * receive_out_of_sync() - handle a P_OUT_OF_SYNC packet: the peer marks a
 * block range as out of sync on our side (used in Ahead/Behind mode).
 *
 * NOTE(review): the other expected case labels (WFBitMapT/Behind per the
 * error text) and the final return are missing from this extracted view.
 */
static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_desc *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	device = peer_device->device;

	switch (device->state.conn) {
	case C_WF_SYNC_UUID:
		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
			 drbd_conn_str(device->state.conn));

	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4460 int (*fn)(struct drbd_connection *, struct packet_info *);
/* Dispatch table for the data socket, indexed by packet type.
 * Per entry: expect_payload (may the packet carry data beyond the fixed
 * sub-header?), pkt_size (fixed sub-header size to read up front), fn
 * (handler).  Consumed by drbdd() below. */
static struct data_cmd drbd_cmd_handler[] = {
	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
	[P_BITMAP]	    = { 1, 0, receive_bitmap } ,
	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
	[P_PROTOCOL]	    = { 1, sizeof(struct p_protocol), receive_protocol },
	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
	[P_SYNC_UUID]	    = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
	[P_OV_REQUEST]	    = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_OV_REPLY]	    = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_DELAY_PROBE]	    = { 0, sizeof(struct p_delay_probe93), receive_skip },
	[P_OUT_OF_SYNC]	    = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
/*
 * drbdd() - main loop of the receiver thread for the data socket.
 *
 * Reads a header, validates the packet type against drbd_cmd_handler[],
 * reads the fixed sub-header, and dispatches to the handler.  Any error
 * drops the connection with C_PROTOCOL_ERROR.
 *
 * NOTE(review): error gotos, `int err;` and some braces are missing from
 * this extracted view; tokens kept as-is.
 */
static void drbdd(struct drbd_connection *connection)
	struct packet_info pi;
	size_t shs; /* sub header size */

	while (get_t_state(&connection->receiver) == RUNNING) {
		struct data_cmd *cmd;

		drbd_thread_current_set_cpu(&connection->receiver);
		if (drbd_recv_header(connection, &pi))

		/* NOTE(review): cmd is computed before the bounds check below;
		 * taking the address of an out-of-range array element is
		 * technically undefined — confirm against upstream ordering. */
		cmd = &drbd_cmd_handler[pi.cmd];
		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
			drbd_err(connection, "Unexpected data packet %s (0x%04x)",
				 cmdname(pi.cmd), pi.cmd);

		shs = cmd->pkt_size;
		if (pi.size > shs && !cmd->expect_payload) {
			drbd_err(connection, "No payload expected %s l:%d\n",
				 cmdname(pi.cmd), pi.size);

			err = drbd_recv_all_warn(connection, pi.data, shs);

		err = cmd->fn(connection, &pi);
			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
				 cmdname(pi.cmd), err, pi.size);

	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
/*
 * conn_disconnect() - tear down a lost connection.
 *
 * Stops the asender, closes the socket, runs per-volume cleanup via
 * drbd_disconnected(), resets the current epoch, possibly fences the peer,
 * and moves the connection state towards Unconnected/StandAlone.
 *
 * NOTE(review): declarations of oc/vnr, rcu read-side markers and some
 * braces/returns are missing from this extracted view; tokens kept as-is.
 */
static void conn_disconnect(struct drbd_connection *connection)
	struct drbd_peer_device *peer_device;

	if (connection->cstate == C_STANDALONE)

	/* We are about to start the cleanup after connection loss.
	 * Make sure drbd_make_request knows about that.
	 * Usually we should be in some network failure state already,
	 * but just in case we are not, we fix it up here.
	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);

	/* asender does not clean up anything. it must not interfere, either */
	drbd_thread_stop(&connection->asender);
	drbd_free_sock(connection);

	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		/* hold a reference across the per-device cleanup */
		kref_get(&device->kref);
		drbd_disconnected(peer_device);
		kref_put(&device->kref, drbd_destroy_device);

	if (!list_empty(&connection->current_epoch->list))
		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
	atomic_set(&connection->current_epoch->epoch_size, 0);
	connection->send.seen_any_write_yet = false;

	drbd_info(connection, "Connection closed\n");

	/* fence the peer if we were primary and its disk state is unknown */
	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
		conn_try_outdate_peer_async(connection);

	spin_lock_irq(&connection->resource->req_lock);
	oc = connection->cstate;
	if (oc >= C_UNCONNECTED)
		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
	spin_unlock_irq(&connection->resource->req_lock);

	if (oc == C_DISCONNECTING)
		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
/*
 * drbd_disconnected() - per-volume cleanup after a connection was lost.
 *
 * Waits for in-flight peer requests to finish, cancels resync bookkeeping,
 * flushes the sender work queue (twice, see below), forgets the peer's
 * UUIDs, clears the transfer log, and releases pages still referenced by
 * the network stack.
 *
 * NOTE(review): `unsigned int i;` and some return/brace lines are missing
 * from this extracted view; tokens kept as-is.
 */
static int drbd_disconnected(struct drbd_peer_device *peer_device)
	struct drbd_device *device = peer_device->device;

	/* wait for current activity to cease. */
	spin_lock_irq(&device->resource->req_lock);
	_drbd_wait_ee_list_empty(device, &device->active_ee);
	_drbd_wait_ee_list_empty(device, &device->sync_ee);
	_drbd_wait_ee_list_empty(device, &device->read_ee);
	spin_unlock_irq(&device->resource->req_lock);

	/* We do not have data structures that would allow us to
	 * get the rs_pending_cnt down to 0 again.
	 * * On C_SYNC_TARGET we do not have any data structures describing
	 * the pending RSDataRequest's we have sent.
	 * * On C_SYNC_SOURCE there is no data structure that tracks
	 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
	 * And no, it is not the sum of the reference counts in the
	 * resync_LRU. The resync_LRU tracks the whole operation including
	 * the disk-IO, while the rs_pending_cnt only tracks the blocks
	drbd_rs_cancel_all(device);
	device->rs_total = 0;
	device->rs_failed = 0;
	atomic_set(&device->rs_pending_cnt, 0);
	wake_up(&device->misc_wait);

	/* stop the resync timer and run its handler once synchronously */
	del_timer_sync(&device->resync_timer);
	resync_timer_fn((unsigned long)device);

	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
	 * w_make_resync_request etc. which may still be on the worker queue
	 * to be "canceled" */
	drbd_flush_workqueue(&peer_device->connection->sender_work);

	drbd_finish_peer_reqs(device);

	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
	   might have issued a work again. The one before drbd_finish_peer_reqs() is
	   necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
	drbd_flush_workqueue(&peer_device->connection->sender_work);

	/* need to do it again, drbd_finish_peer_reqs() may have populated it
	 * again via drbd_try_clear_on_disk_bm(). */
	drbd_rs_cancel_all(device);

	kfree(device->p_uuid);
	device->p_uuid = NULL;

	if (!drbd_suspended(device))
		tl_clear(peer_device->connection);

	drbd_md_sync(device);

	/* serialize with bitmap writeout triggered by the state change,
	wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));

	/* tcp_close and release of sendpage pages can be deferred. I don't
	 * want to use SO_LINGER, because apparently it can be deferred for
	 * more than 20 seconds (longest time I checked).
	 *
	 * Actually we don't care for exactly when the network stack does its
	 * put_page(), but release our reference on these pages right here.
	i = drbd_free_peer_reqs(device, &device->net_ee);
		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
	i = atomic_read(&device->pp_in_use_by_net);
		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
	i = atomic_read(&device->pp_in_use);
		drbd_info(device, "pp_in_use = %d, expected 0\n", i);

	/* by now, all peer-request lists must be empty */
	D_ASSERT(device, list_empty(&device->read_ee));
	D_ASSERT(device, list_empty(&device->active_ee));
	D_ASSERT(device, list_empty(&device->sync_ee));
	D_ASSERT(device, list_empty(&device->done_ee));
/*
 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
 * we can agree on is stored in agreed_pro_version.
 *
 * feature flags and the reserved array should be enough room for future
 * enhancements of the handshake protocol, and possible plugins...
 *
 * for now, they are expected to be zero, but ignored.
 */
/* Send our P_CONNECTION_FEATURES handshake packet (min/max protocol
 * version plus feature flags, everything else zeroed).
 * NOTE(review): the `if (!p) return -EIO;` allocation check after
 * conn_prepare_command() appears to be missing from this extracted view. */
static int drbd_send_features(struct drbd_connection *connection)
	struct drbd_socket *sock;
	struct p_connection_features *p;

	sock = &connection->data;
	p = conn_prepare_command(connection, sock);
	memset(p, 0, sizeof(*p));
	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
	p->feature_flags = cpu_to_be32(PRO_FEATURES);
	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4702 * 1 yes, we have a valid connection
4703 * 0 oops, did not work out, please try again
4704 * -1 peer talks different language,
4705 * no point in trying again, please go standalone.
/* Perform the feature handshake with the peer: send our version range,
 * receive the peer's P_CONNECTION_FEATURES reply, and agree on a protocol
 * version and common feature flags.
 * Return convention (per the comment above this function):
 *   1 = valid connection, 0 = retry, -1 = incompatible peer.
 * (Several returns/gotos are elided in this view.) */
4707 static int drbd_do_features(struct drbd_connection *connection)
4709 /* ASSERT current == connection->receiver ... */
4710 struct p_connection_features *p;
4711 const int expect = sizeof(struct p_connection_features);
4712 struct packet_info pi;
/* first, send our own features packet */
4715 err = drbd_send_features(connection);
/* then read the peer's reply header */
4719 err = drbd_recv_header(connection, &pi);
/* the very first packet must be P_CONNECTION_FEATURES, anything else is fatal */
4723 if (pi.cmd != P_CONNECTION_FEATURES) {
4724 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4725 cmdname(pi.cmd), pi.cmd);
/* payload size must match exactly — reject truncated or oversized packets */
4729 if (pi.size != expect) {
4730 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4736 err = drbd_recv_all_warn(connection, p, expect);
/* convert the peer's version range to host byte order, in place */
4740 p->protocol_min = be32_to_cpu(p->protocol_min);
4741 p->protocol_max = be32_to_cpu(p->protocol_max);
/* very old peers sent only protocol_min; treat max==0 as "min only" */
4742 if (p->protocol_max == 0)
4743 p->protocol_max = p->protocol_min;
/* version ranges must overlap, otherwise we cannot talk to this peer */
4745 if (PRO_VERSION_MAX < p->protocol_min ||
4746 PRO_VERSION_MIN > p->protocol_max)
/* agree on the highest version both sides support */
4749 connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
/* feature set is the intersection of ours and the peer's */
4750 connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
4752 drbd_info(connection, "Handshake successful: "
4753 "Agreed network protocol version %d\n", connection->agreed_pro_version);
4755 drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
4756 connection->agreed_features & FF_TRIM ? " " : " not ");
/* incompatible-dialect error path (reached via an elided branch above) */
4761 drbd_err(connection, "incompatible DRBD dialects: "
4762 "I support %d-%d, peer supports %d-%d\n",
4763 PRO_VERSION_MIN, PRO_VERSION_MAX,
4764 p->protocol_min, p->protocol_max);
/* Fallback stub used when the kernel lacks HMAC support: authentication
 * cannot work, so log why and (on an elided line) fail the connection. */
4768 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4769 static int drbd_do_auth(struct drbd_connection *connection)
4771 drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4772 drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4776 #define CHALLENGE_LEN 64
4780 0 - failed, try again (network error),
4781 -1 - auth failed, don't try again.
/* Mutual CRAM-HMAC authentication with the peer over the data socket:
 * exchange random challenges, each side proves knowledge of the shared
 * secret by returning HMAC(secret, peer's challenge).
 * Returns (per the comment above): 0 = failed/retry (network error),
 * -1 = auth failed, don't retry; success value set on an elided line.
 * Error returns/gotos between the steps below are elided in this view. */
4784 static int drbd_do_auth(struct drbd_connection *connection)
4786 struct drbd_socket *sock;
4787 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4788 struct scatterlist sg;
4789 char *response = NULL;
4790 char *right_response = NULL;
4791 char *peers_ch = NULL;
4792 unsigned int key_len;
4793 char secret[SHARED_SECRET_MAX]; /* 64 byte */
4794 unsigned int resp_size;
4795 struct hash_desc desc;
4796 struct packet_info pi;
4797 struct net_conf *nc;
4800 /* FIXME: Put the challenge/response into the preallocated socket buffer. */
/* copy the shared secret out under RCU so we can drop the read lock early */
4803 nc = rcu_dereference(connection->net_conf);
4804 key_len = strlen(nc->shared_secret);
4805 memcpy(secret, nc->shared_secret, key_len);
/* key the HMAC transform with the shared secret */
4808 desc.tfm = connection->cram_hmac_tfm;
4811 rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4813 drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
/* step 1: send our random challenge */
4818 get_random_bytes(my_challenge, CHALLENGE_LEN);
4820 sock = &connection->data;
4821 if (!conn_prepare_command(connection, sock)) {
4825 rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4826 my_challenge, CHALLENGE_LEN);
/* step 2: receive the peer's challenge */
4830 err = drbd_recv_header(connection, &pi);
4836 if (pi.cmd != P_AUTH_CHALLENGE) {
4837 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4838 cmdname(pi.cmd), pi.cmd);
/* sanity-bound the peer's challenge size before allocating for it */
4843 if (pi.size > CHALLENGE_LEN * 2) {
4844 drbd_err(connection, "expected AuthChallenge payload too big.\n");
4849 if (pi.size < CHALLENGE_LEN) {
4850 drbd_err(connection, "AuthChallenge payload too small.\n");
4855 peers_ch = kmalloc(pi.size, GFP_NOIO);
4856 if (peers_ch == NULL) {
4857 drbd_err(connection, "kmalloc of peers_ch failed\n");
4862 err = drbd_recv_all_warn(connection, peers_ch, pi.size);
/* a reflected challenge would let the peer replay our own response */
4868 if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
4869 drbd_err(connection, "Peer presented the same challenge!\n");
/* step 3: compute and send HMAC(secret, peer's challenge) */
4874 resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
4875 response = kmalloc(resp_size, GFP_NOIO);
4876 if (response == NULL) {
4877 drbd_err(connection, "kmalloc of response failed\n");
4882 sg_init_table(&sg, 1);
4883 sg_set_buf(&sg, peers_ch, pi.size);
4885 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4887 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4892 if (!conn_prepare_command(connection, sock)) {
4896 rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
4897 response, resp_size);
/* step 4: receive the peer's response to OUR challenge */
4901 err = drbd_recv_header(connection, &pi);
4907 if (pi.cmd != P_AUTH_RESPONSE) {
4908 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
4909 cmdname(pi.cmd), pi.cmd);
4914 if (pi.size != resp_size) {
4915 drbd_err(connection, "expected AuthResponse payload of wrong size\n");
/* reuse the response buffer to receive the peer's answer */
4920 err = drbd_recv_all_warn(connection, response , resp_size);
/* step 5: compute the expected answer ourselves and compare */
4926 right_response = kmalloc(resp_size, GFP_NOIO);
4927 if (right_response == NULL) {
4928 drbd_err(connection, "kmalloc of right_response failed\n");
4933 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4935 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4937 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
/* rv = 1 iff the peer's response matches our expectation */
4942 rv = !memcmp(response, right_response, resp_size);
4945 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
/* cleanup tail (kfree of the other buffers elided in this view) */
4953 kfree(right_response);
/* Main loop of the receiver thread: repeatedly try to establish a
 * connection (conn_connect), and tear it down again on failure or exit.
 * (Loop structure is partially elided in this view.) */
4959 int drbd_receiver(struct drbd_thread *thi)
4961 struct drbd_connection *connection = thi->connection;
4964 drbd_info(connection, "receiver (re)started\n");
4967 h = conn_connect(connection);
/* connect failed: clean up and back off for a second before retrying */
4969 conn_disconnect(connection);
4970 schedule_timeout_interruptible(HZ);
/* fatal handshake result: drop the network configuration entirely */
4973 drbd_warn(connection, "Discarding network configuration.\n");
4974 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
4981 conn_disconnect(connection);
4983 drbd_info(connection, "receiver terminated\n");
4987 /* ********* acknowledge sender ******** */
/* Asender handler for a connection-wide state-change reply: record
 * success/failure in the connection flags and wake the waiter. */
4989 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
4991 struct p_req_state_reply *p = pi->data;
4992 int retcode = be32_to_cpu(p->retcode);
4994 if (retcode >= SS_SUCCESS) {
4995 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
/* else-branch: peer rejected the requested state change */
4997 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
4998 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
4999 drbd_set_st_err_str(retcode), retcode);
/* whoever requested the state change is sleeping on ping_wait */
5001 wake_up(&connection->ping_wait);
/* Asender handler for a per-device state-change reply: record the result
 * in the device flags and wake the waiter on state_wait. */
5008 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5008 struct drbd_peer_device *peer_device;
5009 struct drbd_device *device;
5010 struct p_req_state_reply *p = pi->data;
5011 int retcode = be32_to_cpu(p->retcode);
5013 peer_device = conn_peer_device(connection, pi->vnr);
5016 device = peer_device->device;
/* pre-protocol-100 peers answer a connection-wide request on the
 * per-device channel; dispatch to the connection-level handler then */
5018 if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5019 D_ASSERT(device, connection->agreed_pro_version < 100);
5020 return got_conn_RqSReply(connection, pi);
5023 if (retcode >= SS_SUCCESS) {
5024 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
/* else-branch: peer rejected the requested state change */
5026 set_bit(CL_ST_CHG_FAIL, &device->flags);
5027 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5028 drbd_set_st_err_str(retcode), retcode);
5030 wake_up(&device->state_wait);
/* Asender handler for P_PING: simply answer with a ping ack. */
5035 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5037 return drbd_send_ping_ack(connection);
/* Asender handler for P_PING_ACK: the peer is alive, so restore the
 * (longer) idle receive timeout and wake anyone waiting for the ack. */
5041 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5043 /* restore idle timeout */
5044 connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
/* only the first ack after a ping needs to wake the waiters */
5045 if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5046 wake_up(&connection->ping_wait);
/* Asender handler for P_RS_IS_IN_SYNC (checksum-based resync, protocol
 * >= 89): the peer confirmed a block is already in sync, so mark it in
 * the bitmap and account it as resync progress without transferring data. */
5051 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5053 struct drbd_peer_device *peer_device;
5054 struct drbd_device *device;
5055 struct p_block_ack *p = pi->data;
5056 sector_t sector = be64_to_cpu(p->sector);
5057 int blksize = be32_to_cpu(p->blksize);
5059 peer_device = conn_peer_device(connection, pi->vnr);
5062 device = peer_device->device;
5064 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5066 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
/* only touch the bitmap while we hold a local-disk reference */
5068 if (get_ldev(device)) {
5069 drbd_rs_complete_io(device, sector);
5070 drbd_set_in_sync(device, sector, blksize);
5071 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5072 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5075 dec_rs_pending(device);
/* count the sectors as "received" for resync throttling statistics */
5076 atomic_add(blksize >> 9, &device->rs_sect_in);
/* Look up the request identified by (id, sector) in the given tree and
 * apply the state transition 'what' to it under req_lock, completing the
 * master bio outside the lock if the transition finished it.
 * missing_ok: a missing request is tolerated (not an error) when true. */
5082 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5083 struct rb_root *root, const char *func,
5084 enum drbd_req_event what, bool missing_ok)
5086 struct drbd_request *req;
5087 struct bio_and_error m;
5089 spin_lock_irq(&device->resource->req_lock);
5090 req = find_request(device, root, id, sector, missing_ok, func);
5091 if (unlikely(!req)) {
5092 spin_unlock_irq(&device->resource->req_lock);
5095 __req_mod(req, what, &m);
5096 spin_unlock_irq(&device->resource->req_lock);
/* complete_master_bio must run without req_lock held */
5099 complete_master_bio(device, &m);
/* Asender handler for the family of write acknowledgements
 * (P_RECV_ACK, P_WRITE_ACK, P_RS_WRITE_ACK, P_SUPERSEDED, P_RETRY_WRITE):
 * map the packet type to a request event and apply it to the matching
 * entry in the write_requests tree. Resync writes (ID_SYNCER) have no
 * request object; they only update the bitmap and the pending counter. */
5103 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5105 struct drbd_peer_device *peer_device;
5106 struct drbd_device *device;
5107 struct p_block_ack *p = pi->data;
5108 sector_t sector = be64_to_cpu(p->sector);
5109 int blksize = be32_to_cpu(p->blksize);
5110 enum drbd_req_event what;
5112 peer_device = conn_peer_device(connection, pi->vnr);
5115 device = peer_device->device;
5117 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
/* resync traffic: no application request to transition */
5119 if (p->block_id == ID_SYNCER) {
5120 drbd_set_in_sync(device, sector, blksize);
5121 dec_rs_pending(device);
/* switch on pi->cmd (switch statement partially elided in this view) */
5125 case P_RS_WRITE_ACK:
5126 what = WRITE_ACKED_BY_PEER_AND_SIS;
5129 what = WRITE_ACKED_BY_PEER;
5132 what = RECV_ACKED_BY_PEER;
5135 what = CONFLICT_RESOLVED;
5138 what = POSTPONE_WRITE;
5144 return validate_req_change_req_state(device, p->block_id, sector,
5145 &device->write_requests, __func__,
/* Asender handler for P_NEG_ACK: the peer failed to write our block.
 * For resync writes, account the failure; for application writes, mark
 * the request NEG_ACKED, tolerating a request that is already gone
 * (possible in protocols A and B — see the comment below). */
5149 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5151 struct drbd_peer_device *peer_device;
5152 struct drbd_device *device;
5153 struct p_block_ack *p = pi->data;
5154 sector_t sector = be64_to_cpu(p->sector);
5155 int size = be32_to_cpu(p->blksize);
5158 peer_device = conn_peer_device(connection, pi->vnr);
5161 device = peer_device->device;
5163 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
/* resync write failed on the peer's side */
5165 if (p->block_id == ID_SYNCER) {
5166 dec_rs_pending(device);
5167 drbd_rs_failed_io(device, sector, size);
/* missing_ok=true here (final argument elided in this view) */
5171 err = validate_req_change_req_state(device, p->block_id, sector,
5172 &device->write_requests, __func__,
5175 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5176 The master bio might already be completed, therefore the
5177 request is no longer in the collision hash. */
5178 /* In Protocol B we might already have got a P_RECV_ACK
5179 but then get a P_NEG_ACK afterwards. */
/* either way, the block is now out of sync with the peer */
5180 drbd_set_out_of_sync(device, sector, size);
/* Asender handler for P_NEG_DREPLY: the peer could not serve our remote
 * read; mark the matching read request NEG_ACKED. */
5185 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5187 struct drbd_peer_device *peer_device;
5188 struct drbd_device *device;
5189 struct p_block_ack *p = pi->data;
5190 sector_t sector = be64_to_cpu(p->sector);
5192 peer_device = conn_peer_device(connection, pi->vnr);
5195 device = peer_device->device;
5197 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5199 drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5200 (unsigned long long)sector, be32_to_cpu(p->blksize));
/* note: read_requests tree, not write_requests (event args elided) */
5202 return validate_req_change_req_state(device, p->block_id, sector,
5203 &device->read_requests, __func__,
/* Asender handler for P_NEG_RS_DREPLY and P_RS_CANCEL: the peer could
 * not (or chose not to) serve a resync read. For a hard failure, record
 * the failed range; a cancel just completes the in-flight resync I/O. */
5207 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5209 struct drbd_peer_device *peer_device;
5210 struct drbd_device *device;
5213 struct p_block_ack *p = pi->data;
5215 peer_device = conn_peer_device(connection, pi->vnr);
5218 device = peer_device->device;
5220 sector = be64_to_cpu(p->sector);
5221 size = be32_to_cpu(p->blksize);
5223 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5225 dec_rs_pending(device);
/* D_FAILED is enough here: we only complete/account resync I/O */
5227 if (get_ldev_if_state(device, D_FAILED)) {
5228 drbd_rs_complete_io(device, sector);
/* switch on pi->cmd (switch statement partially elided in this view) */
5230 case P_NEG_RS_DREPLY:
5231 drbd_rs_failed_io(device, sector, size);
/* Asender handler for P_BARRIER_ACK: release the acknowledged transfer-log
 * epoch, and for any device that went Ahead and has drained its in-flight
 * application I/O, arm the timer that switches it back to SyncSource. */
5243 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5245 struct p_barrier_ack *p = pi->data;
5246 struct drbd_peer_device *peer_device;
5249 tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
/* NOTE(review): iteration presumably runs under RCU (elided lock lines) — confirm */
5252 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5253 struct drbd_device *device = peer_device->device;
5255 if (device->state.conn == C_AHEAD &&
5256 atomic_read(&device->ap_in_flight) == 0 &&
5257 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
/* delay the resync start by one second */
5258 device->start_resync_timer.expires = jiffies + HZ;
5259 add_timer(&device->start_resync_timer);
/* Asender handler for P_OV_RESULT (online verify): record an out-of-sync
 * finding if the peer reported one, update progress, and when the last
 * verify block has been answered, queue the finish work (or finish
 * synchronously if the work item cannot be allocated). */
5267 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5269 struct drbd_peer_device *peer_device;
5270 struct drbd_device *device;
5271 struct p_block_ack *p = pi->data;
5272 struct drbd_device_work *dw;
5276 peer_device = conn_peer_device(connection, pi->vnr);
5279 device = peer_device->device;
5281 sector = be64_to_cpu(p->sector);
5282 size = be32_to_cpu(p->blksize);
5284 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5286 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5287 drbd_ov_out_of_sync_found(device, sector, size);
/* else-branch (elided `else`): flush the accumulated out-of-sync report */
5289 ov_out_of_sync_print(device);
/* everything below needs a local-disk reference */
5291 if (!get_ldev(device))
5294 drbd_rs_complete_io(device, sector);
5295 dec_rs_pending(device);
5299 /* let's advance progress step marks only for every other megabyte */
5300 if ((device->ov_left & 0x200) == 0x200)
5301 drbd_advance_rs_marks(device, device->ov_left);
/* last block verified: hand the finish off to the sender work queue */
5303 if (device->ov_left == 0) {
5304 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5306 dw->w.cb = w_ov_finished;
5307 dw->device = device;
5308 drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
/* allocation failed: finish inline instead of via the work queue */
5310 drbd_err(device, "kmalloc(dw) failed.");
5311 ov_out_of_sync_print(device);
5312 drbd_resync_finished(device);
5319 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
/* Drain the done_ee lists of all devices on this connection: repeatedly
 * run drbd_finish_peer_reqs() per device, then re-check under req_lock
 * whether any done_ee filled up again (finishing can trigger more work),
 * looping until all lists are empty. Returns non-zero on failure. */
5324 static int connection_finish_peer_reqs(struct drbd_connection *connection)
5326 struct drbd_peer_device *peer_device;
5327 int vnr, not_empty = 0;
/* clear the asender-signal marker so a pending signal cannot abort us */
5330 clear_bit(SIGNAL_ASENDER, &connection->flags);
5331 flush_signals(current);
/* pass 1: finish what is currently queued, per device */
5334 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5335 struct drbd_device *device = peer_device->device;
/* kref pins the device across the (possibly sleeping) finish call */
5336 kref_get(&device->kref);
5338 if (drbd_finish_peer_reqs(device)) {
5339 kref_put(&device->kref, drbd_destroy_device);
5342 kref_put(&device->kref, drbd_destroy_device);
5345 set_bit(SIGNAL_ASENDER, &connection->flags);
/* pass 2: under the lock, see whether any done_ee refilled meanwhile */
5347 spin_lock_irq(&connection->resource->req_lock);
5348 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5349 struct drbd_device *device = peer_device->device;
5350 not_empty = !list_empty(&device->done_ee);
5354 spin_unlock_irq(&connection->resource->req_lock);
5356 } while (not_empty);
/* Dispatch-table entry for meta-socket packets: expected payload size
 * (field elided in this view) plus the handler function. */
5361 struct asender_cmd {
5363 int (*fn)(struct drbd_connection *connection, struct packet_info *);
/* Dispatch table for the asender thread, indexed by packet command:
 * { expected payload size, handler }. Unlisted commands are rejected
 * by the !cmd->fn check in drbd_asender(). */
5366 static struct asender_cmd asender_tbl[] = {
5367 [P_PING] = { 0, got_Ping },
5368 [P_PING_ACK] = { 0, got_PingAck },
5369 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5370 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5371 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5372 [P_SUPERSEDED] = { sizeof(struct p_block_ack), got_BlockAck },
5373 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
5374 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
5375 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply },
5376 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
5377 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
5378 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5379 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
5380 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
5381 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply },
5382 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5383 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
5386 int drbd_asender(struct drbd_thread *thi)
5388 struct drbd_connection *connection = thi->connection;
5389 struct asender_cmd *cmd = NULL;
5390 struct packet_info pi;
5392 void *buf = connection->meta.rbuf;
5394 unsigned int header_size = drbd_header_size(connection);
5395 int expect = header_size;
5396 bool ping_timeout_active = false;
5397 struct net_conf *nc;
5398 int ping_timeo, tcp_cork, ping_int;
5399 struct sched_param param = { .sched_priority = 2 };
5401 rv = sched_setscheduler(current, SCHED_RR, ¶m);
5403 drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv);
5405 while (get_t_state(thi) == RUNNING) {
5406 drbd_thread_current_set_cpu(thi);
5409 nc = rcu_dereference(connection->net_conf);
5410 ping_timeo = nc->ping_timeo;
5411 tcp_cork = nc->tcp_cork;
5412 ping_int = nc->ping_int;
5415 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5416 if (drbd_send_ping(connection)) {
5417 drbd_err(connection, "drbd_send_ping has failed\n");
5420 connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5421 ping_timeout_active = true;
5424 /* TODO: conditionally cork; it may hurt latency if we cork without
5427 drbd_tcp_cork(connection->meta.socket);
5428 if (connection_finish_peer_reqs(connection)) {
5429 drbd_err(connection, "connection_finish_peer_reqs() failed\n");
5432 /* but unconditionally uncork unless disabled */
5434 drbd_tcp_uncork(connection->meta.socket);
5436 /* short circuit, recv_msg would return EINTR anyways. */
5437 if (signal_pending(current))
5440 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5441 clear_bit(SIGNAL_ASENDER, &connection->flags);
5443 flush_signals(current);
5446 * -EINTR (on meta) we got a signal
5447 * -EAGAIN (on meta) rcvtimeo expired
5448 * -ECONNRESET other side closed the connection
5449 * -ERESTARTSYS (on data) we got a signal
5450 * rv < 0 other than above: unexpected error!
5451 * rv == expected: full header or command
5452 * rv < expected: "woken" by signal during receive
5453 * rv == 0 : "connection shut down by peer"
5455 if (likely(rv > 0)) {
5458 } else if (rv == 0) {
5459 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5462 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5465 t = wait_event_timeout(connection->ping_wait,
5466 connection->cstate < C_WF_REPORT_PARAMS,
5471 drbd_err(connection, "meta connection shut down by peer.\n");
5473 } else if (rv == -EAGAIN) {
5474 /* If the data socket received something meanwhile,
5475 * that is good enough: peer is still alive. */
5476 if (time_after(connection->last_received,
5477 jiffies - connection->meta.socket->sk->sk_rcvtimeo))
5479 if (ping_timeout_active) {
5480 drbd_err(connection, "PingAck did not arrive in time.\n");
5483 set_bit(SEND_PING, &connection->flags);
5485 } else if (rv == -EINTR) {
5488 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5492 if (received == expect && cmd == NULL) {
5493 if (decode_header(connection, connection->meta.rbuf, &pi))
5495 cmd = &asender_tbl[pi.cmd];
5496 if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5497 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5498 cmdname(pi.cmd), pi.cmd);
5501 expect = header_size + cmd->pkt_size;
5502 if (pi.size != expect - header_size) {
5503 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5508 if (received == expect) {
5511 err = cmd->fn(connection, &pi);
5513 drbd_err(connection, "%pf failed\n", cmd->fn);
5517 connection->last_received = jiffies;
5519 if (cmd == &asender_tbl[P_PING_ACK]) {
5520 /* restore idle timeout */
5521 connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5522 ping_timeout_active = false;
5525 buf = connection->meta.rbuf;
5527 expect = header_size;
5534 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5535 conn_md_sync(connection);
5539 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5541 clear_bit(SIGNAL_ASENDER, &connection->flags);
5543 drbd_info(connection, "asender terminated\n");