return error;
}
-/* FIXME: make this more efficient */
+/* If there's an rsb for the same resource being removed, ensure
+ that the remove message is sent before the new lookup message.
+ It should be rare to need a delay here, but if not, then it may
+ be worthwhile to add a proper wait mechanism rather than a delay. */
-static int shrink_bucket(struct dlm_ls *ls, int b)
+static void wait_pending_remove(struct dlm_rsb *r)
{
- struct rb_node *n;
+ struct dlm_ls *ls = r->res_ls;
+ restart:
+ spin_lock(&ls->ls_remove_spin);
+ if (ls->ls_remove_len &&
+ !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
+ log_debug(ls, "delay lookup for remove dir %d %s",
+ r->res_dir_nodeid, r->res_name);
+ spin_unlock(&ls->ls_remove_spin);
+ msleep(1);
+ goto restart;
+ }
+ spin_unlock(&ls->ls_remove_spin);
+}
+
+/*
+ * ls_remove_spin protects ls_remove_name and ls_remove_len which are
+ * read by other threads in wait_pending_remove. ls_remove_names
+ * and ls_remove_lens are only used by the scan thread, so they do
+ * not need protection.
+ */
+
+static void shrink_bucket(struct dlm_ls *ls, int b)
+{
+ struct rb_node *n, *next;
struct dlm_rsb *r;
+ char *name;
int our_nodeid = dlm_our_nodeid();
- int count = 0, found;
+ int remote_count = 0;
+ int i, len, rv;
- for (;;) {
- found = 0;
- spin_lock(&ls->ls_rsbtbl[b].lock);
- for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
- r = rb_entry(n, struct dlm_rsb, res_hashnode);
+ memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
- /* If we're the directory record for this rsb, and
- we're not the master of it, then we need to wait
- for the master node to send us a dir remove for
- before removing the dir record. */
+ spin_lock(&ls->ls_rsbtbl[b].lock);
+ for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
+ next = rb_next(n);
+ r = rb_entry(n, struct dlm_rsb, res_hashnode);
- if (!dlm_no_directory(ls) && !is_master(r) &&
- (dlm_dir_nodeid(r) == our_nodeid)) {
- continue;
- }
+ /* If we're the directory record for this rsb, and
+ we're not the master of it, then we need to wait
+ for the master node to send us a dir remove for
+ before removing the dir record. */
- if (!time_after_eq(jiffies, r->res_toss_time +
- dlm_config.ci_toss_secs * HZ))
- continue;
- found = 1;
- break;
+ if (!dlm_no_directory(ls) &&
+ (r->res_master_nodeid != our_nodeid) &&
+ (dlm_dir_nodeid(r) == our_nodeid)) {
+ continue;
}
- if (!found) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
- break;
+ if (!time_after_eq(jiffies, r->res_toss_time +
+ dlm_config.ci_toss_secs * HZ)) {
+ continue;
}
- if (kref_put(&r->res_ref, kill_rsb)) {
- rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ if (!dlm_no_directory(ls) &&
+ (r->res_master_nodeid == our_nodeid) &&
+ (dlm_dir_nodeid(r) != our_nodeid)) {
/* We're the master of this rsb but we're not
the directory record, so we need to tell the
dir node to remove the dir record. */
- if (!dlm_no_directory(ls) && is_master(r) &&
- (dlm_dir_nodeid(r) != our_nodeid)) {
- send_remove(r);
- }
+ ls->ls_remove_lens[remote_count] = r->res_length;
+ memcpy(ls->ls_remove_names[remote_count], r->res_name,
+ DLM_RESNAME_MAXLEN);
+ remote_count++;
- dlm_free_rsb(r);
- count++;
- } else {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ if (remote_count >= DLM_REMOVE_NAMES_MAX)
+ break;
+ continue;
+ }
+
+ if (!kref_put(&r->res_ref, kill_rsb)) {
log_error(ls, "tossed rsb in use %s", r->res_name);
+ continue;
}
+
+ rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+ dlm_free_rsb(r);
}
+ spin_unlock(&ls->ls_rsbtbl[b].lock);
+
+ /*
+ * While searching for rsb's to free, we found some that require
+ * remote removal. We leave them in place and find them again here
+ * so there is a very small gap between removing them from the toss
+ * list and sending the removal. Keeping this gap small is
+ * important to keep us (the master node) from being out of sync
+ * with the remote dir node for very long.
+ *
+ * From the time the rsb is removed from toss until just after
+ * send_remove, the rsb name is saved in ls_remove_name. A new
+ * lookup checks this to ensure that a new lookup message for the
+ * same resource name is not sent just before the remove message.
+ */
+
+ for (i = 0; i < remote_count; i++) {
+ name = ls->ls_remove_names[i];
+ len = ls->ls_remove_lens[i];
+
+ spin_lock(&ls->ls_rsbtbl[b].lock);
+ rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
+ if (rv) {
+ spin_unlock(&ls->ls_rsbtbl[b].lock);
+ log_debug(ls, "remove_name not toss %s", name);
+ continue;
+ }
- return count;
+ if (r->res_master_nodeid != our_nodeid) {
+ spin_unlock(&ls->ls_rsbtbl[b].lock);
+ log_debug(ls, "remove_name master %d dir %d our %d %s",
+ r->res_master_nodeid, r->res_dir_nodeid,
+ our_nodeid, name);
+ continue;
+ }
+
+ if (r->res_dir_nodeid == our_nodeid) {
+ /* should never happen */
+ spin_unlock(&ls->ls_rsbtbl[b].lock);
+ log_error(ls, "remove_name dir %d master %d our %d %s",
+ r->res_dir_nodeid, r->res_master_nodeid,
+ our_nodeid, name);
+ continue;
+ }
+
+ if (!time_after_eq(jiffies, r->res_toss_time +
+ dlm_config.ci_toss_secs * HZ)) {
+ spin_unlock(&ls->ls_rsbtbl[b].lock);
+ log_debug(ls, "remove_name toss_time %lu now %lu %s",
+ r->res_toss_time, jiffies, name);
+ continue;
+ }
+
+ if (!kref_put(&r->res_ref, kill_rsb)) {
+ spin_unlock(&ls->ls_rsbtbl[b].lock);
+ log_error(ls, "remove_name in use %s", name);
+ continue;
+ }
+
+ rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+
+ /* block lookup of same name until we've sent remove */
+ spin_lock(&ls->ls_remove_spin);
+ ls->ls_remove_len = len;
+ memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
+ spin_unlock(&ls->ls_remove_spin);
+ spin_unlock(&ls->ls_rsbtbl[b].lock);
+
+ send_remove(r);
+
+ /* allow lookup of name again */
+ spin_lock(&ls->ls_remove_spin);
+ ls->ls_remove_len = 0;
+ memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
+ spin_unlock(&ls->ls_remove_spin);
+
+ dlm_free_rsb(r);
+ }
}
void dlm_scan_rsbs(struct dlm_ls *ls)
* immediate request, it is 0 if called later, after the lock has been
* queued.
*
+ * recover is 1 if dlm_recover_grant() is trying to grant conversions
+ * after recovery.
+ *
* References are from chapter 6 of "VAXcluster Principles" by Roy Davis
*/
-static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
+static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
+ int recover)
{
int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
*/
if (queue_conflict(&r->res_grantqueue, lkb))
- goto out;
+ return 0;
/*
* 6-3: By default, a conversion request is immediately granted if the
*/
if (queue_conflict(&r->res_convertqueue, lkb))
- goto out;
+ return 0;
+
+ /*
+ * The RECOVER_GRANT flag means dlm_recover_grant() is granting
+ * locks for a recovered rsb, on which lkb's have been rebuilt.
+ * The lkb's may have been rebuilt on the queues in a different
+ * order than they were in on the previous master. So, granting
+ * queued conversions in order after recovery doesn't make sense
+ * since the order hasn't been preserved anyway. The new order
+ * could also have created a new "in place" conversion deadlock.
+ * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
+ * After recovery, there would be no granted locks, and possibly
+ * NL->EX, PR->EX, an in-place conversion deadlock.) So, after
+ * recovery, grant conversions without considering order.
+ */
+
+ if (conv && recover)
+ return 1;
/*
* 6-5: But the default algorithm for deciding whether to grant or
if (list_empty(&r->res_convertqueue))
return 1;
else
- goto out;
+ return 0;
}
/*
if (!now && !conv && list_empty(&r->res_convertqueue) &&
first_in_list(lkb, &r->res_waitqueue))
return 1;
- out:
+
return 0;
}
static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
- int *err)
+ int recover, int *err)
{
int rv;
int8_t alt = 0, rqmode = lkb->lkb_rqmode;
if (err)
*err = 0;
- rv = _can_be_granted(r, lkb, now);
+ rv = _can_be_granted(r, lkb, now, recover);
if (rv)
goto out;
if (alt) {
lkb->lkb_rqmode = alt;
- rv = _can_be_granted(r, lkb, now);
+ rv = _can_be_granted(r, lkb, now, 0);
if (rv)
lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
else
unsigned int *count)
{
struct dlm_lkb *lkb, *s;
+ int recover = rsb_flag(r, RSB_RECOVER_GRANT);
int hi, demoted, quit, grant_restart, demote_restart;
int deadlk;
demoted = is_demoted(lkb);
deadlk = 0;
- if (can_be_granted(r, lkb, 0, &deadlk)) {
+ if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
grant_lock_pending(r, lkb);
grant_restart = 1;
if (count)
struct dlm_lkb *lkb, *s;
list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
- if (can_be_granted(r, lkb, 0, NULL)) {
+ if (can_be_granted(r, lkb, 0, 0, NULL)) {
grant_lock_pending(r, lkb);
if (count)
(*count)++;
return 0;
}
+ wait_pending_remove(r);
+
r->res_first_lkid = lkb->lkb_id;
send_lookup(r, lkb);
return 1;
{
int error = 0;
- if (can_be_granted(r, lkb, 1, NULL)) {
+ if (can_be_granted(r, lkb, 1, 0, NULL)) {
grant_lock(r, lkb);
queue_cast(r, lkb, 0);
goto out;
/* changing an existing lock may allow others to be granted */
- if (can_be_granted(r, lkb, 1, &deadlk)) {
+ if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
grant_lock(r, lkb);
queue_cast(r, lkb, 0);
goto out;
if (is_demoted(lkb)) {
grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
- if (_can_be_granted(r, lkb, 1)) {
+ if (_can_be_granted(r, lkb, 1, 0)) {
grant_lock(r, lkb);
queue_cast(r, lkb, 0);
goto out;
return error;
}
+static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
+{
+ char name[DLM_RESNAME_MAXLEN + 1];
+ struct dlm_message *ms;
+ struct dlm_mhandle *mh;
+ struct dlm_rsb *r;
+ uint32_t hash, b;
+ int rv, dir_nodeid;
+
+ memset(name, 0, sizeof(name));
+ memcpy(name, ms_name, len);
+
+ hash = jhash(name, len, 0);
+ b = hash & (ls->ls_rsbtbl_size - 1);
+
+ dir_nodeid = dlm_hash2nodeid(ls, hash);
+
+ log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);
+
+ spin_lock(&ls->ls_rsbtbl[b].lock);
+ rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+ if (!rv) {
+ spin_unlock(&ls->ls_rsbtbl[b].lock);
+ log_error(ls, "repeat_remove on keep %s", name);
+ return;
+ }
+
+ rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
+ if (!rv) {
+ spin_unlock(&ls->ls_rsbtbl[b].lock);
+ log_error(ls, "repeat_remove on toss %s", name);
+ return;
+ }
+
+ /* use ls->remove_name2 to avoid conflict with shrink? */
+
+ spin_lock(&ls->ls_remove_spin);
+ ls->ls_remove_len = len;
+ memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
+ spin_unlock(&ls->ls_remove_spin);
+ spin_unlock(&ls->ls_rsbtbl[b].lock);
+
+ rv = _create_message(ls, sizeof(struct dlm_message) + len,
+ dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
+ if (rv)
+ return;
+
+ memcpy(ms->m_extra, name, len);
+ ms->m_hash = hash;
+
+ send_message(mh, ms);
+
+ spin_lock(&ls->ls_remove_spin);
+ ls->ls_remove_len = 0;
+ memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
+ spin_unlock(&ls->ls_remove_spin);
+}
+
static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
int from_nodeid;
- int error, namelen;
+ int error, namelen = 0;
from_nodeid = ms->m_header.h_nodeid;
delayed in being sent/arriving/being processed on the dir node.
Another node would repeatedly lookup up the master, and the dir
node would continue returning our nodeid until our send_remove
- took effect. */
+ took effect.
+
+ We send another remove message in case our previous send_remove
+ was lost/ignored/missed somehow. */
if (error != -ENOTBLK) {
log_limit(ls, "receive_request %x from %d %d",
ms->m_lkid, from_nodeid, error);
}
+ if (namelen && error == -EBADR) {
+ send_repeat_remove(ls, ms->m_extra, namelen);
+ msleep(1000);
+ }
+
setup_stub_lkb(ls, ms);
send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
return error;
if (!rsb_flag(r, RSB_RECOVER_GRANT))
continue;
- rsb_clear_flag(r, RSB_RECOVER_GRANT);
- if (!is_master(r))
+ if (!is_master(r)) {
+ rsb_clear_flag(r, RSB_RECOVER_GRANT);
continue;
+ }
hold_rsb(r);
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
return r;
rsb_count++;
count = 0;
lock_rsb(r);
+ /* the RECOVER_GRANT flag is checked in the grant path */
grant_pending_locks(r, &count);
+ rsb_clear_flag(r, RSB_RECOVER_GRANT);
lkb_count += count;
confirm_master(r, 0);
unlock_rsb(r);