Merge branch 'topic/adsp' of git://git.kernel.org/pub/scm/linux/kernel/git/broonie...
[firefly-linux-kernel-4.4.55.git] / fs / dlm / lock.c
index d9ee1b96549ab8dcab03773b2bd7884353b51823..b56950758188d4b72f4ed2e193047ab1aded8c8b 100644 (file)
@@ -1624,65 +1624,170 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
        return error;
 }
 
-/* FIXME: make this more efficient */
+/* If an rsb for the same resource is being removed, ensure
+   that the remove message is sent before the new lookup message.
+   Needing a delay here should be rare; if it is not, then it may
+   be worthwhile to add a proper wait mechanism rather than a delay.
+
+   The name of r is compared against ls_remove_name/ls_remove_len
+   under ls_remove_spin.  While ls_remove_len is nonzero and
+   rsb_cmp() returns 0 (treated as "same name"), the spinlock is
+   dropped, the caller sleeps for 1 ms with msleep() and the check
+   is repeated.  The function returns, with the spinlock released,
+   once ls_remove_len is zero or the name differs.  Because it may
+   sleep, it must only be called from a context that can sleep. */
 
-static int shrink_bucket(struct dlm_ls *ls, int b)
+static void wait_pending_remove(struct dlm_rsb *r)
 {
-       struct rb_node *n;
+       struct dlm_ls *ls = r->res_ls;
+ restart:
+       spin_lock(&ls->ls_remove_spin);
+       if (ls->ls_remove_len &&
+           !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
+               log_debug(ls, "delay lookup for remove dir %d %s",
+                         r->res_dir_nodeid, r->res_name);
+               spin_unlock(&ls->ls_remove_spin);
+               msleep(1);
+               goto restart;
+       }
+       spin_unlock(&ls->ls_remove_spin);
+}
+
+/*
+ * shrink_bucket - free expired rsbs from the toss tree of rsbtbl bucket b.
+ *
+ * Pass 1 walks the toss tree with the bucket lock held throughout.
+ * Unless dlm_no_directory(ls) is set, rsbs for which we are the dir
+ * node but not the master are skipped.  Rsbs whose toss time
+ * (res_toss_time + ci_toss_secs) has not been reached are skipped too.
+ * Rsbs that we master but whose dir node is another node are not freed
+ * in this pass: their name and length are saved in ls_remove_names and
+ * ls_remove_lens, and the walk stops once DLM_REMOVE_NAMES_MAX names
+ * have been saved.  Any other rsb is erased from the tree and freed
+ * when kref_put() returns nonzero; otherwise an error is logged and the
+ * rsb is left in place.
+ *
+ * Pass 2 handles the saved names one at a time.  Each name is looked up
+ * again in the toss tree under the bucket lock and re-checked (found,
+ * still mastered here, dir node not us, toss time reached, kref_put()
+ * returns nonzero).  An rsb that passes is erased from the tree, its
+ * name is published in ls_remove_name, the locks are dropped,
+ * send_remove() is called, ls_remove_name is cleared and the rsb is
+ * freed.  A name that fails any check is logged and skipped.
+ *
+ * ls_remove_spin protects ls_remove_name and ls_remove_len which are
+ * read by other threads in wait_pending_remove.  ls_remove_names
+ * and ls_remove_lens are only used by the scan thread, so they do
+ * not need protection.
+ */
+
+static void shrink_bucket(struct dlm_ls *ls, int b)
+{
+       struct rb_node *n, *next;
        struct dlm_rsb *r;
+       char *name;
        int our_nodeid = dlm_our_nodeid();
-       int count = 0, found;
+       int remote_count = 0;
+       int i, len, rv;
 
-       for (;;) {
-               found = 0;
-               spin_lock(&ls->ls_rsbtbl[b].lock);
-               for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
-                       r = rb_entry(n, struct dlm_rsb, res_hashnode);
+       memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
 
-                       /* If we're the directory record for this rsb, and
-                          we're not the master of it, then we need to wait
-                          for the master node to send us a dir remove for
-                          before removing the dir record. */
+       spin_lock(&ls->ls_rsbtbl[b].lock);
+       for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
+               next = rb_next(n);
+               r = rb_entry(n, struct dlm_rsb, res_hashnode);
 
-                       if (!dlm_no_directory(ls) && !is_master(r) &&
-                           (dlm_dir_nodeid(r) == our_nodeid)) {
-                               continue;
-                       }
+               /* If we're the directory record for this rsb, and
+                  we're not the master of it, then we need to wait
+                  for the master node to send us a dir remove
+                  before removing the dir record. */
 
-                       if (!time_after_eq(jiffies, r->res_toss_time +
-                                          dlm_config.ci_toss_secs * HZ))
-                               continue;
-                       found = 1;
-                       break;
+               if (!dlm_no_directory(ls) &&
+                   (r->res_master_nodeid != our_nodeid) &&
+                   (dlm_dir_nodeid(r) == our_nodeid)) {
+                       continue;
                }
 
-               if (!found) {
-                       spin_unlock(&ls->ls_rsbtbl[b].lock);
-                       break;
+               if (!time_after_eq(jiffies, r->res_toss_time +
+                                  dlm_config.ci_toss_secs * HZ)) {
+                       continue;
                }
 
-               if (kref_put(&r->res_ref, kill_rsb)) {
-                       rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
-                       spin_unlock(&ls->ls_rsbtbl[b].lock);
+               if (!dlm_no_directory(ls) &&
+                   (r->res_master_nodeid == our_nodeid) &&
+                   (dlm_dir_nodeid(r) != our_nodeid)) {
 
                        /* We're the master of this rsb but we're not
                           the directory record, so we need to tell the
                           dir node to remove the dir record. */
 
-                       if (!dlm_no_directory(ls) && is_master(r) &&
-                           (dlm_dir_nodeid(r) != our_nodeid)) {
-                               send_remove(r);
-                       }
+                       ls->ls_remove_lens[remote_count] = r->res_length;
+                       memcpy(ls->ls_remove_names[remote_count], r->res_name,
+                              DLM_RESNAME_MAXLEN);
+                       remote_count++;
 
-                       dlm_free_rsb(r);
-                       count++;
-               } else {
-                       spin_unlock(&ls->ls_rsbtbl[b].lock);
+                       if (remote_count >= DLM_REMOVE_NAMES_MAX)
+                               break;
+                       continue;
+               }
+
+               if (!kref_put(&r->res_ref, kill_rsb)) {
                        log_error(ls, "tossed rsb in use %s", r->res_name);
+                       continue;
                }
+
+               rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+               dlm_free_rsb(r);
        }
+       spin_unlock(&ls->ls_rsbtbl[b].lock);
+
+       /*
+        * While searching for rsb's to free, we found some that require
+        * remote removal.  We leave them in place and find them again here
+        * so there is a very small gap between removing them from the toss
+        * list and sending the removal.  Keeping this gap small is
+        * important to keep us (the master node) from being out of sync
+        * with the remote dir node for very long.
+        *
+        * From the time the rsb is removed from toss until just after
+        * send_remove, the rsb name is saved in ls_remove_name.  A new
+        * lookup checks this to ensure that a new lookup message for the
+        * same resource name is not sent just before the remove message.
+        */
+
+       for (i = 0; i < remote_count; i++) {
+               name = ls->ls_remove_names[i];
+               len = ls->ls_remove_lens[i];
+
+               spin_lock(&ls->ls_rsbtbl[b].lock);
+               rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
+               if (rv) {
+                       spin_unlock(&ls->ls_rsbtbl[b].lock);
+                       log_debug(ls, "remove_name not toss %s", name);
+                       continue;
+               }
 
-       return count;
+               if (r->res_master_nodeid != our_nodeid) {
+                       spin_unlock(&ls->ls_rsbtbl[b].lock);
+                       log_debug(ls, "remove_name master %d dir %d our %d %s",
+                                 r->res_master_nodeid, r->res_dir_nodeid,
+                                 our_nodeid, name);
+                       continue;
+               }
+
+               if (r->res_dir_nodeid == our_nodeid) {
+                       /* should never happen */
+                       spin_unlock(&ls->ls_rsbtbl[b].lock);
+                       log_error(ls, "remove_name dir %d master %d our %d %s",
+                                 r->res_dir_nodeid, r->res_master_nodeid,
+                                 our_nodeid, name);
+                       continue;
+               }
+
+               if (!time_after_eq(jiffies, r->res_toss_time +
+                                  dlm_config.ci_toss_secs * HZ)) {
+                       spin_unlock(&ls->ls_rsbtbl[b].lock);
+                       log_debug(ls, "remove_name toss_time %lu now %lu %s",
+                                 r->res_toss_time, jiffies, name);
+                       continue;
+               }
+
+               if (!kref_put(&r->res_ref, kill_rsb)) {
+                       spin_unlock(&ls->ls_rsbtbl[b].lock);
+                       log_error(ls, "remove_name in use %s", name);
+                       continue;
+               }
+
+               rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+
+               /* block lookup of same name until we've sent remove */
+               spin_lock(&ls->ls_remove_spin);
+               ls->ls_remove_len = len;
+               memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
+               spin_unlock(&ls->ls_remove_spin);
+               spin_unlock(&ls->ls_rsbtbl[b].lock);
+
+               send_remove(r);
+
+               /* allow lookup of name again */
+               spin_lock(&ls->ls_remove_spin);
+               ls->ls_remove_len = 0;
+               memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
+               spin_unlock(&ls->ls_remove_spin);
+
+               dlm_free_rsb(r);
+       }
 }
 
 void dlm_scan_rsbs(struct dlm_ls *ls)
@@ -2174,10 +2279,14 @@ static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
  * immediate request, it is 0 if called later, after the lock has been
  * queued.
  *
+ * recover is 1 if dlm_recover_grant() is trying to grant conversions
+ * after recovery.
+ *
  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
  */
 
-static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
+static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
+                          int recover)
 {
        int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
 
@@ -2209,7 +2318,7 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
         */
 
        if (queue_conflict(&r->res_grantqueue, lkb))
-               goto out;
+               return 0;
 
        /*
         * 6-3: By default, a conversion request is immediately granted if the
@@ -2218,7 +2327,24 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
         */
 
        if (queue_conflict(&r->res_convertqueue, lkb))
-               goto out;
+               return 0;
+
+       /*
+        * The RECOVER_GRANT flag means dlm_recover_grant() is granting
+        * locks for a recovered rsb, on which lkb's have been rebuilt.
+        * The lkb's may have been rebuilt on the queues in a different
+        * order than they were in on the previous master.  So, granting
+        * queued conversions in order after recovery doesn't make sense
+        * since the order hasn't been preserved anyway.  The new order
+        * could also have created a new "in place" conversion deadlock.
+        * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
+        * After recovery, there would be no granted locks, and possibly
+        * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
+        * recovery, grant conversions without considering order.
+        */
+
+       if (conv && recover)
+               return 1;
 
        /*
         * 6-5: But the default algorithm for deciding whether to grant or
@@ -2255,7 +2381,7 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
                if (list_empty(&r->res_convertqueue))
                        return 1;
                else
-                       goto out;
+                       return 0;
        }
 
        /*
@@ -2301,12 +2427,12 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
        if (!now && !conv && list_empty(&r->res_convertqueue) &&
            first_in_list(lkb, &r->res_waitqueue))
                return 1;
- out:
+
        return 0;
 }
 
 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
-                         int *err)
+                         int recover, int *err)
 {
        int rv;
        int8_t alt = 0, rqmode = lkb->lkb_rqmode;
@@ -2315,7 +2441,7 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
        if (err)
                *err = 0;
 
-       rv = _can_be_granted(r, lkb, now);
+       rv = _can_be_granted(r, lkb, now, recover);
        if (rv)
                goto out;
 
@@ -2356,7 +2482,7 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
 
        if (alt) {
                lkb->lkb_rqmode = alt;
-               rv = _can_be_granted(r, lkb, now);
+               rv = _can_be_granted(r, lkb, now, 0);
                if (rv)
                        lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
                else
@@ -2380,6 +2506,7 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
                                 unsigned int *count)
 {
        struct dlm_lkb *lkb, *s;
+       int recover = rsb_flag(r, RSB_RECOVER_GRANT);
        int hi, demoted, quit, grant_restart, demote_restart;
        int deadlk;
 
@@ -2393,7 +2520,7 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
                demoted = is_demoted(lkb);
                deadlk = 0;
 
-               if (can_be_granted(r, lkb, 0, &deadlk)) {
+               if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
                        grant_lock_pending(r, lkb);
                        grant_restart = 1;
                        if (count)
@@ -2437,7 +2564,7 @@ static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
        struct dlm_lkb *lkb, *s;
 
        list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
-               if (can_be_granted(r, lkb, 0, NULL)) {
+               if (can_be_granted(r, lkb, 0, 0, NULL)) {
                        grant_lock_pending(r, lkb);
                        if (count)
                                (*count)++;
@@ -2608,6 +2735,8 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
                return 0;
        }
 
+       wait_pending_remove(r);
+
        r->res_first_lkid = lkb->lkb_id;
        send_lookup(r, lkb);
        return 1;
@@ -2935,7 +3064,7 @@ static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
        int error = 0;
 
-       if (can_be_granted(r, lkb, 1, NULL)) {
+       if (can_be_granted(r, lkb, 1, 0, NULL)) {
                grant_lock(r, lkb);
                queue_cast(r, lkb, 0);
                goto out;
@@ -2975,7 +3104,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
 
        /* changing an existing lock may allow others to be granted */
 
-       if (can_be_granted(r, lkb, 1, &deadlk)) {
+       if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
                grant_lock(r, lkb);
                queue_cast(r, lkb, 0);
                goto out;
@@ -3001,7 +3130,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
 
        if (is_demoted(lkb)) {
                grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
-               if (_can_be_granted(r, lkb, 1)) {
+               if (_can_be_granted(r, lkb, 1, 0)) {
                        grant_lock(r, lkb);
                        queue_cast(r, lkb, 0);
                        goto out;
@@ -3871,12 +4000,70 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
        return error;
 }
 
+/*
+ * send_repeat_remove - send another DLM_MSG_REMOVE for a resource name.
+ *
+ * @ls:      lockspace the name belongs to
+ * @ms_name: resource name bytes, taken from a received message (m_extra)
+ * @len:     number of valid bytes in @ms_name
+ *
+ * The name is hashed to find its rsbtbl bucket and its directory node.
+ * If an rsb with this name is found in the bucket's keep or toss tree,
+ * an error is logged and nothing is sent.  Otherwise the name is
+ * published in ls_remove_name/ls_remove_len (so wait_pending_remove()
+ * delays lookups of the same name), a remove message is built and sent
+ * to the directory node, and the published name is cleared again.
+ *
+ * The published name is cleared on every path that set it, including
+ * failure to build the message; leaving it set would make
+ * wait_pending_remove() sleep in its retry loop indefinitely.
+ */
+static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
+{
+       char name[DLM_RESNAME_MAXLEN + 1];
+       struct dlm_message *ms;
+       struct dlm_mhandle *mh;
+       struct dlm_rsb *r;
+       uint32_t hash, b;
+       int rv, dir_nodeid;
+
+       /* len describes message data; never let it overrun name[] */
+       if (len <= 0 || len > DLM_RESNAME_MAXLEN) {
+               log_error(ls, "send_repeat_remove bad len %d", len);
+               return;
+       }
+
+       /* zero-fill first so name is NUL-terminated for the log calls */
+       memset(name, 0, sizeof(name));
+       memcpy(name, ms_name, len);
+
+       hash = jhash(name, len, 0);
+       b = hash & (ls->ls_rsbtbl_size - 1);
+
+       dir_nodeid = dlm_hash2nodeid(ls, hash);
+
+       log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);
+
+       spin_lock(&ls->ls_rsbtbl[b].lock);
+       rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+       if (!rv) {
+               spin_unlock(&ls->ls_rsbtbl[b].lock);
+               log_error(ls, "repeat_remove on keep %s", name);
+               return;
+       }
+
+       rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
+       if (!rv) {
+               spin_unlock(&ls->ls_rsbtbl[b].lock);
+               log_error(ls, "repeat_remove on toss %s", name);
+               return;
+       }
+
+       /* use ls->remove_name2 to avoid conflict with shrink? */
+
+       /* block lookup of same name until we've sent remove */
+       spin_lock(&ls->ls_remove_spin);
+       ls->ls_remove_len = len;
+       memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
+       spin_unlock(&ls->ls_remove_spin);
+       spin_unlock(&ls->ls_rsbtbl[b].lock);
+
+       rv = _create_message(ls, sizeof(struct dlm_message) + len,
+                            dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
+       if (rv)
+               goto out;
+
+       memcpy(ms->m_extra, name, len);
+       ms->m_hash = hash;
+
+       send_message(mh, ms);
+
+ out:
+       /* allow lookup of name again, whether or not a message was sent */
+       spin_lock(&ls->ls_remove_spin);
+       ls->ls_remove_len = 0;
+       memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
+       spin_unlock(&ls->ls_remove_spin);
+}
+
 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
        struct dlm_rsb *r;
        int from_nodeid;
-       int error, namelen;
+       int error, namelen = 0;
 
        from_nodeid = ms->m_header.h_nodeid;
 
@@ -3944,13 +4131,21 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
           delayed in being sent/arriving/being processed on the dir node.
           Another node would repeatedly lookup up the master, and the dir
           node would continue returning our nodeid until our send_remove
-          took effect. */
+          took effect.
+
+          We send another remove message in case our previous send_remove
+          was lost/ignored/missed somehow. */
 
        if (error != -ENOTBLK) {
                log_limit(ls, "receive_request %x from %d %d",
                          ms->m_lkid, from_nodeid, error);
        }
 
+       if (namelen && error == -EBADR) {
+               send_repeat_remove(ls, ms->m_extra, namelen);
+               msleep(1000);
+       }
+
        setup_stub_lkb(ls, ms);
        send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
        return error;
@@ -5266,9 +5461,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
 
                if (!rsb_flag(r, RSB_RECOVER_GRANT))
                        continue;
-               rsb_clear_flag(r, RSB_RECOVER_GRANT);
-               if (!is_master(r))
+               if (!is_master(r)) {
+                       rsb_clear_flag(r, RSB_RECOVER_GRANT);
                        continue;
+               }
                hold_rsb(r);
                spin_unlock(&ls->ls_rsbtbl[bucket].lock);
                return r;
@@ -5313,7 +5509,9 @@ void dlm_recover_grant(struct dlm_ls *ls)
                rsb_count++;
                count = 0;
                lock_rsb(r);
+               /* the RECOVER_GRANT flag is checked in the grant path */
                grant_pending_locks(r, &count);
+               rsb_clear_flag(r, RSB_RECOVER_GRANT);
                lkb_count += count;
                confirm_master(r, 0);
                unlock_rsb(r);