libceph: clean up con flags
[firefly-linux-kernel-4.4.55.git] / net / ceph / messenger.c
index 20e60a80bc298d1b40ac969ff09c88720c65b786..b872db5c498941020c05c6c77a9729b70cace120 100644 (file)
 #define CON_SOCK_STATE_CONNECTED       3       /* -> CLOSING or -> CLOSED */
 #define CON_SOCK_STATE_CLOSING         4       /* -> CLOSED */
 
+/*
+ * connection states
+ */
+#define CON_STATE_CLOSED        1  /* -> PREOPEN */
+#define CON_STATE_PREOPEN       2  /* -> CONNECTING, CLOSED */
+#define CON_STATE_CONNECTING    3  /* -> NEGOTIATING, CLOSED */
+#define CON_STATE_NEGOTIATING   4  /* -> OPEN, CLOSED */
+#define CON_STATE_OPEN          5  /* -> STANDBY, CLOSED */
+#define CON_STATE_STANDBY       6  /* -> PREOPEN, CLOSED */
+
+/*
+ * ceph_connection flag bits
+ */
+#define CON_FLAG_LOSSYTX           0  /* we can close channel or drop
+                                      * messages on errors */
+#define CON_FLAG_KEEPALIVE_PENDING 1  /* we need to send a keepalive */
+#define CON_FLAG_WRITE_PENDING    2  /* we have data ready to send */
+#define CON_FLAG_SOCK_CLOSED      3  /* socket state changed to closed */
+#define CON_FLAG_BACKOFF           4  /* need to retry queuing delayed work */
+
 /* static tag bytes (protocol control messages) */
 static char tag_msg = CEPH_MSGR_TAG_MSG;
 static char tag_ack = CEPH_MSGR_TAG_ACK;
@@ -277,7 +297,7 @@ static void ceph_sock_write_space(struct sock *sk)
         * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
         * and net/core/stream.c:sk_stream_write_space().
         */
-       if (test_bit(WRITE_PENDING, &con->flags)) {
+       if (test_bit(CON_FLAG_WRITE_PENDING, &con->flags)) {
                if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
                        dout("%s %p queueing write work\n", __func__, con);
                        clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
@@ -296,16 +316,13 @@ static void ceph_sock_state_change(struct sock *sk)
        dout("%s %p state = %lu sk_state = %u\n", __func__,
             con, con->state, sk->sk_state);
 
-       if (test_bit(CLOSED, &con->state))
-               return;
-
        switch (sk->sk_state) {
        case TCP_CLOSE:
                dout("%s TCP_CLOSE\n", __func__);
        case TCP_CLOSE_WAIT:
                dout("%s TCP_CLOSE_WAIT\n", __func__);
                con_sock_state_closing(con);
-               set_bit(SOCK_CLOSED, &con->flags);
+               set_bit(CON_FLAG_SOCK_CLOSED, &con->flags);
                queue_con(con);
                break;
        case TCP_ESTABLISHED:
@@ -441,12 +458,12 @@ static int con_close_socket(struct ceph_connection *con)
        con->sock = NULL;
 
        /*
-        * Forcibly clear the SOCK_CLOSE flag.  It gets set
+        * Forcibly clear the SOCK_CLOSED flag.  It gets set
         * independent of the connection mutex, and we could have
         * received a socket close event before we had the chance to
         * shut the socket down.
         */
-       clear_bit(SOCK_CLOSED, &con->flags);
+       clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags);
        con_sock_state_closed(con);
        return rc;
 }
@@ -506,27 +523,17 @@ void ceph_con_close(struct ceph_connection *con)
        mutex_lock(&con->mutex);
        dout("con_close %p peer %s\n", con,
             ceph_pr_addr(&con->peer_addr.in_addr));
-       clear_bit(NEGOTIATING, &con->state);
-       clear_bit(CONNECTING, &con->state);
-       clear_bit(CONNECTED, &con->state);
-       clear_bit(STANDBY, &con->state);  /* avoid connect_seq bump */
-       set_bit(CLOSED, &con->state);
+       con->state = CON_STATE_CLOSED;
 
-       clear_bit(LOSSYTX, &con->flags);  /* so we retry next connect */
-       clear_bit(KEEPALIVE_PENDING, &con->flags);
-       clear_bit(WRITE_PENDING, &con->flags);
+       clear_bit(CON_FLAG_LOSSYTX, &con->flags); /* so we retry next connect */
+       clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags);
+       clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 
        reset_connection(con);
        con->peer_global_seq = 0;
        cancel_delayed_work(&con->work);
+       con_close_socket(con);
        mutex_unlock(&con->mutex);
-
-       /*
-        * We cannot close the socket directly from here because the
-        * work threads use it without holding the mutex.  Instead, let
-        * con_work() do it.
-        */
-       queue_con(con);
 }
 EXPORT_SYMBOL(ceph_con_close);
 
@@ -539,8 +546,9 @@ void ceph_con_open(struct ceph_connection *con,
 {
        mutex_lock(&con->mutex);
        dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));
-       set_bit(OPENING, &con->state);
-       WARN_ON(!test_and_clear_bit(CLOSED, &con->state));
+
+       BUG_ON(con->state != CON_STATE_CLOSED);
+       con->state = CON_STATE_PREOPEN;
 
        con->peer_name.type = (__u8) entity_type;
        con->peer_name.num = cpu_to_le64(entity_num);
@@ -580,7 +588,7 @@ void ceph_con_init(struct ceph_connection *con, void *private,
        INIT_LIST_HEAD(&con->out_sent);
        INIT_DELAYED_WORK(&con->work, con_work);
 
-       set_bit(CLOSED, &con->state);
+       con->state = CON_STATE_CLOSED;
 }
 EXPORT_SYMBOL(ceph_con_init);
 
@@ -771,7 +779,7 @@ static void prepare_write_message(struct ceph_connection *con)
                /* no, queue up footer too and be done */
                prepare_write_message_footer(con);
 
-       set_bit(WRITE_PENDING, &con->flags);
+       set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 }
 
 /*
@@ -792,7 +800,7 @@ static void prepare_write_ack(struct ceph_connection *con)
                                &con->out_temp_ack);
 
        con->out_more = 1;  /* more will follow.. eventually.. */
-       set_bit(WRITE_PENDING, &con->flags);
+       set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 }
 
 /*
@@ -803,7 +811,7 @@ static void prepare_write_keepalive(struct ceph_connection *con)
        dout("prepare_write_keepalive %p\n", con);
        con_out_kvec_reset(con);
        con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
-       set_bit(WRITE_PENDING, &con->flags);
+       set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 }
 
 /*
@@ -818,27 +826,21 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection
        if (!con->ops->get_authorizer) {
                con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
                con->out_connect.authorizer_len = 0;
-
                return NULL;
        }
 
        /* Can't hold the mutex while getting authorizer */
-
        mutex_unlock(&con->mutex);
-
        auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);
-
        mutex_lock(&con->mutex);
 
        if (IS_ERR(auth))
                return auth;
-       if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->flags))
+       if (con->state != CON_STATE_NEGOTIATING)
                return ERR_PTR(-EAGAIN);
 
        con->auth_reply_buf = auth->authorizer_reply_buf;
        con->auth_reply_buf_len = auth->authorizer_reply_buf_len;
-
-
        return auth;
 }
 
@@ -852,7 +854,7 @@ static void prepare_write_banner(struct ceph_connection *con)
                                        &con->msgr->my_enc_addr);
 
        con->out_more = 0;
-       set_bit(WRITE_PENDING, &con->flags);
+       set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 }
 
 static int prepare_write_connect(struct ceph_connection *con)
@@ -903,7 +905,7 @@ static int prepare_write_connect(struct ceph_connection *con)
                                        auth->authorizer_buf);
 
        con->out_more = 0;
-       set_bit(WRITE_PENDING, &con->flags);
+       set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 
        return 0;
 }
@@ -1493,7 +1495,8 @@ static int process_banner(struct ceph_connection *con)
 static void fail_protocol(struct ceph_connection *con)
 {
        reset_connection(con);
-       set_bit(CLOSED, &con->state);  /* in case there's queued work */
+       BUG_ON(con->state != CON_STATE_NEGOTIATING);
+       con->state = CON_STATE_CLOSED;
 }
 
 static int process_connect(struct ceph_connection *con)
@@ -1567,8 +1570,7 @@ static int process_connect(struct ceph_connection *con)
                if (con->ops->peer_reset)
                        con->ops->peer_reset(con);
                mutex_lock(&con->mutex);
-               if (test_bit(CLOSED, &con->state) ||
-                   test_bit(OPENING, &con->state))
+               if (con->state != CON_STATE_NEGOTIATING)
                        return -EAGAIN;
                break;
 
@@ -1614,8 +1616,10 @@ static int process_connect(struct ceph_connection *con)
                        fail_protocol(con);
                        return -1;
                }
-               clear_bit(NEGOTIATING, &con->state);
-               set_bit(CONNECTED, &con->state);
+
+               BUG_ON(con->state != CON_STATE_NEGOTIATING);
+               con->state = CON_STATE_OPEN;
+
                con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
                con->connect_seq++;
                con->peer_features = server_feat;
@@ -1627,7 +1631,7 @@ static int process_connect(struct ceph_connection *con)
                        le32_to_cpu(con->in_reply.connect_seq));
 
                if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
-                       set_bit(LOSSYTX, &con->flags);
+                       set_bit(CON_FLAG_LOSSYTX, &con->flags);
 
                con->delay = 0;      /* reset backoff memory */
 
@@ -2003,8 +2007,9 @@ more:
        dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
 
        /* open the socket first? */
-       if (con->sock == NULL) {
-               set_bit(CONNECTING, &con->state);
+       if (con->state == CON_STATE_PREOPEN) {
+               BUG_ON(con->sock);
+               con->state = CON_STATE_CONNECTING;
 
                con_out_kvec_reset(con);
                prepare_write_banner(con);
@@ -2055,8 +2060,7 @@ more_kvec:
        }
 
 do_next:
-       if (!test_bit(CONNECTING, &con->state) &&
-                       !test_bit(NEGOTIATING, &con->state)) {
+       if (con->state == CON_STATE_OPEN) {
                /* is anything else pending? */
                if (!list_empty(&con->out_queue)) {
                        prepare_write_message(con);
@@ -2066,14 +2070,15 @@ do_next:
                        prepare_write_ack(con);
                        goto more;
                }
-               if (test_and_clear_bit(KEEPALIVE_PENDING, &con->flags)) {
+               if (test_and_clear_bit(CON_FLAG_KEEPALIVE_PENDING,
+                                      &con->flags)) {
                        prepare_write_keepalive(con);
                        goto more;
                }
        }
 
        /* Nothing to do! */
-       clear_bit(WRITE_PENDING, &con->flags);
+       clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
        dout("try_write nothing else to write.\n");
        ret = 0;
 out:
@@ -2090,29 +2095,19 @@ static int try_read(struct ceph_connection *con)
 {
        int ret = -1;
 
-       if (!con->sock)
-               return 0;
-
-       if (test_bit(STANDBY, &con->state))
+more:
+       dout("try_read start on %p state %lu\n", con, con->state);
+       if (con->state != CON_STATE_CONNECTING &&
+           con->state != CON_STATE_NEGOTIATING &&
+           con->state != CON_STATE_OPEN)
                return 0;
 
-       dout("try_read start on %p\n", con);
+       BUG_ON(!con->sock);
 
-more:
        dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
             con->in_base_pos);
 
-       /*
-        * process_connect and process_message drop and re-take
-        * con->mutex.  make sure we handle a racing close or reopen.
-        */
-       if (test_bit(CLOSED, &con->state) ||
-           test_bit(OPENING, &con->state)) {
-               ret = -EAGAIN;
-               goto out;
-       }
-
-       if (test_bit(CONNECTING, &con->state)) {
+       if (con->state == CON_STATE_CONNECTING) {
                dout("try_read connecting\n");
                ret = read_partial_banner(con);
                if (ret <= 0)
@@ -2121,8 +2116,8 @@ more:
                if (ret < 0)
                        goto out;
 
-               clear_bit(CONNECTING, &con->state);
-               set_bit(NEGOTIATING, &con->state);
+               BUG_ON(con->state != CON_STATE_CONNECTING);
+               con->state = CON_STATE_NEGOTIATING;
 
                /* Banner is good, exchange connection info */
                ret = prepare_write_connect(con);
@@ -2134,7 +2129,7 @@ more:
                goto out;
        }
 
-       if (test_bit(NEGOTIATING, &con->state)) {
+       if (con->state == CON_STATE_NEGOTIATING) {
                dout("try_read negotiating\n");
                ret = read_partial_connect(con);
                if (ret <= 0)
@@ -2145,6 +2140,8 @@ more:
                goto more;
        }
 
+       BUG_ON(con->state != CON_STATE_OPEN);
+
        if (con->in_base_pos < 0) {
                /*
                 * skipping + discarding content.
@@ -2178,8 +2175,8 @@ more:
                        prepare_read_ack(con);
                        break;
                case CEPH_MSGR_TAG_CLOSE:
-                       clear_bit(CONNECTED, &con->state);
-                       set_bit(CLOSED, &con->state);   /* fixme */
+                       con_close_socket(con);
+                       con->state = CON_STATE_CLOSED;
                        goto out;
                default:
                        goto bad_tag;
@@ -2254,19 +2251,26 @@ static void con_work(struct work_struct *work)
 
        mutex_lock(&con->mutex);
 restart:
-       if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) {
-               if (test_and_clear_bit(CONNECTED, &con->state))
-                       con->error_msg = "socket closed";
-               else if (test_and_clear_bit(NEGOTIATING, &con->state))
-                       con->error_msg = "negotiation failed";
-               else if (test_and_clear_bit(CONNECTING, &con->state))
+       if (test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) {
+               switch (con->state) {
+               case CON_STATE_CONNECTING:
                        con->error_msg = "connection failed";
-               else
+                       break;
+               case CON_STATE_NEGOTIATING:
+                       con->error_msg = "negotiation failed";
+                       break;
+               case CON_STATE_OPEN:
+                       con->error_msg = "socket closed";
+                       break;
+               default:
+                       dout("unrecognized con state %d\n", (int)con->state);
                        con->error_msg = "unrecognized con state";
+                       BUG();
+               }
                goto fault;
        }
 
-       if (test_and_clear_bit(BACKOFF, &con->flags)) {
+       if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) {
                dout("con_work %p backing off\n", con);
                if (queue_delayed_work(ceph_msgr_wq, &con->work,
                                       round_jiffies_relative(con->delay))) {
@@ -2280,19 +2284,18 @@ restart:
                }
        }
 
-       if (test_bit(STANDBY, &con->state)) {
+       if (con->state == CON_STATE_STANDBY) {
                dout("con_work %p STANDBY\n", con);
                goto done;
        }
-       if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
-               dout("con_work CLOSED\n");
-               con_close_socket(con);
+       if (con->state == CON_STATE_CLOSED) {
+               dout("con_work %p CLOSED\n", con);
+               BUG_ON(con->sock);
                goto done;
        }
-       if (test_and_clear_bit(OPENING, &con->state)) {
-               /* reopen w/ new peer */
+       if (con->state == CON_STATE_PREOPEN) {
                dout("con_work OPENING\n");
-               con_close_socket(con);
+               BUG_ON(con->sock);
        }
 
        ret = try_read(con);
@@ -2337,13 +2340,15 @@ static void ceph_fault(struct ceph_connection *con)
        dout("fault %p state %lu to peer %s\n",
             con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
 
-       if (test_bit(CLOSED, &con->state))
-               goto out_unlock;
+       BUG_ON(con->state != CON_STATE_CONNECTING &&
+              con->state != CON_STATE_NEGOTIATING &&
+              con->state != CON_STATE_OPEN);
 
        con_close_socket(con);
 
-       if (test_bit(LOSSYTX, &con->flags)) {
-               dout("fault on LOSSYTX channel\n");
+       if (test_bit(CON_FLAG_LOSSYTX, &con->flags)) {
+               dout("fault on LOSSYTX channel, marking CLOSED\n");
+               con->state = CON_STATE_CLOSED;
                goto out_unlock;
        }
 
@@ -2361,12 +2366,13 @@ static void ceph_fault(struct ceph_connection *con)
        /* If there are no messages queued or keepalive pending, place
         * the connection in a STANDBY state */
        if (list_empty(&con->out_queue) &&
-           !test_bit(KEEPALIVE_PENDING, &con->flags)) {
+           !test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)) {
                dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
-               clear_bit(WRITE_PENDING, &con->flags);
-               set_bit(STANDBY, &con->state);
+               clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
+               con->state = CON_STATE_STANDBY;
        } else {
                /* retry after a delay. */
+               con->state = CON_STATE_PREOPEN;
                if (con->delay == 0)
                        con->delay = BASE_DELAY_INTERVAL;
                else if (con->delay < MAX_DELAY_INTERVAL)
@@ -2387,7 +2393,7 @@ static void ceph_fault(struct ceph_connection *con)
                         * that when con_work restarts we schedule the
                         * delay then.
                         */
-                       set_bit(BACKOFF, &con->flags);
+                       set_bit(CON_FLAG_BACKOFF, &con->flags);
                }
        }
 
@@ -2440,11 +2446,12 @@ EXPORT_SYMBOL(ceph_messenger_init);
 static void clear_standby(struct ceph_connection *con)
 {
        /* come back from STANDBY? */
-       if (test_and_clear_bit(STANDBY, &con->state)) {
+       if (con->state == CON_STATE_STANDBY) {
                dout("clear_standby %p and ++connect_seq\n", con);
+               con->state = CON_STATE_PREOPEN;
                con->connect_seq++;
-               WARN_ON(test_bit(WRITE_PENDING, &con->flags));
-               WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags));
+               WARN_ON(test_bit(CON_FLAG_WRITE_PENDING, &con->flags));
+               WARN_ON(test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags));
        }
 }
 
@@ -2460,7 +2467,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
 
        mutex_lock(&con->mutex);
 
-       if (test_bit(CLOSED, &con->state)) {
+       if (con->state == CON_STATE_CLOSED) {
                dout("con_send %p closed, dropping %p\n", con, msg);
                ceph_msg_put(msg);
                mutex_unlock(&con->mutex);
@@ -2485,7 +2492,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
 
        /* if there wasn't anything waiting to send before, queue
         * new work */
-       if (test_and_set_bit(WRITE_PENDING, &con->flags) == 0)
+       if (test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0)
                queue_con(con);
 }
 EXPORT_SYMBOL(ceph_con_send);
@@ -2574,8 +2581,8 @@ void ceph_con_keepalive(struct ceph_connection *con)
        mutex_lock(&con->mutex);
        clear_standby(con);
        mutex_unlock(&con->mutex);
-       if (test_and_set_bit(KEEPALIVE_PENDING, &con->flags) == 0 &&
-           test_and_set_bit(WRITE_PENDING, &con->flags) == 0)
+       if (test_and_set_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags) == 0 &&
+           test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0)
                queue_con(con);
 }
 EXPORT_SYMBOL(ceph_con_keepalive);