ceph: pre-allocate ceph_cap struct for ceph_add_cap()
authorYan, Zheng <zheng.z.yan@intel.com>
Fri, 18 Apr 2014 01:57:11 +0000 (09:57 +0800)
committerYan, Zheng <zheng.z.yan@intel.com>
Fri, 6 Jun 2014 01:29:53 +0000 (09:29 +0800)
So that ceph_add_cap() can be used while i_ceph_lock is locked.
This simplifies the code that handle cap import/export.

Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
fs/ceph/caps.c
fs/ceph/inode.c
fs/ceph/super.h

index 5f6d24ede79490bd9f9d6c7fb25ae764162a7017..73a42f5043570649b255ccfaed4eeaadb89d4572 100644 (file)
@@ -221,8 +221,8 @@ int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
        return 0;
 }
 
-static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc,
-                               struct ceph_cap_reservation *ctx)
+struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
+                             struct ceph_cap_reservation *ctx)
 {
        struct ceph_cap *cap = NULL;
 
@@ -508,15 +508,14 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
  * it is < 0.  (This is so we can atomically add the cap and add an
  * open file reference to it.)
  */
-int ceph_add_cap(struct inode *inode,
-                struct ceph_mds_session *session, u64 cap_id,
-                int fmode, unsigned issued, unsigned wanted,
-                unsigned seq, unsigned mseq, u64 realmino, int flags,
-                struct ceph_cap_reservation *caps_reservation)
+void ceph_add_cap(struct inode *inode,
+                 struct ceph_mds_session *session, u64 cap_id,
+                 int fmode, unsigned issued, unsigned wanted,
+                 unsigned seq, unsigned mseq, u64 realmino, int flags,
+                 struct ceph_cap **new_cap)
 {
        struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_cap *new_cap = NULL;
        struct ceph_cap *cap;
        int mds = session->s_mds;
        int actual_wanted;
@@ -531,20 +530,10 @@ int ceph_add_cap(struct inode *inode,
        if (fmode >= 0)
                wanted |= ceph_caps_for_mode(fmode);
 
-retry:
-       spin_lock(&ci->i_ceph_lock);
        cap = __get_cap_for_mds(ci, mds);
        if (!cap) {
-               if (new_cap) {
-                       cap = new_cap;
-                       new_cap = NULL;
-               } else {
-                       spin_unlock(&ci->i_ceph_lock);
-                       new_cap = get_cap(mdsc, caps_reservation);
-                       if (new_cap == NULL)
-                               return -ENOMEM;
-                       goto retry;
-               }
+               cap = *new_cap;
+               *new_cap = NULL;
 
                cap->issued = 0;
                cap->implemented = 0;
@@ -562,9 +551,6 @@ retry:
                session->s_nr_caps++;
                spin_unlock(&session->s_cap_lock);
        } else {
-               if (new_cap)
-                       ceph_put_cap(mdsc, new_cap);
-
                /*
                 * auth mds of the inode changed. we received the cap export
                 * message, but still haven't received the cap import message.
@@ -626,7 +612,6 @@ retry:
                        ci->i_auth_cap = cap;
                        cap->mds_wanted = wanted;
                }
-               ci->i_cap_exporting_issued = 0;
        } else {
                WARN_ON(ci->i_auth_cap == cap);
        }
@@ -648,9 +633,6 @@ retry:
 
        if (fmode >= 0)
                __ceph_get_fmode(ci, fmode);
-       spin_unlock(&ci->i_ceph_lock);
-       wake_up_all(&ci->i_cap_wq);
-       return 0;
 }
 
 /*
@@ -685,7 +667,7 @@ static int __cap_is_valid(struct ceph_cap *cap)
  */
 int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
 {
-       int have = ci->i_snap_caps | ci->i_cap_exporting_issued;
+       int have = ci->i_snap_caps;
        struct ceph_cap *cap;
        struct rb_node *p;
 
@@ -900,7 +882,7 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci)
  */
 static int __ceph_is_any_caps(struct ceph_inode_info *ci)
 {
-       return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_issued;
+       return !RB_EMPTY_ROOT(&ci->i_caps);
 }
 
 int ceph_is_any_caps(struct inode *inode)
@@ -2796,7 +2778,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
 {
        struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
        struct ceph_mds_session *tsession = NULL;
-       struct ceph_cap *cap, *tcap;
+       struct ceph_cap *cap, *tcap, *new_cap = NULL;
        struct ceph_inode_info *ci = ceph_inode(inode);
        u64 t_cap_id;
        unsigned mseq = le32_to_cpu(ex->migrate_seq);
@@ -2858,15 +2840,14 @@ retry:
                }
                __ceph_remove_cap(cap, false);
                goto out_unlock;
-       }
-
-       if (tsession) {
-               int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
-               spin_unlock(&ci->i_ceph_lock);
+       } else if (tsession) {
                /* add placeholder for the export tagert */
+               int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
                ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0,
-                            t_seq - 1, t_mseq, (u64)-1, flag, NULL);
-               goto retry;
+                            t_seq - 1, t_mseq, (u64)-1, flag, &new_cap);
+
+               __ceph_remove_cap(cap, false);
+               goto out_unlock;
        }
 
        spin_unlock(&ci->i_ceph_lock);
@@ -2885,6 +2866,7 @@ retry:
                                          SINGLE_DEPTH_NESTING);
                }
                ceph_add_cap_releases(mdsc, tsession);
+               new_cap = ceph_get_cap(mdsc, NULL);
        } else {
                WARN_ON(1);
                tsession = NULL;
@@ -2899,6 +2881,8 @@ out_unlock:
                mutex_unlock(&tsession->s_mutex);
                ceph_put_mds_session(tsession);
        }
+       if (new_cap)
+               ceph_put_cap(mdsc, new_cap);
 }
 
 /*
@@ -2914,7 +2898,7 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
                              void *snaptrace, int snaptrace_len)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_cap *cap;
+       struct ceph_cap *cap, *new_cap = NULL;
        int mds = session->s_mds;
        unsigned issued = le32_to_cpu(im->caps);
        unsigned wanted = le32_to_cpu(im->wanted);
@@ -2936,7 +2920,20 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
        dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n",
             inode, ci, mds, mseq, peer);
 
+retry:
        spin_lock(&ci->i_ceph_lock);
+       cap = __get_cap_for_mds(ci, mds);
+       if (!cap) {
+               if (!new_cap) {
+                       spin_unlock(&ci->i_ceph_lock);
+                       new_cap = ceph_get_cap(mdsc, NULL);
+                       goto retry;
+               }
+       }
+
+       ceph_add_cap(inode, session, cap_id, -1, issued, wanted, seq, mseq,
+                    realmino, CEPH_CAP_FLAG_AUTH, &new_cap);
+
        cap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
        if (cap && cap->cap_id == p_cap_id) {
                dout(" remove export cap %p mds%d flags %d\n",
@@ -2951,7 +2948,6 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
                               cap->mseq, mds, le32_to_cpu(ph->seq),
                               le32_to_cpu(ph->mseq));
                }
-               ci->i_cap_exporting_issued = cap->issued;
                __ceph_remove_cap(cap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
        }
 
@@ -2960,16 +2956,17 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
        ci->i_requested_max_size = 0;
        spin_unlock(&ci->i_ceph_lock);
 
+       wake_up_all(&ci->i_cap_wq);
+
        down_write(&mdsc->snap_rwsem);
        ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len,
                               false);
        downgrade_write(&mdsc->snap_rwsem);
-       ceph_add_cap(inode, session, cap_id, -1,
-                    issued, wanted, seq, mseq, realmino, CEPH_CAP_FLAG_AUTH,
-                    NULL /* no caps context */);
        kick_flushing_inode_caps(mdsc, session, inode);
        up_read(&mdsc->snap_rwsem);
 
+       if (new_cap)
+               ceph_put_cap(mdsc, new_cap);
 }
 
 /*
index f9e7399877d68947d0e989ea207c1b1f7aa15cef..8ad50a30808e0152f6590e51231856baac4c6717 100644 (file)
@@ -341,7 +341,6 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
        INIT_LIST_HEAD(&ci->i_cap_snaps);
        ci->i_head_snapc = NULL;
        ci->i_snap_caps = 0;
-       ci->i_cap_exporting_issued = 0;
 
        for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
                ci->i_nr_by_mode[i] = 0;
@@ -407,7 +406,7 @@ void ceph_destroy_inode(struct inode *inode)
 
        /*
         * we may still have a snap_realm reference if there are stray
-        * caps in i_cap_exporting_issued or i_snap_caps.
+        * caps in i_snap_caps.
         */
        if (ci->i_snap_realm) {
                struct ceph_mds_client *mdsc =
@@ -582,6 +581,7 @@ static int fill_inode(struct inode *inode,
                      unsigned long ttl_from, int cap_fmode,
                      struct ceph_cap_reservation *caps_reservation)
 {
+       struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
        struct ceph_mds_reply_inode *info = iinfo->in;
        struct ceph_inode_info *ci = ceph_inode(inode);
        int i;
@@ -591,7 +591,9 @@ static int fill_inode(struct inode *inode,
        struct ceph_inode_frag *frag;
        struct rb_node *rb_node;
        struct ceph_buffer *xattr_blob = NULL;
+       struct ceph_cap *new_cap = NULL;
        int err = 0;
+       bool wake = false;
        bool queue_trunc = false;
        bool new_version = false;
 
@@ -599,6 +601,10 @@ static int fill_inode(struct inode *inode,
             inode, ceph_vinop(inode), le64_to_cpu(info->version),
             ci->i_version);
 
+       /* prealloc new cap struct */
+       if (info->cap.caps && ceph_snap(inode) == CEPH_NOSNAP)
+               new_cap = ceph_get_cap(mdsc, caps_reservation);
+
        /*
         * prealloc xattr data, if it looks like we'll need it.  only
         * if len > 4 (meaning there are actually xattrs; the first 4
@@ -762,8 +768,37 @@ static int fill_inode(struct inode *inode,
                dout(" marking %p complete (empty)\n", inode);
                __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count));
        }
+
+       /* were we issued a capability? */
+       if (info->cap.caps) {
+               if (ceph_snap(inode) == CEPH_NOSNAP) {
+                       ceph_add_cap(inode, session,
+                                    le64_to_cpu(info->cap.cap_id),
+                                    cap_fmode,
+                                    le32_to_cpu(info->cap.caps),
+                                    le32_to_cpu(info->cap.wanted),
+                                    le32_to_cpu(info->cap.seq),
+                                    le32_to_cpu(info->cap.mseq),
+                                    le64_to_cpu(info->cap.realm),
+                                    info->cap.flags, &new_cap);
+                       wake = true;
+               } else {
+                       dout(" %p got snap_caps %s\n", inode,
+                            ceph_cap_string(le32_to_cpu(info->cap.caps)));
+                       ci->i_snap_caps |= le32_to_cpu(info->cap.caps);
+                       if (cap_fmode >= 0)
+                               __ceph_get_fmode(ci, cap_fmode);
+               }
+       } else if (cap_fmode >= 0) {
+               pr_warning("mds issued no caps on %llx.%llx\n",
+                          ceph_vinop(inode));
+               __ceph_get_fmode(ci, cap_fmode);
+       }
        spin_unlock(&ci->i_ceph_lock);
 
+       if (wake)
+               wake_up_all(&ci->i_cap_wq);
+
        /* queue truncate if we saw i_size decrease */
        if (queue_trunc)
                ceph_queue_vmtruncate(inode);
@@ -806,41 +841,14 @@ static int fill_inode(struct inode *inode,
        }
        mutex_unlock(&ci->i_fragtree_mutex);
 
-       /* were we issued a capability? */
-       if (info->cap.caps) {
-               if (ceph_snap(inode) == CEPH_NOSNAP) {
-                       ceph_add_cap(inode, session,
-                                    le64_to_cpu(info->cap.cap_id),
-                                    cap_fmode,
-                                    le32_to_cpu(info->cap.caps),
-                                    le32_to_cpu(info->cap.wanted),
-                                    le32_to_cpu(info->cap.seq),
-                                    le32_to_cpu(info->cap.mseq),
-                                    le64_to_cpu(info->cap.realm),
-                                    info->cap.flags,
-                                    caps_reservation);
-               } else {
-                       spin_lock(&ci->i_ceph_lock);
-                       dout(" %p got snap_caps %s\n", inode,
-                            ceph_cap_string(le32_to_cpu(info->cap.caps)));
-                       ci->i_snap_caps |= le32_to_cpu(info->cap.caps);
-                       if (cap_fmode >= 0)
-                               __ceph_get_fmode(ci, cap_fmode);
-                       spin_unlock(&ci->i_ceph_lock);
-               }
-       } else if (cap_fmode >= 0) {
-               pr_warning("mds issued no caps on %llx.%llx\n",
-                          ceph_vinop(inode));
-               __ceph_get_fmode(ci, cap_fmode);
-       }
-
        /* update delegation info? */
        if (dirinfo)
                ceph_fill_dirfrag(inode, dirinfo);
 
        err = 0;
-
 out:
+       if (new_cap)
+               ceph_put_cap(mdsc, new_cap);
        if (xattr_blob)
                ceph_buffer_put(xattr_blob);
        return err;
index ead05cc1f447562271578131ab25769257080915..12b20744e386f1281f454f6fb636729beb01e873 100644 (file)
@@ -292,7 +292,6 @@ struct ceph_inode_info {
        struct ceph_snap_context *i_head_snapc;  /* set if wr_buffer_head > 0 or
                                                    dirty|flushing caps */
        unsigned i_snap_caps;           /* cap bits for snapped files */
-       unsigned i_cap_exporting_issued;
 
        int i_nr_by_mode[CEPH_FILE_MODE_NUM];  /* open file counts */
 
@@ -775,11 +774,13 @@ static inline void ceph_forget_all_cached_acls(struct inode *inode)
 extern const char *ceph_cap_string(int c);
 extern void ceph_handle_caps(struct ceph_mds_session *session,
                             struct ceph_msg *msg);
-extern int ceph_add_cap(struct inode *inode,
-                       struct ceph_mds_session *session, u64 cap_id,
-                       int fmode, unsigned issued, unsigned wanted,
-                       unsigned cap, unsigned seq, u64 realmino, int flags,
-                       struct ceph_cap_reservation *caps_reservation);
+extern struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
+                                    struct ceph_cap_reservation *ctx);
+extern void ceph_add_cap(struct inode *inode,
+                        struct ceph_mds_session *session, u64 cap_id,
+                        int fmode, unsigned issued, unsigned wanted,
+                        unsigned cap, unsigned seq, u64 realmino, int flags,
+                        struct ceph_cap **new_cap);
 extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
 extern void ceph_put_cap(struct ceph_mds_client *mdsc,
                         struct ceph_cap *cap);