#define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */
#define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */
+/*
+ * connection states
+ */
+#define CON_STATE_CLOSED 1 /* -> PREOPEN */
+#define CON_STATE_PREOPEN 2 /* -> CONNECTING, CLOSED */
+#define CON_STATE_CONNECTING 3 /* -> NEGOTIATING, CLOSED */
+#define CON_STATE_NEGOTIATING 4 /* -> OPEN, CLOSED */
+#define CON_STATE_OPEN 5 /* -> STANDBY, CLOSED */
+#define CON_STATE_STANDBY 6 /* -> PREOPEN, CLOSED */
+
+/*
+ * ceph_connection flag bits
+ */
+#define CON_FLAG_LOSSYTX 0 /* we can close channel or drop
+ * messages on errors */
+#define CON_FLAG_KEEPALIVE_PENDING 1 /* we need to send a keepalive */
+#define CON_FLAG_WRITE_PENDING 2 /* we have data ready to send */
+#define CON_FLAG_SOCK_CLOSED 3 /* socket state changed to closed */
+#define CON_FLAG_BACKOFF 4 /* need to retry queuing delayed work */
+
/* static tag bytes (protocol control messages) */
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
* buffer. See net/ipv4/tcp_input.c:tcp_check_space()
* and net/core/stream.c:sk_stream_write_space().
*/
- if (test_bit(WRITE_PENDING, &con->flags)) {
+ if (test_bit(CON_FLAG_WRITE_PENDING, &con->flags)) {
if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
dout("%s %p queueing write work\n", __func__, con);
clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
dout("%s %p state = %lu sk_state = %u\n", __func__,
con, con->state, sk->sk_state);
- if (test_bit(CLOSED, &con->state))
- return;
-
switch (sk->sk_state) {
case TCP_CLOSE:
dout("%s TCP_CLOSE\n", __func__);
case TCP_CLOSE_WAIT:
dout("%s TCP_CLOSE_WAIT\n", __func__);
con_sock_state_closing(con);
- set_bit(SOCK_CLOSED, &con->flags);
+ set_bit(CON_FLAG_SOCK_CLOSED, &con->flags);
queue_con(con);
break;
case TCP_ESTABLISHED:
con->sock = NULL;
/*
- * Forcibly clear the SOCK_CLOSE flag. It gets set
+ * Forcibly clear the SOCK_CLOSED flag. It gets set
* independent of the connection mutex, and we could have
* received a socket close event before we had the chance to
* shut the socket down.
*/
- clear_bit(SOCK_CLOSED, &con->flags);
+ clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags);
con_sock_state_closed(con);
return rc;
}
mutex_lock(&con->mutex);
dout("con_close %p peer %s\n", con,
ceph_pr_addr(&con->peer_addr.in_addr));
- clear_bit(NEGOTIATING, &con->state);
- clear_bit(CONNECTING, &con->state);
- clear_bit(CONNECTED, &con->state);
- clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */
- set_bit(CLOSED, &con->state);
+ con->state = CON_STATE_CLOSED;
- clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */
- clear_bit(KEEPALIVE_PENDING, &con->flags);
- clear_bit(WRITE_PENDING, &con->flags);
+ clear_bit(CON_FLAG_LOSSYTX, &con->flags); /* so we retry next connect */
+ clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags);
+ clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
reset_connection(con);
con->peer_global_seq = 0;
cancel_delayed_work(&con->work);
+ con_close_socket(con);
mutex_unlock(&con->mutex);
-
- /*
- * We cannot close the socket directly from here because the
- * work threads use it without holding the mutex. Instead, let
- * con_work() do it.
- */
- queue_con(con);
}
EXPORT_SYMBOL(ceph_con_close);
{
mutex_lock(&con->mutex);
dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));
- set_bit(OPENING, &con->state);
- WARN_ON(!test_and_clear_bit(CLOSED, &con->state));
+
+ BUG_ON(con->state != CON_STATE_CLOSED);
+ con->state = CON_STATE_PREOPEN;
con->peer_name.type = (__u8) entity_type;
con->peer_name.num = cpu_to_le64(entity_num);
INIT_LIST_HEAD(&con->out_sent);
INIT_DELAYED_WORK(&con->work, con_work);
- set_bit(CLOSED, &con->state);
+ con->state = CON_STATE_CLOSED;
}
EXPORT_SYMBOL(ceph_con_init);
/* no, queue up footer too and be done */
prepare_write_message_footer(con);
- set_bit(WRITE_PENDING, &con->flags);
+ set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
}
/*
&con->out_temp_ack);
con->out_more = 1; /* more will follow.. eventually.. */
- set_bit(WRITE_PENDING, &con->flags);
+ set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
}
/*
dout("prepare_write_keepalive %p\n", con);
con_out_kvec_reset(con);
con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
- set_bit(WRITE_PENDING, &con->flags);
+ set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
}
/*
if (!con->ops->get_authorizer) {
con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
con->out_connect.authorizer_len = 0;
-
return NULL;
}
/* Can't hold the mutex while getting authorizer */
-
mutex_unlock(&con->mutex);
-
auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);
-
mutex_lock(&con->mutex);
if (IS_ERR(auth))
return auth;
- if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->flags))
+ if (con->state != CON_STATE_NEGOTIATING)
return ERR_PTR(-EAGAIN);
con->auth_reply_buf = auth->authorizer_reply_buf;
con->auth_reply_buf_len = auth->authorizer_reply_buf_len;
-
-
return auth;
}
&con->msgr->my_enc_addr);
con->out_more = 0;
- set_bit(WRITE_PENDING, &con->flags);
+ set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
}
static int prepare_write_connect(struct ceph_connection *con)
auth->authorizer_buf);
con->out_more = 0;
- set_bit(WRITE_PENDING, &con->flags);
+ set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
return 0;
}
static void fail_protocol(struct ceph_connection *con)
{
reset_connection(con);
- set_bit(CLOSED, &con->state); /* in case there's queued work */
+ BUG_ON(con->state != CON_STATE_NEGOTIATING);
+ con->state = CON_STATE_CLOSED;
}
static int process_connect(struct ceph_connection *con)
if (con->ops->peer_reset)
con->ops->peer_reset(con);
mutex_lock(&con->mutex);
- if (test_bit(CLOSED, &con->state) ||
- test_bit(OPENING, &con->state))
+ if (con->state != CON_STATE_NEGOTIATING)
return -EAGAIN;
break;
fail_protocol(con);
return -1;
}
- clear_bit(NEGOTIATING, &con->state);
- set_bit(CONNECTED, &con->state);
+
+ BUG_ON(con->state != CON_STATE_NEGOTIATING);
+ con->state = CON_STATE_OPEN;
+
con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
con->connect_seq++;
con->peer_features = server_feat;
le32_to_cpu(con->in_reply.connect_seq));
if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
- set_bit(LOSSYTX, &con->flags);
+ set_bit(CON_FLAG_LOSSYTX, &con->flags);
con->delay = 0; /* reset backoff memory */
dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
/* open the socket first? */
- if (con->sock == NULL) {
- set_bit(CONNECTING, &con->state);
+ if (con->state == CON_STATE_PREOPEN) {
+ BUG_ON(con->sock);
+ con->state = CON_STATE_CONNECTING;
con_out_kvec_reset(con);
prepare_write_banner(con);
}
do_next:
- if (!test_bit(CONNECTING, &con->state) &&
- !test_bit(NEGOTIATING, &con->state)) {
+ if (con->state == CON_STATE_OPEN) {
/* is anything else pending? */
if (!list_empty(&con->out_queue)) {
prepare_write_message(con);
prepare_write_ack(con);
goto more;
}
- if (test_and_clear_bit(KEEPALIVE_PENDING, &con->flags)) {
+ if (test_and_clear_bit(CON_FLAG_KEEPALIVE_PENDING,
+ &con->flags)) {
prepare_write_keepalive(con);
goto more;
}
}
/* Nothing to do! */
- clear_bit(WRITE_PENDING, &con->flags);
+ clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
dout("try_write nothing else to write.\n");
ret = 0;
out:
{
int ret = -1;
- if (!con->sock)
- return 0;
-
- if (test_bit(STANDBY, &con->state))
+more:
+ dout("try_read start on %p state %lu\n", con, con->state);
+ if (con->state != CON_STATE_CONNECTING &&
+ con->state != CON_STATE_NEGOTIATING &&
+ con->state != CON_STATE_OPEN)
return 0;
- dout("try_read start on %p\n", con);
+ BUG_ON(!con->sock);
-more:
dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
con->in_base_pos);
- /*
- * process_connect and process_message drop and re-take
- * con->mutex. make sure we handle a racing close or reopen.
- */
- if (test_bit(CLOSED, &con->state) ||
- test_bit(OPENING, &con->state)) {
- ret = -EAGAIN;
- goto out;
- }
-
- if (test_bit(CONNECTING, &con->state)) {
+ if (con->state == CON_STATE_CONNECTING) {
dout("try_read connecting\n");
ret = read_partial_banner(con);
if (ret <= 0)
if (ret < 0)
goto out;
- clear_bit(CONNECTING, &con->state);
- set_bit(NEGOTIATING, &con->state);
+ BUG_ON(con->state != CON_STATE_CONNECTING);
+ con->state = CON_STATE_NEGOTIATING;
/* Banner is good, exchange connection info */
ret = prepare_write_connect(con);
goto out;
}
- if (test_bit(NEGOTIATING, &con->state)) {
+ if (con->state == CON_STATE_NEGOTIATING) {
dout("try_read negotiating\n");
ret = read_partial_connect(con);
if (ret <= 0)
goto more;
}
+ BUG_ON(con->state != CON_STATE_OPEN);
+
if (con->in_base_pos < 0) {
/*
* skipping + discarding content.
prepare_read_ack(con);
break;
case CEPH_MSGR_TAG_CLOSE:
- clear_bit(CONNECTED, &con->state);
- set_bit(CLOSED, &con->state); /* fixme */
+ con_close_socket(con);
+ con->state = CON_STATE_CLOSED;
goto out;
default:
goto bad_tag;
mutex_lock(&con->mutex);
restart:
- if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) {
- if (test_and_clear_bit(CONNECTED, &con->state))
- con->error_msg = "socket closed";
- else if (test_and_clear_bit(NEGOTIATING, &con->state))
- con->error_msg = "negotiation failed";
- else if (test_and_clear_bit(CONNECTING, &con->state))
+ if (test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) {
+ switch (con->state) {
+ case CON_STATE_CONNECTING:
con->error_msg = "connection failed";
- else
+ break;
+ case CON_STATE_NEGOTIATING:
+ con->error_msg = "negotiation failed";
+ break;
+ case CON_STATE_OPEN:
+ con->error_msg = "socket closed";
+ break;
+ default:
+ dout("unrecognized con state %d\n", (int)con->state);
con->error_msg = "unrecognized con state";
+ BUG();
+ }
goto fault;
}
- if (test_and_clear_bit(BACKOFF, &con->flags)) {
+ if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) {
dout("con_work %p backing off\n", con);
if (queue_delayed_work(ceph_msgr_wq, &con->work,
round_jiffies_relative(con->delay))) {
}
}
- if (test_bit(STANDBY, &con->state)) {
+ if (con->state == CON_STATE_STANDBY) {
dout("con_work %p STANDBY\n", con);
goto done;
}
- if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
- dout("con_work CLOSED\n");
- con_close_socket(con);
+ if (con->state == CON_STATE_CLOSED) {
+ dout("con_work %p CLOSED\n", con);
+ BUG_ON(con->sock);
goto done;
}
- if (test_and_clear_bit(OPENING, &con->state)) {
- /* reopen w/ new peer */
+ if (con->state == CON_STATE_PREOPEN) {
dout("con_work OPENING\n");
- con_close_socket(con);
+ BUG_ON(con->sock);
}
ret = try_read(con);
dout("fault %p state %lu to peer %s\n",
con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
- if (test_bit(CLOSED, &con->state))
- goto out_unlock;
+ BUG_ON(con->state != CON_STATE_CONNECTING &&
+ con->state != CON_STATE_NEGOTIATING &&
+ con->state != CON_STATE_OPEN);
con_close_socket(con);
- if (test_bit(LOSSYTX, &con->flags)) {
- dout("fault on LOSSYTX channel\n");
+ if (test_bit(CON_FLAG_LOSSYTX, &con->flags)) {
+ dout("fault on LOSSYTX channel, marking CLOSED\n");
+ con->state = CON_STATE_CLOSED;
goto out_unlock;
}
/* If there are no messages queued or keepalive pending, place
* the connection in a STANDBY state */
if (list_empty(&con->out_queue) &&
- !test_bit(KEEPALIVE_PENDING, &con->flags)) {
+ !test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)) {
dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
- clear_bit(WRITE_PENDING, &con->flags);
- set_bit(STANDBY, &con->state);
+ clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
+ con->state = CON_STATE_STANDBY;
} else {
/* retry after a delay. */
+ con->state = CON_STATE_PREOPEN;
if (con->delay == 0)
con->delay = BASE_DELAY_INTERVAL;
else if (con->delay < MAX_DELAY_INTERVAL)
* that when con_work restarts we schedule the
* delay then.
*/
- set_bit(BACKOFF, &con->flags);
+ set_bit(CON_FLAG_BACKOFF, &con->flags);
}
}
static void clear_standby(struct ceph_connection *con)
{
/* come back from STANDBY? */
- if (test_and_clear_bit(STANDBY, &con->state)) {
+ if (con->state == CON_STATE_STANDBY) {
dout("clear_standby %p and ++connect_seq\n", con);
+ con->state = CON_STATE_PREOPEN;
con->connect_seq++;
- WARN_ON(test_bit(WRITE_PENDING, &con->flags));
- WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags));
+ WARN_ON(test_bit(CON_FLAG_WRITE_PENDING, &con->flags));
+ WARN_ON(test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags));
}
}
mutex_lock(&con->mutex);
- if (test_bit(CLOSED, &con->state)) {
+ if (con->state == CON_STATE_CLOSED) {
dout("con_send %p closed, dropping %p\n", con, msg);
ceph_msg_put(msg);
mutex_unlock(&con->mutex);
/* if there wasn't anything waiting to send before, queue
* new work */
- if (test_and_set_bit(WRITE_PENDING, &con->flags) == 0)
+ if (test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0)
queue_con(con);
}
EXPORT_SYMBOL(ceph_con_send);
mutex_lock(&con->mutex);
clear_standby(con);
mutex_unlock(&con->mutex);
- if (test_and_set_bit(KEEPALIVE_PENDING, &con->flags) == 0 &&
- test_and_set_bit(WRITE_PENDING, &con->flags) == 0)
+ if (test_and_set_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags) == 0 &&
+ test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0)
queue_con(con);
}
EXPORT_SYMBOL(ceph_con_keepalive);