+/* During recovery, other nodes can send us new MSTCPY locks (from
+ dlm_recover_locks) before we've made ourself master (in
+ dlm_recover_masters). */
+
+static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
+ uint32_t hash, uint32_t b,
+ int dir_nodeid, int from_nodeid,
+ unsigned int flags, struct dlm_rsb **r_ret)
+{
+ struct dlm_rsb *r = NULL;
+ int our_nodeid = dlm_our_nodeid();
+ int recover = (flags & R_RECEIVE_RECOVER);
+ int error;
+
+ retry:
+ error = pre_rsb_struct(ls);
+ if (error < 0)
+ goto out;
+
+ spin_lock(&ls->ls_rsbtbl[b].lock);
+
+ error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+ if (error)
+ goto do_toss;
+
+ /*
+ * rsb is active, so we can't check master_nodeid without lock_rsb.
+ */
+
+ kref_get(&r->res_ref);
+ goto out_unlock;
+
+
+ do_toss:
+ error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
+ if (error)
+ goto do_new;
+
+ /*
+ * rsb found inactive. No other thread is using this rsb because
+ * it's on the toss list, so we can look at or update
+ * res_master_nodeid without lock_rsb.
+ */
+
+ if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
+ /* our rsb is not master, and another node has sent us a
+ request; this should never happen */
+ log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
+ from_nodeid, r->res_master_nodeid, dir_nodeid);
+ dlm_print_rsb(r);
+ error = -ENOTBLK;
+ goto out_unlock;
+ }
+
+ if (!recover && (r->res_master_nodeid != our_nodeid) &&
+ (dir_nodeid == our_nodeid)) {
+ /* our rsb is not master, and we are dir; may as well fix it;
+ this should never happen */
+ log_error(ls, "find_rsb toss our %d master %d dir %d",
+ our_nodeid, r->res_master_nodeid, dir_nodeid);
+ dlm_print_rsb(r);
+ r->res_master_nodeid = our_nodeid;
+ r->res_nodeid = 0;
+ }
+
+ rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+ error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+ goto out_unlock;
+
+
+ do_new:
+ /*
+ * rsb not found
+ */
+
+ error = get_rsb_struct(ls, name, len, &r);
+ if (error == -EAGAIN) {
+ spin_unlock(&ls->ls_rsbtbl[b].lock);
+ goto retry;
+ }
+ if (error)
+ goto out_unlock;
+
+ r->res_hash = hash;
+ r->res_bucket = b;
+ r->res_dir_nodeid = dir_nodeid;
+ r->res_master_nodeid = dir_nodeid;
+ r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
+ kref_init(&r->res_ref);
+
+ error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+ out_unlock:
+ spin_unlock(&ls->ls_rsbtbl[b].lock);
+ out:
+ *r_ret = r;
+ return error;
+}
+
+static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
+ unsigned int flags, struct dlm_rsb **r_ret)
+{
+ uint32_t hash, b;
+ int dir_nodeid;
+
+ if (len > DLM_RESNAME_MAXLEN)
+ return -EINVAL;
+
+ hash = jhash(name, len, 0);
+ b = hash & (ls->ls_rsbtbl_size - 1);
+
+ dir_nodeid = dlm_hash2nodeid(ls, hash);
+
+ if (dlm_no_directory(ls))
+ return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
+ from_nodeid, flags, r_ret);
+ else
+ return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
+ from_nodeid, flags, r_ret);
+}
+
+/* we have received a request and found that res_master_nodeid != our_nodeid,
+ so we need to return an error or make ourself the master */
+
+static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
+ int from_nodeid)
+{
+ if (dlm_no_directory(ls)) {
+ log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
+ from_nodeid, r->res_master_nodeid,
+ r->res_dir_nodeid);
+ dlm_print_rsb(r);
+ return -ENOTBLK;
+ }
+
+ if (from_nodeid != r->res_dir_nodeid) {
+ /* our rsb is not master, and another node (not the dir node)
+ has sent us a request. this is much more common when our
+ master_nodeid is zero, so limit debug to non-zero. */
+
+ if (r->res_master_nodeid) {
+ log_debug(ls, "validate master from_other %d master %d "
+ "dir %d first %x %s", from_nodeid,
+ r->res_master_nodeid, r->res_dir_nodeid,
+ r->res_first_lkid, r->res_name);
+ }
+ return -ENOTBLK;
+ } else {
+ /* our rsb is not master, but the dir nodeid has sent us a
+ request; this could happen with master 0 / res_nodeid -1 */
+
+ if (r->res_master_nodeid) {
+ log_error(ls, "validate master from_dir %d master %d "
+ "first %x %s",
+ from_nodeid, r->res_master_nodeid,
+ r->res_first_lkid, r->res_name);
+ }
+
+ r->res_master_nodeid = dlm_our_nodeid();
+ r->res_nodeid = 0;
+ return 0;
+ }
+}
+