1 /******************************************************************************
2 *******************************************************************************
4 ** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved.
6 ** This copyrighted material is made available to anyone wishing to use,
7 ** modify, copy, or redistribute it subject to the terms and conditions
8 ** of the GNU General Public License v.2.
10 *******************************************************************************
11 ******************************************************************************/
13 /* Central locking logic has four stages:
33 Stage 1 (lock, unlock) is mainly about checking input args and
34 splitting into one of the four main operations:
36 dlm_lock = request_lock
37 dlm_lock+CONVERT = convert_lock
38 dlm_unlock = unlock_lock
39 dlm_unlock+CANCEL = cancel_lock
41 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42 provided to the next stage.
44 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45 When remote, it calls send_xxxx(), when local it calls do_xxxx().
47 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
48 given rsb and lkb and queues callbacks.
50 For remote operations, send_xxxx() results in the corresponding do_xxxx()
51 function being executed on the remote node. The connecting send/receive
52 calls on local (L) and remote (R) nodes:
54 L: send_xxxx() -> R: receive_xxxx()
56 L: receive_xxxx_reply() <- R: send_xxxx_reply()
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
65 #include "requestqueue.h"
69 #include "lockspace.h"
74 #include "lvb_table.h"
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89 struct dlm_message *ms);
90 static int receive_extralen(struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void del_timeout(struct dlm_lkb *lkb);
93 static void toss_rsb(struct kref *kref);
96 * Lock compatibilty matrix - thanks Steve
97 * UN = Unlocked state. Not really a state, used as a flag
98 * PD = Padding. Used to make the matrix a nice power of two in size
99 * Other states are the same as the VMS DLM.
100 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
/* Entry is 1 when the granted mode (row) and requested mode (column) can
   coexist on the same resource; indexed with mode+1 so UN (=-1) maps to 0. */
103 static const int __dlm_compat_matrix[8][8] = {
104 /* UN NL CR CW PR PW EX PD */
105 {1, 1, 1, 1, 1, 1, 1, 0}, /* UN */
106 {1, 1, 1, 1, 1, 1, 1, 0}, /* NL */
107 {1, 1, 1, 1, 1, 1, 0, 0}, /* CR */
108 {1, 1, 1, 1, 0, 0, 0, 0}, /* CW */
109 {1, 1, 1, 0, 1, 0, 0, 0}, /* PR */
110 {1, 1, 1, 0, 0, 0, 0, 0}, /* PW */
111 {1, 1, 0, 0, 0, 0, 0, 0}, /* EX */
112 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
116 * This defines the direction of transfer of LVB data.
117 * Granted mode is the row; requested mode is the column.
118 * Usage: matrix[grmode+1][rqmode+1]
119 * 1 = LVB is returned to the caller
120 * 0 = LVB is written to the resource
121 * -1 = nothing happens to the LVB
/* LVB transfer direction per (granted row, requested column) mode pair:
   1 = LVB returned to caller, 0 = LVB written to resource, -1 = untouched
   (see comment above this table). */
124 const int dlm_lvb_operations[8][8] = {
125 /* UN NL CR CW PR PW EX PD*/
126 { -1, 1, 1, 1, 1, 1, 1, -1 }, /* UN */
127 { -1, 1, 1, 1, 1, 1, 1, 0 }, /* NL */
128 { -1, -1, 1, 1, 1, 1, 1, 0 }, /* CR */
129 { -1, -1, -1, 1, 1, 1, 1, 0 }, /* CW */
130 { -1, -1, -1, -1, 1, 1, 1, 0 }, /* PR */
131 { -1, 0, 0, 0, 0, 0, 1, 0 }, /* PW */
132 { -1, 0, 0, 0, 0, 0, 0, 0 }, /* EX */
133 { -1, 0, 0, 0, 0, 0, 0, 0 } /* PD */
/* Compatibility of two lkbs, looked up by their granted/requested modes. */
136 #define modes_compat(gr, rq) \
137 __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
/* Public variant taking raw mode values; same +1 offset handles DLM_LOCK_IV. */
139 int dlm_modes_compat(int mode1, int mode2)
141 return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
145 * Compatibility matrix for conversions with QUECVT set.
146 * Granted mode is the row; requested mode is the column.
147 * Usage: matrix[grmode+1][rqmode+1]
/* Entry is 1 when a conversion with DLM_LKF_QUECVT set from granted mode
   (row) to requested mode (column) is permitted. */
150 static const int __quecvt_compat_matrix[8][8] = {
151 /* UN NL CR CW PR PW EX PD */
152 {0, 0, 0, 0, 0, 0, 0, 0}, /* UN */
153 {0, 0, 1, 1, 1, 1, 1, 0}, /* NL */
154 {0, 0, 0, 1, 1, 1, 1, 0}, /* CR */
155 {0, 0, 0, 0, 1, 1, 1, 0}, /* CW */
156 {0, 0, 0, 1, 0, 1, 1, 0}, /* PR */
157 {0, 0, 0, 0, 0, 0, 1, 0}, /* PW */
158 {0, 0, 0, 0, 0, 0, 0, 0}, /* EX */
159 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
/* Dump an lkb's identity, flags and queue state to the kernel log (debug aid). */
162 void dlm_print_lkb(struct dlm_lkb *lkb)
164 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
165 "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
166 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
167 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
168 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
169 (unsigned long long)lkb->lkb_recover_seq);
/* Dump an rsb's nodeids, flags and name to the kernel log (debug aid).
   NOTE(review): this listing is missing interior lines (format-string
   continuation and trailing args); code kept byte-identical. */
172 static void dlm_print_rsb(struct dlm_rsb *r)
174 printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
176 r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
177 r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
/* Dump the rsb header plus every lkb on its lookup, grant, convert and wait
   queues — used from DLM_ASSERT failure paths below. */
181 void dlm_dump_rsb(struct dlm_rsb *r)
187 printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
188 list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
189 printk(KERN_ERR "rsb lookup list\n");
190 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
192 printk(KERN_ERR "rsb grant queue:\n");
193 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
195 printk(KERN_ERR "rsb convert queue:\n");
196 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
198 printk(KERN_ERR "rsb wait queue:\n");
199 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
203 /* Threads cannot use the lockspace while it's being recovered */
/* Normal lock operations take ls_in_recovery for read; recovery takes it
   for write, so this blocks while recovery is running. */
205 static inline void dlm_lock_recovery(struct dlm_ls *ls)
207 down_read(&ls->ls_in_recovery);
/* Release the read side taken by dlm_lock_recovery(). */
210 void dlm_unlock_recovery(struct dlm_ls *ls)
212 up_read(&ls->ls_in_recovery);
/* Non-blocking variant: returns nonzero if the read lock was acquired. */
215 int dlm_lock_recovery_try(struct dlm_ls *ls)
217 return down_read_trylock(&ls->ls_in_recovery);
/* Small predicate helpers over lkb/rsb flag and state fields. */
220 static inline int can_be_queued(struct dlm_lkb *lkb)
222 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
225 static inline int force_blocking_asts(struct dlm_lkb *lkb)
227 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
228 static inline int is_demoted(struct dlm_lkb *lkb)
232 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
235 static inline int is_altmode(struct dlm_lkb *lkb)
237 return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
240 static inline int is_granted(struct dlm_lkb *lkb)
242 return (lkb->lkb_status == DLM_LKSTS_GRANTED);
/* res_nodeid 0 means we are master; any positive value means a remote master. */
245 static inline int is_remote(struct dlm_rsb *r)
247 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
248 return !!r->res_nodeid;
/* Process copy: lock held locally on behalf of a caller, mastered remotely. */
251 static inline int is_process_copy(struct dlm_lkb *lkb)
253 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
/* Master copy: local record of a lock whose owner is on another node. */
256 static inline int is_master_copy(struct dlm_lkb *lkb)
258 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
/* PR<->CW conversions are "middle": neither mode includes the other. */
261 static inline int middle_conversion(struct dlm_lkb *lkb)
263 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
269 static inline int down_conversion(struct dlm_lkb *lkb)
271 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
/* Overlap flags mark an unlock/cancel issued while another op is in flight. */
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
276 return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
281 return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
284 static inline int is_overlap(struct dlm_lkb *lkb)
286 return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
287 DLM_IFL_OVERLAP_CANCEL));
/* Queue a completion AST (cast) for the lkb's owner with result rv.
   Master copies get no local callback (the owner is on another node).
   A cancel caused by timeout or deadlock detection is translated to
   -ETIMEDOUT / -EDEADLK respectively before being delivered. */
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
292 if (is_master_copy(lkb))
297 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
299 /* if the operation was a cancel, then return -DLM_ECANCEL, if a
300 timeout caused the cancel then return -ETIMEDOUT */
301 if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
302 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
306 if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
307 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
311 dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
/* Queue the cast for an overlapped op: unlock takes precedence over cancel. */
314 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
317 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
/* Queue a blocking AST (bast) telling the lkb's owner that rqmode is wanted.
   For a master copy the owner is remote, so send the bast over the wire
   instead of queuing a local callback. */
320 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
322 if (is_master_copy(lkb)) {
323 send_bast(r, lkb, rqmode);
325 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
330 * Basic operations on rsb's and lkb's
333 /* This is only called to add a reference when the code already holds
334 a valid reference to the rsb, so there's no need for locking. */
336 static inline void hold_rsb(struct dlm_rsb *r)
338 kref_get(&r->res_ref);
/* Exported wrapper around hold_rsb().
   NOTE(review): body line missing from this listing. */
341 void dlm_hold_rsb(struct dlm_rsb *r)
346 /* When all references to the rsb are gone it's transferred to
347 the tossed list for later disposal. */
/* Drop a reference; the bucket lock is held because the final put runs
   toss_rsb(), which moves the rsb between the keep and toss trees. */
349 static void put_rsb(struct dlm_rsb *r)
351 struct dlm_ls *ls = r->res_ls;
352 uint32_t bucket = r->res_bucket;
354 spin_lock(&ls->ls_rsbtbl[bucket].lock);
355 kref_put(&r->res_ref, toss_rsb);
356 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
/* Exported wrapper around put_rsb().
   NOTE(review): body line missing from this listing. */
359 void dlm_put_rsb(struct dlm_rsb *r)
/* Pre-allocate rsb structs onto ls_new_rsb outside of the rsbtbl bucket
   lock, so get_rsb_struct() can later take one without allocating under
   a spinlock. Keeps the pool above half of ci_new_rsb_count; allocates
   two at a time. */
364 static int pre_rsb_struct(struct dlm_ls *ls)
366 struct dlm_rsb *r1, *r2;
369 spin_lock(&ls->ls_new_rsb_spin);
370 if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
371 spin_unlock(&ls->ls_new_rsb_spin);
374 spin_unlock(&ls->ls_new_rsb_spin);
376 r1 = dlm_allocate_rsb(ls);
377 r2 = dlm_allocate_rsb(ls);
379 spin_lock(&ls->ls_new_rsb_spin);
381 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
382 ls->ls_new_rsb_count++;
385 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
386 ls->ls_new_rsb_count++;
388 count = ls->ls_new_rsb_count;
389 spin_unlock(&ls->ls_new_rsb_spin);
396 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
397 unlock any spinlocks, go back and call pre_rsb_struct again.
398 Otherwise, take an rsb off the list and return it. */
/* Take a pre-allocated rsb off ls_new_rsb and initialize it with the given
   name. Returns -EAGAIN if the pool is empty (caller drops its spinlocks,
   calls pre_rsb_struct() and retries — see comment above). */
400 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
401 struct dlm_rsb **r_ret)
406 spin_lock(&ls->ls_new_rsb_spin);
407 if (list_empty(&ls->ls_new_rsb)) {
408 count = ls->ls_new_rsb_count;
409 spin_unlock(&ls->ls_new_rsb_spin);
410 log_debug(ls, "find_rsb retry %d %d %s",
411 count, dlm_config.ci_new_rsb_count, name);
415 r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
416 list_del(&r->res_hashchain);
417 /* Convert the empty list_head to a NULL rb_node for tree usage: */
418 memset(&r->res_hashnode, 0, sizeof(struct rb_node));
419 ls->ls_new_rsb_count--;
420 spin_unlock(&ls->ls_new_rsb_spin);
424 memcpy(r->res_name, name, len);
425 mutex_init(&r->res_mutex);
427 INIT_LIST_HEAD(&r->res_lookup);
428 INIT_LIST_HEAD(&r->res_grantqueue);
429 INIT_LIST_HEAD(&r->res_convertqueue);
430 INIT_LIST_HEAD(&r->res_waitqueue);
431 INIT_LIST_HEAD(&r->res_root_list);
432 INIT_LIST_HEAD(&r->res_recover_list);
/* Compare an rsb's name against a search key. The key is zero-padded to
   DLM_RESNAME_MAXLEN so names of different lengths compare consistently;
   returns memcmp()-style <0/0/>0 for rb-tree ordering. */
438 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
440 char maxname[DLM_RESNAME_MAXLEN];
442 memset(maxname, 0, DLM_RESNAME_MAXLEN);
443 memcpy(maxname, name, nlen);
444 return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
/* Binary search the keep or toss rb-tree for an rsb by name, ordered by
   rsb_cmp(). On a hit *r_ret is set; caller holds the bucket lock. */
447 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
448 struct dlm_rsb **r_ret)
450 struct rb_node *node = tree->rb_node;
455 r = rb_entry(node, struct dlm_rsb, res_hashnode);
456 rc = rsb_cmp(r, name, len);
458 node = node->rb_left;
460 node = node->rb_right;
/* Insert an rsb into a keep/toss rb-tree at the position determined by
   rsb_cmp(); logs and fails if an rsb with the same name already exists.
   Caller holds the bucket lock. */
472 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
474 struct rb_node **newn = &tree->rb_node;
475 struct rb_node *parent = NULL;
479 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
483 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
485 newn = &parent->rb_left;
487 newn = &parent->rb_right;
489 log_print("rsb_insert match");
496 rb_link_node(&rsb->res_hashnode, parent, newn);
497 rb_insert_color(&rsb->res_hashnode, tree);
502 * Find rsb in rsbtbl and potentially create/add one
504 * Delaying the release of rsb's has a similar benefit to applications keeping
505 * NL locks on an rsb, but without the guarantee that the cached master value
506 * will still be valid when the rsb is reused. Apps aren't always smart enough
507 * to keep NL locks on an rsb that they may lock again shortly; this can lead
508 * to excessive master lookups and removals if we don't delay the release.
510 * Searching for an rsb means looking through both the normal list and toss
511 * list. When found on the toss list the rsb is moved to the normal list with
512 * ref count of 1; when found on normal list the ref count is incremented.
514 * rsb's on the keep list are being used locally and refcounted.
515 * rsb's on the toss list are not being used locally, and are not refcounted.
517 * The toss list rsb's were either
518 * - previously used locally but not any more (were on keep list, then
519 * moved to toss list when last refcount dropped)
520 * - created and put on toss list as a directory record for a lookup
521 * (we are the dir node for the res, but are not using the res right now,
522 * but some other node is)
524 * The purpose of find_rsb() is to return a refcounted rsb for local use.
525 * So, if the given rsb is on the toss list, it is moved to the keep list
526 * before being returned.
528 * toss_rsb() happens when all local usage of the rsb is done, i.e. no
529 * more refcounts exist, so the rsb is moved from the keep list to the
532 * rsb's on both keep and toss lists are used for doing a name to master
533 * lookups. rsb's that are in use locally (and being refcounted) are on
534 * the keep list, rsb's that are not in use locally (not refcounted) and
535 * only exist for name/master lookups are on the toss list.
537 * rsb's on the toss list who's dir_nodeid is not local can have stale
538 * name/master mappings. So, remote requests on such rsb's can potentially
539 * return with an error, which means the mapping is stale and needs to
540 * be updated with a new lookup. (The idea behind MASTER UNCERTAIN and
541 * first_lkid is to keep only a single outstanding request on an rsb
542 * while that rsb has a potentially stale master.)
/* Find (or, when permitted, create) a refcounted rsb in bucket b for use
   when the lockspace has a resource directory. Searches the keep tree,
   then the toss tree (moving a hit back to keep), and finally creates a
   new rsb from the pre-allocated pool. Who may create — and what the new
   master_nodeid is — depends on whether the request came from a local
   caller, the directory node, or another node (see comments inline).
   NOTE(review): this listing is missing interior lines (braces, gotos,
   some assignments); code kept byte-identical. */
545 static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
546 uint32_t hash, uint32_t b,
547 int dir_nodeid, int from_nodeid,
548 unsigned int flags, struct dlm_rsb **r_ret)
550 struct dlm_rsb *r = NULL;
551 int our_nodeid = dlm_our_nodeid();
/* Classify the caller: receive path vs. local request path. */
558 if (flags & R_RECEIVE_REQUEST) {
559 if (from_nodeid == dir_nodeid)
563 } else if (flags & R_REQUEST) {
568 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
569 * from_nodeid has sent us a lock in dlm_recover_locks, believing
570 * we're the new master. Our local recovery may not have set
571 * res_master_nodeid to our_nodeid yet, so allow either. Don't
572 * create the rsb; dlm_recover_process_copy() will handle EBADR
575 * If someone sends us a request, we are the dir node, and we do
576 * not find the rsb anywhere, then recreate it. This happens if
577 * someone sends us a request after we have removed/freed an rsb
578 * from our toss list. (They sent a request instead of lookup
579 * because they are using an rsb from their toss list.)
582 if (from_local || from_dir ||
583 (from_other && (dir_nodeid == our_nodeid))) {
/* Top up the rsb pool before taking the bucket spinlock. */
589 error = pre_rsb_struct(ls);
594 spin_lock(&ls->ls_rsbtbl[b].lock);
596 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
601 * rsb is active, so we can't check master_nodeid without lock_rsb.
604 kref_get(&r->res_ref);
610 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
615 * rsb found inactive (master_nodeid may be out of date unless
616 * we are the dir_nodeid or were the master) No other thread
617 * is using this rsb because it's on the toss list, so we can
618 * look at or update res_master_nodeid without lock_rsb.
621 if ((r->res_master_nodeid != our_nodeid) && from_other) {
622 /* our rsb was not master, and another node (not the dir node)
623 has sent us a request */
624 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
625 from_nodeid, r->res_master_nodeid, dir_nodeid,
631 if ((r->res_master_nodeid != our_nodeid) && from_dir) {
632 /* don't think this should ever happen */
633 log_error(ls, "find_rsb toss from_dir %d master %d",
634 from_nodeid, r->res_master_nodeid);
636 /* fix it and go on */
637 r->res_master_nodeid = our_nodeid;
639 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
640 r->res_first_lkid = 0;
643 if (from_local && (r->res_master_nodeid != our_nodeid)) {
644 /* Because we have held no locks on this rsb,
645 res_master_nodeid could have become stale. */
646 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
647 r->res_first_lkid = 0;
/* Reactivate: move the rsb from the toss tree back to keep. */
650 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
651 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
/* Not found anywhere: create a new rsb, unless creation is disallowed. */
660 if (error == -EBADR && !create)
663 error = get_rsb_struct(ls, name, len, &r);
664 if (error == -EAGAIN) {
665 spin_unlock(&ls->ls_rsbtbl[b].lock);
673 r->res_dir_nodeid = dir_nodeid;
674 kref_init(&r->res_ref);
677 /* want to see how often this happens */
678 log_debug(ls, "find_rsb new from_dir %d recreate %s",
679 from_nodeid, r->res_name);
680 r->res_master_nodeid = our_nodeid;
685 if (from_other && (dir_nodeid != our_nodeid)) {
686 /* should never happen */
687 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
688 from_nodeid, dir_nodeid, our_nodeid, r->res_name);
695 log_debug(ls, "find_rsb new from_other %d dir %d %s",
696 from_nodeid, dir_nodeid, r->res_name);
699 if (dir_nodeid == our_nodeid) {
700 /* When we are the dir nodeid, we can set the master
702 r->res_master_nodeid = our_nodeid;
705 /* set_master will send_lookup to dir_nodeid */
706 r->res_master_nodeid = 0;
711 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
713 spin_unlock(&ls->ls_rsbtbl[b].lock);
719 /* During recovery, other nodes can send us new MSTCPY locks (from
720 dlm_recover_locks) before we've made ourself master (in
721 dlm_recover_masters). */
/* find_rsb variant for lockspaces without a resource directory: the hash
   (dir_nodeid) directly determines the master. Same keep/toss/create
   sequence as find_rsb_dir(), but during recovery (R_RECEIVE_RECOVER)
   stale-master checks are skipped — see comment above this function.
   NOTE(review): listing is missing interior lines; code kept byte-identical. */
723 static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
724 uint32_t hash, uint32_t b,
725 int dir_nodeid, int from_nodeid,
726 unsigned int flags, struct dlm_rsb **r_ret)
728 struct dlm_rsb *r = NULL;
729 int our_nodeid = dlm_our_nodeid();
730 int recover = (flags & R_RECEIVE_RECOVER);
734 error = pre_rsb_struct(ls);
738 spin_lock(&ls->ls_rsbtbl[b].lock);
740 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
745 * rsb is active, so we can't check master_nodeid without lock_rsb.
748 kref_get(&r->res_ref);
753 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
758 * rsb found inactive. No other thread is using this rsb because
759 * it's on the toss list, so we can look at or update
760 * res_master_nodeid without lock_rsb.
763 if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
764 /* our rsb is not master, and another node has sent us a
765 request; this should never happen */
766 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
767 from_nodeid, r->res_master_nodeid, dir_nodeid);
773 if (!recover && (r->res_master_nodeid != our_nodeid) &&
774 (dir_nodeid == our_nodeid)) {
775 /* our rsb is not master, and we are dir; may as well fix it;
776 this should never happen */
777 log_error(ls, "find_rsb toss our %d master %d dir %d",
778 our_nodeid, r->res_master_nodeid, dir_nodeid);
780 r->res_master_nodeid = our_nodeid;
784 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
785 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
794 error = get_rsb_struct(ls, name, len, &r);
795 if (error == -EAGAIN) {
796 spin_unlock(&ls->ls_rsbtbl[b].lock);
/* New rsb: with no directory, the hash node is both dir and master. */
804 r->res_dir_nodeid = dir_nodeid;
805 r->res_master_nodeid = dir_nodeid;
806 r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
807 kref_init(&r->res_ref);
809 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
811 spin_unlock(&ls->ls_rsbtbl[b].lock);
/* Entry point for rsb lookup: validate name length, hash into a bucket,
   compute the directory node, then dispatch to the dir / no-dir variant. */
817 static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
818 unsigned int flags, struct dlm_rsb **r_ret)
823 if (len > DLM_RESNAME_MAXLEN)
826 hash = jhash(name, len, 0);
827 b = hash & (ls->ls_rsbtbl_size - 1);
829 dir_nodeid = dlm_hash2nodeid(ls, hash);
831 if (dlm_no_directory(ls))
832 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
833 from_nodeid, flags, r_ret);
835 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
836 from_nodeid, flags, r_ret);
839 /* we have received a request and found that res_master_nodeid != our_nodeid,
840 so we need to return an error or make ourself the master */
/* A request arrived but res_master_nodeid != our_nodeid: either reject it
   (request from a non-dir node, or no-directory mode) or, when the dir
   node itself sent it, adopt mastership locally (see file-top comment).
   NOTE(review): listing is missing interior lines (returns, braces);
   code kept byte-identical. */
842 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
845 if (dlm_no_directory(ls)) {
846 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
847 from_nodeid, r->res_master_nodeid,
853 if (from_nodeid != r->res_dir_nodeid) {
854 /* our rsb is not master, and another node (not the dir node)
855 has sent us a request. this is much more common when our
856 master_nodeid is zero, so limit debug to non-zero. */
858 if (r->res_master_nodeid) {
859 log_debug(ls, "validate master from_other %d master %d "
860 "dir %d first %x %s", from_nodeid,
861 r->res_master_nodeid, r->res_dir_nodeid,
862 r->res_first_lkid, r->res_name);
866 /* our rsb is not master, but the dir nodeid has sent us a
867 request; this could happen with master 0 / res_nodeid -1 */
869 if (r->res_master_nodeid) {
870 log_error(ls, "validate master from_dir %d master %d "
872 from_nodeid, r->res_master_nodeid,
873 r->res_first_lkid, r->res_name);
876 r->res_master_nodeid = dlm_our_nodeid();
883 * We're the dir node for this res and another node wants to know the
884 * master nodeid. During normal operation (non recovery) this is only
885 * called from receive_lookup(); master lookups when the local node is
886 * the dir node are done by find_rsb().
888 * normal operation, we are the dir node for a resource
893 * . dlm_master_lookup flags 0
895 * recover directory, we are rebuilding dir for all resources
896 * . dlm_recover_directory
898 * remote node sends back the rsb names it is master of and we are dir of
899 * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
900 * we either create new rsb setting remote node as master, or find existing
901 * rsb and set master to be the remote node.
903 * recover masters, we are finding the new master for resources
904 * . dlm_recover_masters
906 * . dlm_send_rcom_lookup
907 * . receive_rcom_lookup
908 * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
/* Directory-node lookup: we are the dir node for this resource and another
   node asks who the master is. Returns the master in *r_nodeid and
   DLM_LU_MATCH/DLM_LU_ADD in *result. Used by receive_lookup() in normal
   operation, and with DLM_LU_RECOVER_DIR / DLM_LU_RECOVER_MASTER flags
   during recovery (see the comment block above this function).
   NOTE(review): listing is missing interior lines (gotos, braces);
   code kept byte-identical. */
911 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
912 unsigned int flags, int *r_nodeid, int *result)
914 struct dlm_rsb *r = NULL;
916 int from_master = (flags & DLM_LU_RECOVER_DIR);
917 int fix_master = (flags & DLM_LU_RECOVER_MASTER);
918 int our_nodeid = dlm_our_nodeid();
919 int dir_nodeid, error, toss_list = 0;
921 if (len > DLM_RESNAME_MAXLEN)
/* A node never sends a lookup to itself. */
924 if (from_nodeid == our_nodeid) {
925 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
930 hash = jhash(name, len, 0);
931 b = hash & (ls->ls_rsbtbl_size - 1);
/* Sanity: the hash must map to us, since we're answering as dir node. */
933 dir_nodeid = dlm_hash2nodeid(ls, hash);
934 if (dir_nodeid != our_nodeid) {
935 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
936 from_nodeid, dir_nodeid, our_nodeid, hash,
943 error = pre_rsb_struct(ls);
947 spin_lock(&ls->ls_rsbtbl[b].lock);
948 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
950 /* because the rsb is active, we need to lock_rsb before
951 checking/changing re_master_nodeid */
954 spin_unlock(&ls->ls_rsbtbl[b].lock);
959 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
963 /* because the rsb is inactive (on toss list), it's not refcounted
964 and lock_rsb is not used, but is protected by the rsbtbl lock */
968 if (r->res_dir_nodeid != our_nodeid) {
969 /* should not happen, but may as well fix it and carry on */
970 log_error(ls, "dlm_master_lookup res_dir %d our %d %s",
971 r->res_dir_nodeid, our_nodeid, r->res_name);
972 r->res_dir_nodeid = our_nodeid;
975 if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
976 /* Recovery uses this function to set a new master when
977 the previous master failed. Setting NEW_MASTER will
978 force dlm_recover_masters to call recover_master on this
979 rsb even though the res_nodeid is no longer removed. */
981 r->res_master_nodeid = from_nodeid;
982 r->res_nodeid = from_nodeid;
983 rsb_set_flag(r, RSB_NEW_MASTER);
986 /* I don't think we should ever find it on toss list. */
987 log_error(ls, "dlm_master_lookup fix_master on toss");
992 if (from_master && (r->res_master_nodeid != from_nodeid)) {
993 /* this will happen if from_nodeid became master during
994 a previous recovery cycle, and we aborted the previous
995 cycle before recovering this master value */
997 log_limit(ls, "dlm_master_lookup from_master %d "
998 "master_nodeid %d res_nodeid %d first %x %s",
999 from_nodeid, r->res_master_nodeid, r->res_nodeid,
1000 r->res_first_lkid, r->res_name);
1002 if (r->res_master_nodeid == our_nodeid) {
1003 log_error(ls, "from_master %d our_master", from_nodeid);
1005 dlm_send_rcom_lookup_dump(r, from_nodeid);
1009 r->res_master_nodeid = from_nodeid;
1010 r->res_nodeid = from_nodeid;
1011 rsb_set_flag(r, RSB_NEW_MASTER);
1014 if (!r->res_master_nodeid) {
1015 /* this will happen if recovery happens while we're looking
1016 up the master for this rsb */
1018 log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s",
1019 from_nodeid, r->res_first_lkid, r->res_name);
1020 r->res_master_nodeid = from_nodeid;
1021 r->res_nodeid = from_nodeid;
1024 if (!from_master && !fix_master &&
1025 (r->res_master_nodeid == from_nodeid)) {
1026 /* this can happen when the master sends remove, the dir node
1027 finds the rsb on the keep list and ignores the remove,
1028 and the former master sends a lookup */
1030 log_limit(ls, "dlm_master_lookup from master %d flags %x "
1031 "first %x %s", from_nodeid, flags,
1032 r->res_first_lkid, r->res_name);
1036 *r_nodeid = r->res_master_nodeid;
1038 *result = DLM_LU_MATCH;
/* Refresh toss time so the inactive rsb isn't reclaimed right away. */
1041 r->res_toss_time = jiffies;
1042 /* the rsb was inactive (on toss list) */
1043 spin_unlock(&ls->ls_rsbtbl[b].lock);
1045 /* the rsb was active */
/* Not found anywhere: create a directory record on the toss list with
   the requester as master. */
1052 error = get_rsb_struct(ls, name, len, &r);
1053 if (error == -EAGAIN) {
1054 spin_unlock(&ls->ls_rsbtbl[b].lock);
1062 r->res_dir_nodeid = our_nodeid;
1063 r->res_master_nodeid = from_nodeid;
1064 r->res_nodeid = from_nodeid;
1065 kref_init(&r->res_ref);
1066 r->res_toss_time = jiffies;
1068 error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
1070 /* should never happen */
1072 spin_unlock(&ls->ls_rsbtbl[b].lock);
1077 *result = DLM_LU_ADD;
1078 *r_nodeid = from_nodeid;
1081 spin_unlock(&ls->ls_rsbtbl[b].lock);
/* Debug: walk every bucket's keep tree and dump each rsb whose res_hash
   matches the given hash. */
1085 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1091 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1092 spin_lock(&ls->ls_rsbtbl[i].lock);
1093 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1094 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1095 if (r->res_hash == hash)
1098 spin_unlock(&ls->ls_rsbtbl[i].lock);
/* Debug: look up an rsb by name (keep tree first, then toss) and dump it. */
1102 void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
1104 struct dlm_rsb *r = NULL;
1108 hash = jhash(name, len, 0);
1109 b = hash & (ls->ls_rsbtbl_size - 1);
1111 spin_lock(&ls->ls_rsbtbl[b].lock);
1112 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1116 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1122 spin_unlock(&ls->ls_rsbtbl[b].lock);
/* kref release for put_rsb(): last reference dropped, so move the rsb from
   the keep tree to the toss tree (bucket lock held by caller), reset its
   refcount for later reactivation, stamp the toss time, and free its LVB. */
1125 static void toss_rsb(struct kref *kref)
1127 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1128 struct dlm_ls *ls = r->res_ls;
1130 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1131 kref_init(&r->res_ref);
1132 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1133 rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1134 r->res_toss_time = jiffies;
1135 if (r->res_lvbptr) {
1136 dlm_free_lvb(r->res_lvbptr);
1137 r->res_lvbptr = NULL;
1141 /* See comment for unhold_lkb */
/* Drop a reference that is known not to be the last one; asserts that the
   release callback did not fire (mirrors unhold_lkb, see comment above). */
1143 static void unhold_rsb(struct dlm_rsb *r)
1146 rv = kref_put(&r->res_ref, toss_rsb);
1147 DLM_ASSERT(!rv, dlm_dump_rsb(r););
/* kref release used when finally destroying an rsb: only asserts that all
   of its queues and recovery lists are empty — the actual remove/free is
   done by the caller after kref_put() returns (see comment below). */
1150 static void kill_rsb(struct kref *kref)
1152 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1154 /* All work is done after the return from kref_put() so we
1155 can release the write_lock before the remove and free. */
1157 DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1158 DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1159 DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1160 DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1161 DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1162 DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1165 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1166 The rsb must exist as long as any lkb's for it do. */
/* Bind an lkb to its rsb; each attached lkb keeps the rsb referenced
   (see comment above: the rsb must outlive all its lkbs). */
1168 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1171 lkb->lkb_resource = r;
/* Undo attach_lkb: drop the rsb reference and clear the back-pointer. */
1174 static void detach_lkb(struct dlm_lkb *lkb)
1176 if (lkb->lkb_resource) {
1177 put_rsb(lkb->lkb_resource);
1178 lkb->lkb_resource = NULL;
/* Allocate and initialize a new lkb and assign it a lockspace-unique id
   (>= 1) via the ls_lkbidr idr; *lkb_ret receives the new lkb.
   Uses the old two-step idr API (idr_pre_get + idr_get_new_above). */
1182 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1184 struct dlm_lkb *lkb;
1187 lkb = dlm_allocate_lkb(ls);
1191 lkb->lkb_nodeid = -1;
1192 lkb->lkb_grmode = DLM_LOCK_IV;
1193 kref_init(&lkb->lkb_ref);
1194 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1195 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1196 INIT_LIST_HEAD(&lkb->lkb_time_list);
1197 INIT_LIST_HEAD(&lkb->lkb_cb_list);
1198 mutex_init(&lkb->lkb_cb_mutex);
1199 INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1202 rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
1206 spin_lock(&ls->ls_lkbidr_spin);
1207 rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
1210 spin_unlock(&ls->ls_lkbidr_spin);
1216 log_error(ls, "create_lkb idr error %d", rv);
/* Look up an lkb by id in ls_lkbidr, taking a reference on a hit.
   Returns 0 and sets *lkb_ret, or -ENOENT. */
1224 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1226 struct dlm_lkb *lkb;
1228 spin_lock(&ls->ls_lkbidr_spin);
1229 lkb = idr_find(&ls->ls_lkbidr, lkid);
1231 kref_get(&lkb->lkb_ref);
1232 spin_unlock(&ls->ls_lkbidr_spin);
1235 return lkb ? 0 : -ENOENT;
/* kref release for the last lkb reference: only asserts the lkb is off all
   rsb queues — removal and free happen in the caller (see comment below). */
1238 static void kill_lkb(struct kref *kref)
1240 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1242 /* All work is done after the return from kref_put() so we
1243 can release the write_lock before the detach_lkb */
1245 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1248 /* __put_lkb() is used when an lkb may not have an rsb attached to
1249 it so we need to provide the lockspace explicitly */
/* Drop an lkb reference; on the final put, remove the id from ls_lkbidr
   and free the lkb (master-copy lkbs own their LVB buffer and free it;
   process lkbs' lvbptr aliases the caller's lksb and is not freed). */
1251 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1253 uint32_t lkid = lkb->lkb_id;
1255 spin_lock(&ls->ls_lkbidr_spin);
1256 if (kref_put(&lkb->lkb_ref, kill_lkb)) {
1257 idr_remove(&ls->ls_lkbidr, lkid);
1258 spin_unlock(&ls->ls_lkbidr_spin);
1262 /* for local/process lkbs, lvbptr points to caller's lksb */
1263 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1264 dlm_free_lvb(lkb->lkb_lvbptr);
1268 spin_unlock(&ls->ls_lkbidr_spin);
/* Public put: derive the lockspace from the attached rsb and delegate to
   __put_lkb(); asserts the lkb is attached. */
1273 int dlm_put_lkb(struct dlm_lkb *lkb)
1277 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1278 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1280 ls = lkb->lkb_resource->res_ls;
1281 return __put_lkb(ls, lkb);
1284 /* This is only called to add a reference when the code already holds
1285 a valid reference to the lkb, so there's no need for locking. */
/* Take an extra reference; caller already holds a valid one (no locking). */
1287 static inline void hold_lkb(struct dlm_lkb *lkb)
1289 kref_get(&lkb->lkb_ref);
1292 /* This is called when we need to remove a reference and are certain
1293 it's not the last ref. e.g. del_lkb is always called between a
1294 find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1295 put_lkb would work fine, but would involve unnecessary locking */
1297 static inline void unhold_lkb(struct dlm_lkb *lkb)
1300 rv = kref_put(&lkb->lkb_ref, kill_lkb);
1301 DLM_ASSERT(!rv, dlm_print_lkb(lkb););
/* Insert "new" into a statequeue ordered by descending lkb_rqmode:
 * walk until the first entry whose rqmode is below "mode" and link in
 * front of it. */
1304 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1307 struct dlm_lkb *lkb = NULL;
1309 list_for_each_entry(lkb, head, lkb_statequeue)
1310 if (lkb->lkb_rqmode < mode)
1313 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
1316 /* add/remove lkb to rsb's grant/convert/wait queue */
/* Put the lkb on one of the rsb's three status queues and record the
 * new status and timestamp.  Takes a queue reference on the lkb
 * (paired with the unhold in del_lkb).  DLM_LKF_HEADQUE requests
 * head-of-queue insertion on the wait/convert queues. */
1318 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1320 kref_get(&lkb->lkb_ref);
/* must not already be on a status queue */
1322 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1324 lkb->lkb_timestamp = ktime_get();
1326 lkb->lkb_status = status;
1329 case DLM_LKSTS_WAITING:
1330 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1331 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1333 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1335 case DLM_LKSTS_GRANTED:
1336 /* convention says granted locks kept in order of grmode */
1337 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1340 case DLM_LKSTS_CONVERT:
1341 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1342 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1344 list_add_tail(&lkb->lkb_statequeue,
1345 &r->res_convertqueue);
/* unknown status is a programming error */
1348 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
/* Take the lkb off whichever status queue it is on and clear its
 * status (the queue reference is dropped here as well). */
1352 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1354 lkb->lkb_status = 0;
1355 list_del(&lkb->lkb_statequeue);
/* Move an lkb between status queues: remove from the current queue,
 * then add with the new status "sts". */
1359 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1363 add_lkb(r, lkb, sts);
/* Map a request message type to the corresponding reply message type
 * (used when matching replies against entries on the waiters list). */
1367 static int msg_reply_type(int mstype)
1370 case DLM_MSG_REQUEST:
1371 return DLM_MSG_REQUEST_REPLY;
1372 case DLM_MSG_CONVERT:
1373 return DLM_MSG_CONVERT_REPLY;
1374 case DLM_MSG_UNLOCK:
1375 return DLM_MSG_UNLOCK_REPLY;
1376 case DLM_MSG_CANCEL:
1377 return DLM_MSG_CANCEL_REPLY;
1378 case DLM_MSG_LOOKUP:
1379 return DLM_MSG_LOOKUP_REPLY;
/* Track which node ids have already been warned about in this scan:
 * "warned" is an array of num_nodes slots; returns nonzero if nodeid
 * is already recorded (suppress duplicate warnings). */
1384 static int nodeid_warned(int nodeid, int num_nodes, int *warned)
1388 for (i = 0; i < num_nodes; i++) {
1393 if (warned[i] == nodeid)
/* Periodic scan of the waiters list (lkb's waiting for a remote
 * reply).  For any lkb that has waited longer than the configured
 * ci_waitwarn_us threshold, log a "check connection" warning for its
 * target node -- at most once per node per scan, deduplicated via the
 * "warned" array.  Resets lkb_wait_time so an lkb is warned about
 * only once; debug_* counters feed the summary log_debug at the end. */
1399 void dlm_scan_waiters(struct dlm_ls *ls)
1401 struct dlm_lkb *lkb;
1402 ktime_t zero = ktime_set(0, 0);
1404 s64 debug_maxus = 0;
1405 u32 debug_scanned = 0;
1406 u32 debug_expired = 0;
/* feature disabled when the config threshold is zero */
1410 if (!dlm_config.ci_waitwarn_us)
1413 mutex_lock(&ls->ls_waiters_mutex);
1415 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
/* zero wait_time means this lkb was already warned about */
1416 if (ktime_equal(lkb->lkb_wait_time, zero))
1421 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
1423 if (us < dlm_config.ci_waitwarn_us)
/* clear so we only warn once per lkb */
1426 lkb->lkb_wait_time = zero;
1429 if (us > debug_maxus)
1433 num_nodes = ls->ls_num_nodes;
/* NOTE(review): kzalloc result should be checked before use --
 * allocation failure handling is not visible here, confirm */
1434 warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
1438 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
1441 log_error(ls, "waitwarn %x %lld %d us check connection to "
1442 "node %d", lkb->lkb_id, (long long)us,
1443 dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
1445 mutex_unlock(&ls->ls_waiters_mutex);
1449 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
1450 debug_scanned, debug_expired,
1451 dlm_config.ci_waitwarn_us, (long long)debug_maxus);
1454 /* add/remove lkb from global waiters list of lkb's waiting for
1455 a reply from a remote node */
/* Register lkb as waiting for a reply of type "mstype" from node
 * to_nodeid.  If an operation is already outstanding, a concurrent
 * unlock/cancel is recorded as an "overlap" via flag bits instead of
 * a second waiters entry; other overlapping combinations fail (error
 * path logs "addwait error").  Returns 0 on success. */
1457 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1459 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1462 mutex_lock(&ls->ls_waiters_mutex);
/* an unlock (or a second cancel) is already in flight: no further
 * operation may be stacked on top of it */
1464 if (is_overlap_unlock(lkb) ||
1465 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
/* a previous op is waiting: mark the new unlock/cancel as an
 * overlap on the existing waiters entry */
1470 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1472 case DLM_MSG_UNLOCK:
1473 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1475 case DLM_MSG_CANCEL:
1476 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1482 lkb->lkb_wait_count++;
1485 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1486 lkb->lkb_id, lkb->lkb_wait_type, mstype,
1487 lkb->lkb_wait_count, lkb->lkb_flags);
/* normal case: first outstanding op for this lkb */
1491 DLM_ASSERT(!lkb->lkb_wait_count,
1493 printk("wait_count %d\n", lkb->lkb_wait_count););
1495 lkb->lkb_wait_count++;
1496 lkb->lkb_wait_type = mstype;
1497 lkb->lkb_wait_time = ktime_get();
1498 lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1500 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1503 log_error(ls, "addwait error %x %d flags %x %d %d %s",
1504 lkb->lkb_id, error, lkb->lkb_flags, mstype,
1505 lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1506 mutex_unlock(&ls->ls_waiters_mutex);
1510 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1511 list as part of process_requestqueue (e.g. a lookup that has an optimized
1512 request reply on the requestqueue) between dlm_recover_waiters_pre() which
1513 set RESEND and dlm_recover_waiters_post() */
/* Match an incoming reply (type "mstype", message "ms" -- ms may be
 * NULL when called via remove_from_waiters) against the lkb's
 * outstanding state.  Clears overlap unlock/cancel flags when their
 * replies arrive, decrements lkb_wait_count, and takes the lkb off
 * the waiters list when the count reaches zero.  Caller holds
 * ls_waiters_mutex (except for stub replies, see
 * remove_from_waiters_ms). */
1515 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1516 struct dlm_message *ms)
1518 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1519 int overlap_done = 0;
1521 if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
1522 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1523 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
1528 if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
1529 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1530 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1535 /* Cancel state was preemptively cleared by a successful convert,
1536 see next comment, nothing to do. */
1538 if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1539 (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1540 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1541 lkb->lkb_id, lkb->lkb_wait_type);
1545 /* Remove for the convert reply, and preemptively remove for the
1546 cancel reply. A convert has been granted while there's still
1547 an outstanding cancel on it (the cancel is moot and the result
1548 in the cancel reply should be 0). We preempt the cancel reply
1549 because the app gets the convert result and then can follow up
1550 with another op, like convert. This subsequent op would see the
1551 lingering state of the cancel and fail with -EBUSY. */
1553 if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1554 (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
1555 is_overlap_cancel(lkb) && ms && !ms->m_result) {
1556 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1558 lkb->lkb_wait_type = 0;
1559 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1560 lkb->lkb_wait_count--;
1564 /* N.B. type of reply may not always correspond to type of original
1565 msg due to lookup->request optimization, verify others? */
1567 if (lkb->lkb_wait_type) {
1568 lkb->lkb_wait_type = 0;
/* reply arrived but nothing was outstanding on this lkb */
1572 log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1573 lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
1574 mstype, lkb->lkb_flags);
1578 /* the force-unlock/cancel has completed and we haven't recvd a reply
1579 to the op that was in progress prior to the unlock/cancel; we
1580 give up on any reply to the earlier op. FIXME: not sure when/how
1581 this would happen */
1583 if (overlap_done && lkb->lkb_wait_type) {
1584 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1585 lkb->lkb_id, mstype, lkb->lkb_wait_type);
1586 lkb->lkb_wait_count--;
1587 lkb->lkb_wait_type = 0;
1590 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
/* see comment above the function for why RESEND is cleared here */
1592 lkb->lkb_flags &= ~DLM_IFL_RESEND;
1593 lkb->lkb_wait_count--;
1594 if (!lkb->lkb_wait_count)
1595 list_del_init(&lkb->lkb_wait_reply);
/* Locked wrapper around _remove_from_waiters() for the case where no
 * reply message is available (ms == NULL). */
1600 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1602 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1605 mutex_lock(&ls->ls_waiters_mutex);
1606 error = _remove_from_waiters(lkb, mstype, NULL);
1607 mutex_unlock(&ls->ls_waiters_mutex);
1611 /* Handles situations where we might be processing a "fake" or "stub" reply in
1612 which we can't try to take waiters_mutex again. */
/* Like remove_from_waiters() but driven by a reply message; for stub
 * replies (DLM_IFL_STUB_MS) the caller already holds waiters_mutex,
 * so it must not be taken again here. */
1614 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1616 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1619 if (ms->m_flags != DLM_IFL_STUB_MS)
1620 mutex_lock(&ls->ls_waiters_mutex);
1621 error = _remove_from_waiters(lkb, ms->m_type, ms);
1622 if (ms->m_flags != DLM_IFL_STUB_MS)
1623 mutex_unlock(&ls->ls_waiters_mutex);
1627 /* If there's an rsb for the same resource being removed, ensure
1628 that the remove message is sent before the new lookup message.
1629 It should be rare to need a delay here, but if not, then it may
1630 be worthwhile to add a proper wait mechanism rather than a delay. */
/* Delay a new lookup while shrink_bucket() is in the middle of
 * sending a directory remove for the same resource name: the name
 * being removed is published in ls_remove_name/ls_remove_len under
 * ls_remove_spin (see shrink_bucket). */
1632 static void wait_pending_remove(struct dlm_rsb *r)
1634 struct dlm_ls *ls = r->res_ls;
1636 spin_lock(&ls->ls_remove_spin);
1637 if (ls->ls_remove_len &&
1638 !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
1639 log_debug(ls, "delay lookup for remove dir %d %s",
1640 r->res_dir_nodeid, r->res_name);
1641 spin_unlock(&ls->ls_remove_spin);
1645 spin_unlock(&ls->ls_remove_spin);
1649 * ls_remove_spin protects ls_remove_name and ls_remove_len which are
1650 * read by other threads in wait_pending_remove. ls_remove_names
1651 * and ls_remove_lens are only used by the scan thread, so they do
1652 * not need protection.
/* Reclaim expired rsb's from one hash bucket's toss list.  Two
 * passes: (1) under the bucket lock, free toss entries whose
 * ci_toss_secs timeout has expired, collecting (up to
 * DLM_REMOVE_NAMES_MAX) names of rsb's we master but whose directory
 * record lives on another node; (2) for each collected name, re-find
 * the rsb, re-validate master/dir/toss-time conditions, free it, and
 * send the remote directory remove -- publishing the name in
 * ls_remove_name meanwhile so wait_pending_remove() blocks a racing
 * lookup for the same resource. */
1655 static void shrink_bucket(struct dlm_ls *ls, int b)
1657 struct rb_node *n, *next;
1660 int our_nodeid = dlm_our_nodeid();
1661 int remote_count = 0;
1664 memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1666 spin_lock(&ls->ls_rsbtbl[b].lock);
1667 for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1669 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1671 /* If we're the directory record for this rsb, and
1672 we're not the master of it, then we need to wait
1673 for the master node to send us a dir remove for
1674 before removing the dir record. */
1676 if (!dlm_no_directory(ls) &&
1677 (r->res_master_nodeid != our_nodeid) &&
1678 (dlm_dir_nodeid(r) == our_nodeid)) {
/* not yet expired: leave it on the toss list */
1682 if (!time_after_eq(jiffies, r->res_toss_time +
1683 dlm_config.ci_toss_secs * HZ)) {
1687 if (!dlm_no_directory(ls) &&
1688 (r->res_master_nodeid == our_nodeid) &&
1689 (dlm_dir_nodeid(r) != our_nodeid)) {
1691 /* We're the master of this rsb but we're not
1692 the directory record, so we need to tell the
1693 dir node to remove the dir record. */
1695 ls->ls_remove_lens[remote_count] = r->res_length;
1696 memcpy(ls->ls_remove_names[remote_count], r->res_name,
1697 DLM_RESNAME_MAXLEN);
/* collection array full: handle the rest next scan */
1700 if (remote_count >= DLM_REMOVE_NAMES_MAX)
/* a live reference on a tossed rsb indicates a bug */
1705 if (!kref_put(&r->res_ref, kill_rsb)) {
1706 log_error(ls, "tossed rsb in use %s", r->res_name);
1710 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1713 spin_unlock(&ls->ls_rsbtbl[b].lock);
1716 * While searching for rsb's to free, we found some that require
1717 * remote removal. We leave them in place and find them again here
1718 * so there is a very small gap between removing them from the toss
1719 * list and sending the removal. Keeping this gap small is
1720 * important to keep us (the master node) from being out of sync
1721 * with the remote dir node for very long.
1723 * From the time the rsb is removed from toss until just after
1724 * send_remove, the rsb name is saved in ls_remove_name. A new
1725 * lookup checks this to ensure that a new lookup message for the
1726 * same resource name is not sent just before the remove message.
1729 for (i = 0; i < remote_count; i++) {
1730 name = ls->ls_remove_names[i];
1731 len = ls->ls_remove_lens[i];
1733 spin_lock(&ls->ls_rsbtbl[b].lock);
1734 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
/* rsb vanished from the toss list since pass 1: nothing to do */
1736 spin_unlock(&ls->ls_rsbtbl[b].lock);
1737 log_debug(ls, "remove_name not toss %s", name);
/* mastership moved since pass 1: skip */
1741 if (r->res_master_nodeid != our_nodeid) {
1742 spin_unlock(&ls->ls_rsbtbl[b].lock);
1743 log_debug(ls, "remove_name master %d dir %d our %d %s",
1744 r->res_master_nodeid, r->res_dir_nodeid,
1749 if (r->res_dir_nodeid == our_nodeid) {
1750 /* should never happen */
1751 spin_unlock(&ls->ls_rsbtbl[b].lock);
1752 log_error(ls, "remove_name dir %d master %d our %d %s",
1753 r->res_dir_nodeid, r->res_master_nodeid,
/* rsb was re-used and re-tossed since pass 1: timer restarted */
1758 if (!time_after_eq(jiffies, r->res_toss_time +
1759 dlm_config.ci_toss_secs * HZ)) {
1760 spin_unlock(&ls->ls_rsbtbl[b].lock);
1761 log_debug(ls, "remove_name toss_time %lu now %lu %s",
1762 r->res_toss_time, jiffies, name);
1766 if (!kref_put(&r->res_ref, kill_rsb)) {
1767 spin_unlock(&ls->ls_rsbtbl[b].lock);
1768 log_error(ls, "remove_name in use %s", name);
1772 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1774 /* block lookup of same name until we've sent remove */
1775 spin_lock(&ls->ls_remove_spin);
1776 ls->ls_remove_len = len;
1777 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
1778 spin_unlock(&ls->ls_remove_spin);
1779 spin_unlock(&ls->ls_rsbtbl[b].lock);
1783 /* allow lookup of name again */
1784 spin_lock(&ls->ls_remove_spin);
1785 ls->ls_remove_len = 0;
1786 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
1787 spin_unlock(&ls->ls_remove_spin);
/* Walk every rsb hash bucket and shrink its toss list; bail out early
 * if locking has been stopped for recovery. */
1793 void dlm_scan_rsbs(struct dlm_ls *ls)
1797 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1798 shrink_bucket(ls, i);
1799 if (dlm_locking_stopped(ls))
/* Put an lkb on the lockspace timeout list if it wants either a
 * timeout-warning callback (LSFL_TIMEWARN and not NODLCKWT) or a
 * real lock timeout (DLM_LKF_TIMEOUT).  Master copies are never
 * timed out locally -- the owning node handles that. */
1805 static void add_timeout(struct dlm_lkb *lkb)
1807 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1809 if (is_master_copy(lkb))
1812 if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1813 !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1814 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1817 if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1822 DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1823 mutex_lock(&ls->ls_timeout_mutex);
1825 list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1826 mutex_unlock(&ls->ls_timeout_mutex);
/* Remove an lkb from the timeout list if present (safe to call when
 * it was never added). */
1829 static void del_timeout(struct dlm_lkb *lkb)
1831 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1833 mutex_lock(&ls->ls_timeout_mutex);
1834 if (!list_empty(&lkb->lkb_time_list)) {
1835 list_del_init(&lkb->lkb_time_list);
1838 mutex_unlock(&ls->ls_timeout_mutex);
1841 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1842 lkb_lksb_timeout without lock_rsb? Note: we can't lock timeout_mutex
1843 and then lock rsb because of lock ordering in add_timeout. We may need
1844 to specify some special timeout-related bits in the lkb that are just to
1845 be accessed under the timeout_mutex. */
/* Periodic scan of the timeout list.  For each lkb it decides whether
 * to issue a one-shot timeout warning (DLM_IFL_WATCH_TIMEWARN +
 * ci_timewarn_cs) and/or cancel the lock because its own
 * DLM_LKF_TIMEOUT deadline (lkb_timeout_cs) has passed.  Times are
 * in centiseconds, hence the * 10000 conversion to microseconds. */
1847 void dlm_scan_timeout(struct dlm_ls *ls)
1850 struct dlm_lkb *lkb;
1851 int do_cancel, do_warn;
/* nothing to do while recovery has stopped locking */
1855 if (dlm_locking_stopped(ls))
1860 mutex_lock(&ls->ls_timeout_mutex);
1861 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1863 wait_us = ktime_to_us(ktime_sub(ktime_get(),
1864 lkb->lkb_timestamp));
1866 if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1867 wait_us >= (lkb->lkb_timeout_cs * 10000))
1870 if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1871 wait_us >= dlm_config.ci_timewarn_cs * 10000)
1874 if (!do_cancel && !do_warn)
/* drop the mutex before acting on the lkb (lock ordering:
 * rsb lock cannot be taken under timeout_mutex) */
1879 mutex_unlock(&ls->ls_timeout_mutex);
1881 if (!do_cancel && !do_warn)
1884 r = lkb->lkb_resource;
1889 /* clear flag so we only warn once */
1890 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1891 if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1893 dlm_timeout_warn(lkb);
1897 log_debug(ls, "timeout cancel %x node %d %s",
1898 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1899 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1900 lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1902 _cancel_lock(r, lkb);
1911 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1912 dlm_recoverd before checking/setting ls_recover_begin. */
/* After recovery, credit back the time spent recovering: shift every
 * timeout-list timestamp forward by the recovery duration so locks
 * are not timed out for time the lockspace was suspended, and restart
 * the wait clocks of entries on the waiters list. */
1914 void dlm_adjust_timeouts(struct dlm_ls *ls)
1916 struct dlm_lkb *lkb;
1917 u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1919 ls->ls_recover_begin = 0;
1920 mutex_lock(&ls->ls_timeout_mutex);
1921 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1922 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1923 mutex_unlock(&ls->ls_timeout_mutex);
/* waiter-warning bookkeeping only matters when enabled */
1925 if (!dlm_config.ci_waitwarn_us)
1928 mutex_lock(&ls->ls_waiters_mutex);
1929 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1930 if (ktime_to_us(lkb->lkb_wait_time))
1931 lkb->lkb_wait_time = ktime_get();
1933 mutex_unlock(&ls->ls_waiters_mutex);
1936 /* lkb is master or local copy */
/* Apply the lock-value-block (LVB) protocol on grant.  The
 * dlm_lvb_operations table, indexed by (grmode+1, rqmode+1), decides
 * whether the LVB flows rsb->lkb (b==1, a read), lkb->rsb (b==0, a
 * write, or invalidation with IVVALBLK), or nothing happens.  When
 * the rsb's LVB is marked invalid, the caller is told via
 * DLM_SBF_VALNOTVALID. */
1938 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1940 int b, len = r->res_ls->ls_lvblen;
1942 /* b=1 lvb returned to caller
1943 b=0 lvb written to rsb or invalidated
1946 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
/* read direction: copy rsb lvb (and its sequence) out to the lkb */
1949 if (!lkb->lkb_lvbptr)
1952 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1958 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1959 lkb->lkb_lvbseq = r->res_lvbseq;
1961 } else if (b == 0) {
/* write direction: IVVALBLK invalidates instead of writing */
1962 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1963 rsb_set_flag(r, RSB_VALNOTVALID);
1967 if (!lkb->lkb_lvbptr)
1970 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
/* rsb lvb buffer is allocated lazily on first write */
1974 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1979 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1981 lkb->lkb_lvbseq = r->res_lvbseq;
1982 rsb_clear_flag(r, RSB_VALNOTVALID);
1985 if (rsb_flag(r, RSB_VALNOTVALID))
1986 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
/* LVB handling on unlock: only an unlock from PW or EX may write (or,
 * with IVVALBLK, invalidate) the rsb's LVB. */
1989 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1991 if (lkb->lkb_grmode < DLM_LOCK_PW)
1994 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1995 rsb_set_flag(r, RSB_VALNOTVALID);
1999 if (!lkb->lkb_lvbptr)
2002 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2006 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
2011 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2013 rsb_clear_flag(r, RSB_VALNOTVALID);
2016 /* lkb is process copy (pc) */
/* LVB handling for a process copy on grant: when the operations table
 * says "read" (b==1 -- check is elided in this listing), copy the LVB
 * carried in the reply message's extra area into the caller's buffer,
 * bounded by the message's extra length. */
2018 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2019 struct dlm_message *ms)
2023 if (!lkb->lkb_lvbptr)
2026 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2029 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
2031 int len = receive_extralen(ms);
2032 if (len > DLM_RESNAME_MAXLEN)
2033 len = DLM_RESNAME_MAXLEN;
2034 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2035 lkb->lkb_lvbseq = ms->m_lvbseq;
2039 /* Manipulate lkb's on rsb's convert/granted/waiting queues
2040 remove_lock -- used for unlock, removes lkb from granted
2041 revert_lock -- used for cancel, moves lkb from convert to granted
2042 grant_lock -- used for request and convert, adds lkb to granted or
2043 moves lkb from convert or waiting to granted
2045 Each of these is used for master or local copy lkb's. There is
2046 also a _pc() variation used to make the corresponding change on
2047 a process copy (pc) lkb. */
/* Core of unlock: reset grmode and drop the queue/create reference. */
2049 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2052 lkb->lkb_grmode = DLM_LOCK_IV;
2053 /* this unhold undoes the original ref from create_lkb()
2054 so this leads to the lkb being freed */
/* Unlock on a master/local copy: run the unlock LVB protocol first. */
2058 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2060 set_lvb_unlock(r, lkb);
2061 _remove_lock(r, lkb);
/* Unlock on a process copy: no LVB work, the master already did it. */
2064 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2066 _remove_lock(r, lkb);
2069 /* returns: 0 did nothing
2070 1 moved lock to granted
/* Cancel: back an lkb out of an in-progress convert (back to the
 * granted queue) or out of a waiting request (lkb freed). */
2073 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2077 lkb->lkb_rqmode = DLM_LOCK_IV;
2079 switch (lkb->lkb_status) {
2080 case DLM_LKSTS_GRANTED:
2082 case DLM_LKSTS_CONVERT:
2083 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2086 case DLM_LKSTS_WAITING:
2088 lkb->lkb_grmode = DLM_LOCK_IV;
2089 /* this unhold undoes the original ref from create_lkb()
2090 so this leads to the lkb being freed */
2095 log_print("invalid status for revert %d", lkb->lkb_status);
/* Process-copy variant of revert_lock (identical behavior). */
2100 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2102 return revert_lock(r, lkb);
/* Core of grant: promote grmode to rqmode and put/move the lkb onto
 * the granted queue (add when it had no status yet, i.e. a new
 * request; move when converting/waiting). */
2105 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2107 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2108 lkb->lkb_grmode = lkb->lkb_rqmode;
2109 if (lkb->lkb_status)
2110 move_lkb(r, lkb, DLM_LKSTS_GRANTED)
2112 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2115 lkb->lkb_rqmode = DLM_LOCK_IV;
2116 lkb->lkb_highbast = 0;
/* Grant on a master/local copy: apply the LVB protocol, then grant. */
2119 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2121 set_lvb_lock(r, lkb);
2122 _grant_lock(r, lkb);
/* Grant on a process copy: LVB data comes from the reply message. */
2125 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2126 struct dlm_message *ms)
2128 set_lvb_lock_pc(r, lkb, ms);
2129 _grant_lock(r, lkb);
2132 /* called by grant_pending_locks() which means an async grant message must
2133 be sent to the requesting node in addition to granting the lock if the
2134 lkb belongs to a remote node. */
/* Grant a queued lock: master copies get a grant message back to the
 * owning node, local copies get a completion ast queued. */
2136 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2139 if (is_master_copy(lkb))
2142 queue_cast(r, lkb, 0);
2145 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2146 change the granted/requested modes. We're munging things accordingly in
2148 CONVDEADLK: our grmode may have been forced down to NL to resolve a
2150 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2151 compatible with other granted locks */
/* Mirror a master-side CONVDEADLK demotion on the local lkb: grmode
 * drops to NL.  Both modes must be valid or the reply is bogus. */
2153 static void munge_demoted(struct dlm_lkb *lkb)
2155 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2156 log_print("munge_demoted %x invalid modes gr %d rq %d",
2157 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2161 lkb->lkb_grmode = DLM_LOCK_NL;
/* Mirror a master-side ALTPR/ALTCW alternate-mode grant on the local
 * lkb: rewrite rqmode to the alternate mode the master used.  Only
 * valid in a request reply or an async grant message. */
2164 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
2166 if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
2167 ms->m_type != DLM_MSG_GRANT) {
2168 log_print("munge_altmode %x invalid reply type %d",
2169 lkb->lkb_id, ms->m_type);
2173 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2174 lkb->lkb_rqmode = DLM_LOCK_PR;
2175 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2176 lkb->lkb_rqmode = DLM_LOCK_CW;
2178 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
/* True when lkb is the first entry on the given status queue
 * (compared by lock id). */
2183 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2185 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2187 if (lkb->lkb_id == first->lkb_id)
2193 /* Check if the given lkb conflicts with another lkb on the queue. */
/* Scan a status queue for any entry whose mode is incompatible with
 * lkb's (entries are compared with modes_compat; lkb itself is
 * skipped). */
2195 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2197 struct dlm_lkb *this;
2199 list_for_each_entry(this, head, lkb_statequeue) {
2202 if (!modes_compat(this, lkb))
2209 * "A conversion deadlock arises with a pair of lock requests in the converting
2210 * queue for one resource. The granted mode of each lock blocks the requested
2211 * mode of the other lock."
2213 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2214 * convert queue from being granted, then deadlk/demote lkb.
2217 * Granted Queue: empty
2218 * Convert Queue: NL->EX (first lock)
2219 * PR->EX (second lock)
2221 * The first lock can't be granted because of the granted mode of the second
2222 * lock and the second lock can't be granted because it's not first in the
2223 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2224 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2225 * flag set and return DEMOTED in the lksb flags.
2227 * Originally, this function detected conv-deadlk in a more limited scope:
2228 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2229 * - if lkb1 was the first entry in the queue (not just earlier), and was
2230 * blocked by the granted mode of lkb2, and there was nothing on the
2231 * granted queue preventing lkb1 from being granted immediately, i.e.
2232 * lkb2 was the only thing preventing lkb1 from being granted.
2234 * That second condition meant we'd only say there was conv-deadlk if
2235 * resolving it (by demotion) would lead to the first lock on the convert
2236 * queue being granted right away. It allowed conversion deadlocks to exist
2237 * between locks on the convert queue while they couldn't be granted anyway.
2239 * Now, we detect and take action on conversion deadlocks immediately when
2240 * they're created, even if they may not be immediately consequential. If
2241 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2242 * mode that would prevent lkb1's conversion from being granted, we do a
2243 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2244 * I think this means that the lkb_is_ahead condition below should always
2245 * be zero, i.e. there will never be conv-deadlk between two locks that are
2246 * both already on the convert queue.
/* Return nonzero if lkb2 is in conversion deadlock with any lkb1 on
 * the convert queue (see the discussion above for the two cases). */
2249 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2251 struct dlm_lkb *lkb1;
2252 int lkb_is_ahead = 0;
2254 list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
/* lkb2 not yet on the queue (or behind lkb1): one-way check */
2260 if (!lkb_is_ahead) {
2261 if (!modes_compat(lkb2, lkb1))
/* lkb2 is ahead of lkb1: require mutual incompatibility */
2264 if (!modes_compat(lkb2, lkb1) &&
2265 !modes_compat(lkb1, lkb2))
2273 * Return 1 if the lock can be granted, 0 otherwise.
2274 * Also detect and resolve conversion deadlocks.
2276 * lkb is the lock to be granted
2278 * now is 1 if the function is being called in the context of the
2279 * immediate request, it is 0 if called later, after the lock has been
2282 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
/* Core grantability test implementing the VMS rules cited below;
 * "conv" distinguishes a conversion (existing grmode) from a new
 * request. */
2285 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
2287 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2290 * 6-10: Version 5.4 introduced an option to address the phenomenon of
2291 * a new request for a NL mode lock being blocked.
2293 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2294 * request, then it would be granted. In essence, the use of this flag
2295 * tells the Lock Manager to expedite this request by not considering
2296 * what may be in the CONVERTING or WAITING queues... As of this
2297 * writing, the EXPEDITE flag can be used only with new requests for NL
2298 * mode locks. This flag is not valid for conversion requests.
2300 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
2301 * conversion or used with a non-NL requested mode. We also know an
2302 * EXPEDITE request is always granted immediately, so now must always
2303 * be 1. The full condition to grant an expedite request: (now &&
2304 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2305 * therefore be shortened to just checking the flag.
2308 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2312 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2313 * added to the remaining conditions.
2316 if (queue_conflict(&r->res_grantqueue, lkb))
2320 * 6-3: By default, a conversion request is immediately granted if the
2321 * requested mode is compatible with the modes of all other granted
2325 if (queue_conflict(&r->res_convertqueue, lkb))
2329 * 6-5: But the default algorithm for deciding whether to grant or
2330 * queue conversion requests does not by itself guarantee that such
2331 * requests are serviced on a "first come first serve" basis. This, in
2332 * turn, can lead to a phenomenon known as "indefinite postponement".
2334 * 6-7: This issue is dealt with by using the optional QUECVT flag with
2335 * the system service employed to request a lock conversion. This flag
2336 * forces certain conversion requests to be queued, even if they are
2337 * compatible with the granted modes of other locks on the same
2338 * resource. Thus, the use of this flag results in conversion requests
2339 * being ordered on a "first come first serve" basis.
2341 * DCT: This condition is all about new conversions being able to occur
2342 * "in place" while the lock remains on the granted queue (assuming
2343 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
2344 * doesn't _have_ to go onto the convert queue where it's processed in
2345 * order. The "now" variable is necessary to distinguish converts
2346 * being received and processed for the first time now, because once a
2347 * convert is moved to the conversion queue the condition below applies
2348 * requiring fifo granting.
2351 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2355 * Even if the convert is compat with all granted locks,
2356 * QUECVT forces it behind other locks on the convert queue.
2359 if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2360 if (list_empty(&r->res_convertqueue))
2367 * The NOORDER flag is set to avoid the standard vms rules on grant
2371 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2375 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2376 * granted until all other conversion requests ahead of it are granted
2380 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2384 * 6-4: By default, a new request is immediately granted only if all
2385 * three of the following conditions are satisfied when the request is
2387 * - The queue of ungranted conversion requests for the resource is
2389 * - The queue of ungranted new requests for the resource is empty.
2390 * - The mode of the new request is compatible with the most
2391 * restrictive mode of all granted locks on the resource.
2394 if (now && !conv && list_empty(&r->res_convertqueue) &&
2395 list_empty(&r->res_waitqueue))
2399 * 6-4: Once a lock request is in the queue of ungranted new requests,
2400 * it cannot be granted until the queue of ungranted conversion
2401 * requests is empty, all ungranted new requests ahead of it are
2402 * granted and/or canceled, and it is compatible with the granted mode
2403 * of the most restrictive lock granted on the resource.
2406 if (!now && !conv && list_empty(&r->res_convertqueue) &&
2407 first_in_list(lkb, &r->res_waitqueue))
/* Wrapper around _can_be_granted() that also handles the non-standard
 * extensions: CONVDEADLK (resolve a conversion deadlock by demoting
 * grmode to NL or reporting deadlock), and ALTPR/ALTCW (retry the
 * grant with an alternate requested mode, reporting DLM_SBF_ALTMODE
 * on success). */
2413 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2417 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2418 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2423 rv = _can_be_granted(r, lkb, now);
2428 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2429 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2430 * cancels one of the locks.
2433 if (is_convert && can_be_queued(lkb) &&
2434 conversion_deadlock_detect(r, lkb)) {
2435 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2436 lkb->lkb_grmode = DLM_LOCK_NL;
2437 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
2438 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
/* caller did not ask for deadlock reporting (deadlk == NULL
 * -- guard elided in this listing): just log it */
2442 log_print("can_be_granted deadlock %x now %d",
2451 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2452 * to grant a request in a mode other than the normal rqmode. It's a
2453 * simple way to provide a big optimization to applications that can
2457 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2459 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
/* retry with the alternate mode, restoring rqmode on failure */
2463 lkb->lkb_rqmode = alt;
2464 rv = _can_be_granted(r, lkb, now);
2466 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
2468 lkb->lkb_rqmode = rqmode;
2474 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
2475 for locks pending on the convert list. Once verified (watch for these
2476 log_prints), we should be able to just call _can_be_granted() and not
2477 bother with the demote/deadlk cases here (and there's no easy way to deal
2478 with a deadlk here, we'd have to generate something like grant_lock with
2479 the deadlk error.) */
2481 /* Returns the highest requested mode of all blocked conversions; sets
2482 cw if there's a blocked conversion to DLM_LOCK_CW. */
/* Grant every grantable conversion on the convert queue, restarting
 * the scan whenever a grant or a CONVDEADLK demotion may have made
 * earlier entries grantable.  "count" (optional) accumulates the
 * number of grants; returns the max of "high" and the highest rqmode
 * left blocked. */
2484 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2485 unsigned int *count)
2487 struct dlm_lkb *lkb, *s;
2488 int hi, demoted, quit, grant_restart, demote_restart;
2497 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
/* remember demotion state to detect a demotion made by
 * can_be_granted() during this pass */
2498 demoted = is_demoted(lkb);
2501 if (can_be_granted(r, lkb, 0, &deadlk)) {
2502 grant_lock_pending(r, lkb);
2509 if (!demoted && is_demoted(lkb)) {
2510 log_print("WARN: pending demoted %x node %d %s",
2511 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2517 log_print("WARN: pending deadlock %x node %d %s",
2518 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
/* still blocked: track the highest blocked rqmode */
2523 hi = max_t(int, lkb->lkb_rqmode, hi);
2525 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2531 if (demote_restart && !quit) {
2536 return max_t(int, high, hi);
/* Grant every grantable request on the wait queue; returns the
 * highest rqmode left blocked and flags a blocked CW request. */
2539 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2540 unsigned int *count)
2542 struct dlm_lkb *lkb, *s;
2544 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2545 if (can_be_granted(r, lkb, 0, NULL)) {
2546 grant_lock_pending(r, lkb);
2550 high = max_t(int, lkb->lkb_rqmode, high);
2551 if (lkb->lkb_rqmode == DLM_LOCK_CW)
2559 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2560 on either the convert or waiting queue.
2561 high is the largest rqmode of all locks blocked on the convert or
/* Decide whether granted lock 'gr' should receive a blocking ast given
   that 'high' is the largest blocked rqmode and 'cw' says a DLM_LOCK_CW
   lock is blocked.  gr->lkb_highbast suppresses repeat basts for modes
   already notified. */
2564 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
/* the PR-holder vs blocked-CW pairing is special-cased (compare
   modes_require_bast() below): force notification up to EX level */
2566 if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2567 if (gr->lkb_highbast < DLM_LOCK_EX)
/* otherwise bast only if not yet notified for this mode and the
   granted mode is incompatible with the blocked mode */
2572 if (gr->lkb_highbast < high &&
2573 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
/* Master-side scan of an rsb: grant whatever pending conversions and
   requests are now grantable, then queue blocking asts to granted locks
   that are in the way of what remains blocked. */
2578 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2580 struct dlm_lkb *lkb, *s;
2581 int high = DLM_LOCK_IV;
/* only the master node may grant; log and bail otherwise */
2584 if (!is_master(r)) {
2585 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
/* conversions are considered before new requests */
2590 high = grant_pending_convert(r, high, &cw, count);
2591 high = grant_pending_wait(r, high, &cw, count);
/* DLM_LOCK_IV means nothing is left blocked: no basts needed */
2593 if (high == DLM_LOCK_IV)
2597 * If there are locks left on the wait/convert queue then send blocking
2598 * ASTs to granted locks based on the largest requested mode (high)
2602 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2603 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
/* PR holder blocking a CW request: bast with CW, not 'high' */
2604 if (cw && high == DLM_LOCK_PR &&
2605 lkb->lkb_grmode == DLM_LOCK_PR)
2606 queue_bast(r, lkb, DLM_LOCK_CW);
2608 queue_bast(r, lkb, high);
2609 lkb->lkb_highbast = high;
/* Should granted lock 'gr' get a blocking ast for blocked request 'rq'?
   The PR/CW pairing is handled explicitly because those two modes need
   mutual notification regardless of the generic compatibility check. */
2614 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2616 if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2617 (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2618 if (gr->lkb_highbast < DLM_LOCK_EX)
/* generic case: bast once per mode level, only on real conflict */
2623 if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
/* Queue blocking asts to every lock on 'head' that conflicts with the
   blocked request 'lkb'; records the notified mode in lkb_highbast. */
2628 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2629 struct dlm_lkb *lkb)
2633 list_for_each_entry(gr, head, lkb_statequeue) {
2634 /* skip self when sending basts to convertqueue */
2637 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2638 queue_bast(r, gr, lkb->lkb_rqmode);
2639 gr->lkb_highbast = lkb->lkb_rqmode;
/* basts to holders blocking a new request: granted queue only */
2644 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2646 send_bast_queue(r, &r->res_grantqueue, lkb);
/* basts for a blocked conversion: both granted and converting locks
   can be in the way */
2649 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2651 send_bast_queue(r, &r->res_grantqueue, lkb);
2652 send_bast_queue(r, &r->res_convertqueue, lkb);
2655 /* set_master(r, lkb) -- set the master nodeid of a resource
2657 The purpose of this function is to set the nodeid field in the given
2658 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
2659 known, it can just be copied to the lkb and the function will return
2660 0. If the rsb's nodeid is _not_ known, it needs to be looked up
2661 before it can be copied to the lkb.
2663 When the rsb nodeid is being looked up remotely, the initial lkb
2664 causing the lookup is kept on the ls_waiters list waiting for the
2665 lookup reply. Other lkb's waiting for the same rsb lookup are kept
2666 on the rsb's res_lookup list until the master is verified.
2669 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2670 1: the rsb master is not available and the lkb has been placed on
/* Set lkb->lkb_nodeid from the rsb's master information; see the large
   comment above for the 0/1 return contract (1 = lookup in progress and
   the lkb has been parked). */
2674 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2676 int our_nodeid = dlm_our_nodeid();
/* first use after recovery: adopt the recorded nodeid but make this
   lkb the one whose reply confirms the master (res_first_lkid) */
2678 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2679 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2680 r->res_first_lkid = lkb->lkb_id;
2681 lkb->lkb_nodeid = r->res_nodeid;
/* another lkb's lookup is already in flight: park this one on the
   rsb's res_lookup list until the master is verified */
2685 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2686 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
/* master already known: 0 means "local" in lkb_nodeid */
2690 if (r->res_master_nodeid == our_nodeid) {
2691 lkb->lkb_nodeid = 0;
2695 if (r->res_master_nodeid) {
2696 lkb->lkb_nodeid = r->res_master_nodeid;
2700 if (dlm_dir_nodeid(r) == our_nodeid) {
2701 /* This is a somewhat unusual case; find_rsb will usually
2702 have set res_master_nodeid when dir nodeid is local, but
2703 there are cases where we become the dir node after we've
2704 passed find_rsb and go through _request_lock again.
2705 confirm_master() or process_lookup_list() needs to be
2706 called after this. */
2707 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2708 lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2710 r->res_master_nodeid = our_nodeid;
2712 lkb->lkb_nodeid = 0;
/* master unknown: this lkb initiates the remote directory lookup */
2716 wait_pending_remove(r);
2718 r->res_first_lkid = lkb->lkb_id;
2719 send_lookup(r, lkb);
/* Retry every lkb that was parked on the rsb's res_lookup list while a
   master lookup was in progress (see set_master()). */
2723 static void process_lookup_list(struct dlm_rsb *r)
2725 struct dlm_lkb *lkb, *safe;
2727 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2728 list_del_init(&lkb->lkb_rsb_lookup);
2729 _request_lock(r, lkb);
/* confirm_master -- confirm (or deny) an rsb's master nodeid.
   Called with the result of the first request sent to a presumed master
   (res_first_lkid identifies that request); on success the parked
   lookups are retried, on a non-retried failure the next waiter becomes
   the new first_lkid. */
2736 static void confirm_master(struct dlm_rsb *r, int error)
2738 struct dlm_lkb *lkb;
/* nothing pending confirmation */
2740 if (!r->res_first_lkid)
2746 r->res_first_lkid = 0;
2747 process_lookup_list(r);
2753 /* the remote request failed and won't be retried (it was
2754 a NOQUEUE, or has been canceled/unlocked); make a waiting
2755 lkb the first_lkid */
2757 r->res_first_lkid = 0;
2759 if (!list_empty(&r->res_lookup)) {
2760 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2762 list_del_init(&lkb->lkb_rsb_lookup);
2763 r->res_first_lkid = lkb->lkb_id;
2764 _request_lock(r, lkb);
2769 log_error(r->res_ls, "confirm_master unknown error %d", error);
/* Validate dlm_lock() arguments and pack them into *args for later copy
   into the lkb (done in validate_lock_args, under the rsb lock).
   Returns 0 or a negative errno (-EINVAL family) on bad flag/mode
   combinations. */
2773 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2774 int namelen, unsigned long timeout_cs,
2775 void (*ast) (void *astparam),
2777 void (*bast) (void *astparam, int mode),
2778 struct dlm_args *args)
2782 /* check for invalid arg usage */
2784 if (mode < 0 || mode > DLM_LOCK_EX)
2787 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
/* CANCEL belongs to dlm_unlock(), never dlm_lock() */
2790 if (flags & DLM_LKF_CANCEL)
/* QUECVT and CONVDEADLK only make sense on a conversion */
2793 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2796 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2799 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
/* EXPEDITE is only valid for a new NL request */
2802 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2805 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2808 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2811 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2817 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2820 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2823 /* these args will be copied to the lkb in validate_lock_args,
2824 it cannot be done now because when converting locks, fields in
2825 an active lkb cannot be modified before locking the rsb */
2827 args->flags = flags;
2829 args->astparam = astparam;
2830 args->bastfn = bast;
2831 args->timeout = timeout_cs;
/* Validate dlm_unlock() flags and pack them into *args.
   Only CANCEL/VALBLK/IVVALBLK/FORCEUNLOCK are accepted, and CANCEL and
   FORCEUNLOCK are mutually exclusive. */
2839 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2841 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2842 DLM_LKF_FORCEUNLOCK))
2845 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2848 args->flags = flags;
2849 args->astparam = astarg;
/* Copy the checked args into the lkb, first validating that the lkb's
   current state allows the operation (called with the rsb locked).
   For conversions: the lkb must be locally owned (not MSTCPY), granted,
   idle (no outstanding remote op) and not overlapped by unlock/cancel. */
2853 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2854 struct dlm_args *args)
2858 if (args->flags & DLM_LKF_CONVERT) {
2859 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
/* QUECVT is only legal for grmode/rqmode pairs allowed by
   __quecvt_compat_matrix */
2862 if (args->flags & DLM_LKF_QUECVT &&
2863 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2867 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2870 if (lkb->lkb_wait_type)
2873 if (is_overlap(lkb))
/* commit the request into the lkb */
2877 lkb->lkb_exflags = args->flags;
2878 lkb->lkb_sbflags = 0;
2879 lkb->lkb_astfn = args->astfn;
2880 lkb->lkb_astparam = args->astparam;
2881 lkb->lkb_bastfn = args->bastfn;
2882 lkb->lkb_rqmode = args->mode;
2883 lkb->lkb_lksb = args->lksb;
2884 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2885 lkb->lkb_ownpid = (int) current->pid;
2886 lkb->lkb_timeout_cs = args->timeout;
/* failure path: leave a trace of why the op was rejected */
2890 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2891 rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2892 lkb->lkb_status, lkb->lkb_wait_type,
2893 lkb->lkb_resource->res_name);
2897 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2900 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2901 because there may be a lookup in progress and it's valid to do
2902 cancel/unlockf on it */
/* Validate an unlock/cancel against the lkb's current state and record
   the op in the lkb (called with the rsb locked).  Handles the overlap
   cases where an unlock/cancel races with an in-flight request, convert,
   unlock or cancel; see the comments above about -EBUSY handling. */
2904 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2906 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
/* master copies are manipulated via messages, never unlocked locally */
2909 if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2910 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2915 /* an lkb may still exist even though the lock is EOL'ed due to a
2916 cancel, unlock or failed noqueue request; an app can't use these
2917 locks; return same error as if the lkid had not been found at all */
2919 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2920 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2925 /* an lkb may be waiting for an rsb lookup to complete where the
2926 lookup was initiated by another lock */
2928 if (!list_empty(&lkb->lkb_rsb_lookup)) {
2929 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2930 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2931 list_del_init(&lkb->lkb_rsb_lookup);
2932 queue_cast(lkb->lkb_resource, lkb,
2933 args->flags & DLM_LKF_CANCEL ?
2934 -DLM_ECANCEL : -DLM_EUNLOCK);
2935 unhold_lkb(lkb); /* undoes create_lkb() */
2937 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2942 /* cancel not allowed with another cancel/unlock in progress */
2944 if (args->flags & DLM_LKF_CANCEL) {
2945 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2948 if (is_overlap(lkb))
2951 /* don't let scand try to do a cancel */
/* the original op must be resent during recovery; just mark the
   overlapping cancel instead of sending one now */
2954 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2955 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2960 /* there's nothing to cancel */
2961 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2962 !lkb->lkb_wait_type) {
2967 switch (lkb->lkb_wait_type) {
2968 case DLM_MSG_LOOKUP:
2969 case DLM_MSG_REQUEST:
2970 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2973 case DLM_MSG_UNLOCK:
2974 case DLM_MSG_CANCEL:
2977 /* add_to_waiters() will set OVERLAP_CANCEL */
2981 /* do we need to allow a force-unlock if there's a normal unlock
2982 already in progress? in what conditions could the normal unlock
2983 fail such that we'd want to send a force-unlock to be sure? */
2985 if (args->flags & DLM_LKF_FORCEUNLOCK) {
2986 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2989 if (is_overlap_unlock(lkb))
2992 /* don't let scand try to do a cancel */
2995 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2996 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3001 switch (lkb->lkb_wait_type) {
3002 case DLM_MSG_LOOKUP:
3003 case DLM_MSG_REQUEST:
3004 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3007 case DLM_MSG_UNLOCK:
3010 /* add_to_waiters() will set OVERLAP_UNLOCK */
3014 /* normal unlock not allowed if there's any op in progress */
3016 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
/* success: record the op in the lkb */
3020 /* an overlapping op shouldn't blow away exflags from other op */
3021 lkb->lkb_exflags |= args->flags;
3022 lkb->lkb_sbflags = 0;
3023 lkb->lkb_astparam = args->astparam;
/* failure path: leave a trace of why the op was rejected */
3027 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
3028 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
3029 args->flags, lkb->lkb_wait_type,
3030 lkb->lkb_resource->res_name);
3035 * Four stage 4 varieties:
3036 * do_request(), do_convert(), do_unlock(), do_cancel()
3037 * These are called on the master node for the given lock and
3038 * from the central locking logic.
/* Stage 4, master side: try a new request.  Grant it now, queue it as
   WAITING (-EINPROGRESS) if queueing is allowed, else fail with -EAGAIN
   (the NOQUEUE case), casting the result to the caller. */
3041 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3045 if (can_be_granted(r, lkb, 1, NULL)) {
3047 queue_cast(r, lkb, 0);
3051 if (can_be_queued(lkb)) {
3052 error = -EINPROGRESS;
3053 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3059 queue_cast(r, lkb, -EAGAIN);
/* After do_request(): notify granted-lock holders that are blocking the
   queued request (run after the reply is sent for remote requests). */
3064 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3069 if (force_blocking_asts(lkb))
3070 send_blocking_asts_all(r, lkb);
3073 send_blocking_asts(r, lkb);
/* Stage 4, master side: try a conversion.  Outcomes: granted (0),
   conversion deadlock (-EDEADLK, lock reverted on the granted queue),
   auto-demoted then retried, queued as CONVERT (-EINPROGRESS), or
   refused with -EAGAIN when queueing isn't allowed. */
3078 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3083 /* changing an existing lock may allow others to be granted */
3085 if (can_be_granted(r, lkb, 1, &deadlk)) {
3087 queue_cast(r, lkb, 0);
3091 /* can_be_granted() detected that this lock would block in a conversion
3092 deadlock, so we leave it on the granted queue and return EDEADLK in
3093 the ast for the convert. */
3096 /* it's left on the granted queue */
3097 revert_lock(r, lkb);
3098 queue_cast(r, lkb, -EDEADLK);
3103 /* is_demoted() means the can_be_granted() above set the grmode
3104 to NL, and left us on the granted queue. This auto-demotion
3105 (due to CONVDEADLK) might mean other locks, and/or this lock, are
3106 now grantable. We have to try to grant other converting locks
3107 before we try again to grant this one. */
3109 if (is_demoted(lkb)) {
3110 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3111 if (_can_be_granted(r, lkb, 1)) {
3113 queue_cast(r, lkb, 0);
3116 /* else fall through and move to convert queue */
3119 if (can_be_queued(lkb)) {
3120 error = -EINPROGRESS;
3122 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3128 queue_cast(r, lkb, -EAGAIN);
/* After do_convert(): on success re-scan the rsb for newly grantable
   locks; on -EINPROGRESS send blocking asts for the queued conversion. */
3133 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3138 grant_pending_locks(r, NULL);
3139 /* grant_pending_locks also sends basts */
3142 if (force_blocking_asts(lkb))
3143 send_blocking_asts_all(r, lkb);
3146 send_blocking_asts(r, lkb);
/* Stage 4, master side: remove the lock and cast -DLM_EUNLOCK (the
   "successful unlock" status) back to the caller. */
3151 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3153 remove_lock(r, lkb);
3154 queue_cast(r, lkb, -DLM_EUNLOCK);
3155 return -DLM_EUNLOCK;
/* After do_unlock(): the freed mode may make other locks grantable. */
3158 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3161 grant_pending_locks(r, NULL);
/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
/* Stage 4, master side: revert a waiting/converting lock; only cast and
   report -DLM_ECANCEL if revert_lock() actually undid something. */
3166 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3170 error = revert_lock(r, lkb);
3172 queue_cast(r, lkb, -DLM_ECANCEL);
3173 return -DLM_ECANCEL;
/* After do_cancel(): a canceled conversion may unblock other locks. */
3178 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3182 grant_pending_locks(r, NULL);
3186 * Four stage 3 varieties:
3187 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
/* add a new lkb to a possibly new rsb, called by requesting process */
/* Stage 3: resolve the master (set_master) then either run do_request()
   locally or send the request to the master node. */
3192 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3196 /* set_master: sets lkb nodeid from r */
3198 error = set_master(r, lkb);
3207 /* receive_request() calls do_request() on remote node */
3208 error = send_request(r, lkb);
3210 error = do_request(r, lkb);
3211 /* for remote locks the request_reply is sent
3212 between do_request and do_request_effects */
3213 do_request_effects(r, lkb, error);
/* change some property of an existing lkb, e.g. mode */
/* Stage 3: run do_convert() locally on the master, otherwise send the
   convert to the master node. */
3221 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3226 /* receive_convert() calls do_convert() on remote node */
3227 error = send_convert(r, lkb);
3229 error = do_convert(r, lkb);
3230 /* for remote locks the convert_reply is sent
3231 between do_convert and do_convert_effects */
3232 do_convert_effects(r, lkb, error);
/* remove an existing lkb from the granted queue */
/* Stage 3: run do_unlock() locally on the master, otherwise send the
   unlock to the master node. */
3240 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3245 /* receive_unlock() calls do_unlock() on remote node */
3246 error = send_unlock(r, lkb);
3248 error = do_unlock(r, lkb);
3249 /* for remote locks the unlock_reply is sent
3250 between do_unlock and do_unlock_effects */
3251 do_unlock_effects(r, lkb, error);
/* remove an existing lkb from the convert or wait queue */
/* Stage 3: run do_cancel() locally on the master, otherwise send the
   cancel to the master node. */
3259 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3264 /* receive_cancel() calls do_cancel() on remote node */
3265 error = send_cancel(r, lkb)
3267 error = do_cancel(r, lkb);
3268 /* for remote locks the cancel_reply is sent
3269 between do_cancel and do_cancel_effects */
3270 do_cancel_effects(r, lkb, error);
3277 * Four stage 2 varieties:
3278 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
/* Stage 2: validate args, find/lock the rsb for 'name', publish the new
   lkid to the caller's lksb, then hand off to _request_lock(). */
3281 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
3282 int len, struct dlm_args *args)
3287 error = validate_lock_args(ls, lkb, args);
3291 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
/* caller reads the assigned lock id from lksb->sb_lkid */
3298 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3300 error = _request_lock(r, lkb);
/* Stage 2: lock the lkb's existing rsb, validate, and convert. */
3307 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3308 struct dlm_args *args)
3313 r = lkb->lkb_resource;
3318 error = validate_lock_args(ls, lkb, args);
3322 error = _convert_lock(r, lkb);
/* Stage 2: lock the lkb's existing rsb, validate, and unlock. */
3329 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3330 struct dlm_args *args)
3335 r = lkb->lkb_resource;
3340 error = validate_unlock_args(lkb, args);
3344 error = _unlock_lock(r, lkb);
/* Stage 2: lock the lkb's existing rsb, validate, and cancel. */
3351 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3352 struct dlm_args *args)
3357 r = lkb->lkb_resource;
3362 error = validate_unlock_args(lkb, args);
3366 error = _cancel_lock(r, lkb);
3374 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
/* Public entry point (stage 1) for acquiring or converting a lock.
   DLM_LKF_CONVERT selects convert_lock() on an existing lkid (looked up
   via lksb->sb_lkid), otherwise a new lkb is created and request_lock()
   is used.  Runs with recovery excluded for the lockspace. */
3377 int dlm_lock(dlm_lockspace_t *lockspace,
3379 struct dlm_lksb *lksb,
3382 unsigned int namelen,
3383 uint32_t parent_lkid,
3384 void (*ast) (void *astarg),
3386 void (*bast) (void *astarg, int mode))
3389 struct dlm_lkb *lkb;
3390 struct dlm_args args;
3391 int error, convert = flags & DLM_LKF_CONVERT;
3393 ls = dlm_find_lockspace_local(lockspace);
/* block recovery for the duration of the operation */
3397 dlm_lock_recovery(ls);
3400 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3402 error = create_lkb(ls, &lkb);
/* timeout_cs is 0 here: no per-lock timeout from this entry point */
3407 error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
3408 astarg, bast, &args);
3413 error = convert_lock(ls, lkb, &args);
3415 error = request_lock(ls, lkb, name, namelen, &args);
/* -EINPROGRESS (async completion pending) is not an error to callers */
3417 if (error == -EINPROGRESS)
3420 if (convert || error)
3422 if (error == -EAGAIN || error == -EDEADLK)
3425 dlm_unlock_recovery(ls);
3426 dlm_put_lockspace(ls);
/* Public entry point (stage 1) for unlock/cancel.  DLM_LKF_CANCEL picks
   cancel_lock(), otherwise unlock_lock().  -DLM_EUNLOCK/-DLM_ECANCEL
   indicate success, and -EBUSY is tolerated for CANCEL/FORCEUNLOCK
   (an overlapping op was recorded instead; see validate_unlock_args). */
3430 int dlm_unlock(dlm_lockspace_t *lockspace,
3433 struct dlm_lksb *lksb,
3437 struct dlm_lkb *lkb;
3438 struct dlm_args args;
3441 ls = dlm_find_lockspace_local(lockspace);
3445 dlm_lock_recovery(ls);
3447 error = find_lkb(ls, lkid, &lkb);
3451 error = set_unlock_args(flags, astarg, &args);
3455 if (flags & DLM_LKF_CANCEL)
3456 error = cancel_lock(ls, lkb, &args);
3458 error = unlock_lock(ls, lkb, &args);
3460 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3462 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3467 dlm_unlock_recovery(ls);
3468 dlm_put_lockspace(ls);
3473 * send/receive routines for remote operations and replies
3477 * send_request receive_request
3478 * send_convert receive_convert
3479 * send_unlock receive_unlock
3480 * send_cancel receive_cancel
3481 * send_grant receive_grant
3482 * send_bast receive_bast
3483 * send_lookup receive_lookup
3484 * send_remove receive_remove
3487 * receive_request_reply send_request_reply
3488 * receive_convert_reply send_convert_reply
3489 * receive_unlock_reply send_unlock_reply
3490 * receive_cancel_reply send_cancel_reply
3491 * receive_lookup_reply send_lookup_reply
/* Allocate a lowcomms buffer for a message to 'to_nodeid', zero it and
   fill in the common dlm_header fields; returns the message body and the
   commit handle via ms_ret/mh_ret. */
3494 static int _create_message(struct dlm_ls *ls, int mb_len,
3495 int to_nodeid, int mstype,
3496 struct dlm_message **ms_ret,
3497 struct dlm_mhandle **mh_ret)
3499 struct dlm_message *ms;
3500 struct dlm_mhandle *mh;
3503 /* get_buffer gives us a message handle (mh) that we need to
3504 pass into lowcomms_commit and a message buffer (mb) that we
3505 write our data into */
3507 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
3511 memset(mb, 0, mb_len);
3513 ms = (struct dlm_message *) mb;
3515 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3516 ms->m_header.h_lockspace = ls->ls_global_id;
3517 ms->m_header.h_nodeid = dlm_our_nodeid();
3518 ms->m_header.h_length = mb_len;
3519 ms->m_header.h_cmd = DLM_MSG;
3521 ms->m_type = mstype;
/* Size a message for 'mstype' and create it: name-carrying messages add
   the resource name length, lvb-carrying messages add the lockspace lvb
   length (this switch must stay in sync with the one in send_args). */
3528 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3529 int to_nodeid, int mstype,
3530 struct dlm_message **ms_ret,
3531 struct dlm_mhandle **mh_ret)
3533 int mb_len = sizeof(struct dlm_message);
3536 case DLM_MSG_REQUEST:
3537 case DLM_MSG_LOOKUP:
3538 case DLM_MSG_REMOVE:
3539 mb_len += r->res_length;
3541 case DLM_MSG_CONVERT:
3542 case DLM_MSG_UNLOCK:
3543 case DLM_MSG_REQUEST_REPLY:
3544 case DLM_MSG_CONVERT_REPLY:
3546 if (lkb && lkb->lkb_lvbptr)
3547 mb_len += r->res_ls->ls_lvblen;
3551 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
/* further lowcomms enhancements or alternate implementations may make
   the return value from this function useful at some point */
/* Byte-swap the message for the wire and commit the lowcomms buffer. */
3558 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
3560 dlm_message_out(ms);
3561 dlm_lowcomms_commit_buffer(mh);
/* Copy the lkb's state into an outgoing message; the send side can
   safely send everything about the lkb for any message type (the
   receive side picks what it needs per type). */
3565 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3566 struct dlm_message *ms)
3568 ms->m_nodeid = lkb->lkb_nodeid;
3569 ms->m_pid = lkb->lkb_ownpid;
3570 ms->m_lkid = lkb->lkb_id;
3571 ms->m_remid = lkb->lkb_remid;
3572 ms->m_exflags = lkb->lkb_exflags;
3573 ms->m_sbflags = lkb->lkb_sbflags;
3574 ms->m_flags = lkb->lkb_flags;
3575 ms->m_lvbseq = lkb->lkb_lvbseq;
3576 ms->m_status = lkb->lkb_status;
3577 ms->m_grmode = lkb->lkb_grmode;
3578 ms->m_rqmode = lkb->lkb_rqmode;
3579 ms->m_hash = r->res_hash;
3581 /* m_result and m_bastmode are set from function args,
3582 not from lkb fields */
/* tell the remote node which callbacks the owner registered */
3584 if (lkb->lkb_bastfn)
3585 ms->m_asts |= DLM_CB_BAST;
3587 ms->m_asts |= DLM_CB_CAST;
3589 /* compare with switch in create_message; send_remove() doesn't
3592 switch (ms->m_type) {
3593 case DLM_MSG_REQUEST:
3594 case DLM_MSG_LOOKUP:
3595 memcpy(ms->m_extra, r->res_name, r->res_length);
3597 case DLM_MSG_CONVERT:
3598 case DLM_MSG_UNLOCK:
3599 case DLM_MSG_REQUEST_REPLY:
3600 case DLM_MSG_CONVERT_REPLY:
3602 if (!lkb->lkb_lvbptr)
3604 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
/* Send a request/convert/unlock/cancel to the master: register the lkb
   on the waiters list first (so the reply can be matched), then build
   and send; on failure the waiters entry is removed again. */
3609 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3611 struct dlm_message *ms;
3612 struct dlm_mhandle *mh;
3613 int to_nodeid, error;
3615 to_nodeid = r->res_nodeid;
3617 error = add_to_waiters(lkb, mstype, to_nodeid);
3621 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3625 send_args(r, lkb, ms);
3627 error = send_message(mh, ms);
/* error path: undo the waiters registration made above */
3633 remove_from_waiters(lkb, msg_reply_type(mstype));
/* Thin wrappers around send_common() for each stage-3 operation. */
3637 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3639 return send_common(r, lkb, DLM_MSG_REQUEST);
3642 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3646 error = send_common(r, lkb, DLM_MSG_CONVERT);
3648 /* down conversions go without a reply from the master */
3649 if (!error && down_conversion(lkb)) {
/* fake the reply locally using the lockspace's stub message so the
   normal convert-reply path runs without a round trip */
3650 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3651 r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
3652 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3653 r->res_ls->ls_stub_ms.m_result = 0;
3654 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3660 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3661 MASTER_UNCERTAIN to force the next request on the rsb to confirm
3662 that the master is still correct. */
3664 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3666 return send_common(r, lkb, DLM_MSG_UNLOCK);
3669 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3671 return send_common(r, lkb, DLM_MSG_CANCEL);
/* Master -> owner: tell the lock's owning node its lock was granted. */
3674 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3676 struct dlm_message *ms;
3677 struct dlm_mhandle *mh;
3678 int to_nodeid, error;
3680 to_nodeid = lkb->lkb_nodeid;
3682 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3686 send_args(r, lkb, ms);
3690 error = send_message(mh, ms);
/* Master -> owner: deliver a blocking ast for mode 'mode'.  Passes NULL
   lkb to create_message so no lvb space is reserved. */
3695 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3697 struct dlm_message *ms;
3698 struct dlm_mhandle *mh;
3699 int to_nodeid, error;
3701 to_nodeid = lkb->lkb_nodeid;
3703 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3707 send_args(r, lkb, ms);
3709 ms->m_bastmode = mode;
3711 error = send_message(mh, ms);
/* Ask the directory node who masters this rsb; like send_common(), the
   lkb sits on the waiters list until the lookup reply arrives. */
3716 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3718 struct dlm_message *ms;
3719 struct dlm_mhandle *mh;
3720 int to_nodeid, error;
3722 to_nodeid = dlm_dir_nodeid(r);
3724 error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3728 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3732 send_args(r, lkb, ms);
3734 error = send_message(mh, ms);
/* error path: undo the waiters registration made above */
3740 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
/* Tell the directory node this (unused) rsb can be removed from the
   directory; carries only the resource name and hash, no lkb. */
3744 static int send_remove(struct dlm_rsb *r)
3746 struct dlm_message *ms;
3747 struct dlm_mhandle *mh;
3748 int to_nodeid, error;
3750 to_nodeid = dlm_dir_nodeid(r);
3752 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3756 memcpy(ms->m_extra, r->res_name, r->res_length);
3757 ms->m_hash = r->res_hash;
3759 error = send_message(mh, ms);
/* Master -> owner: send a reply of type 'mstype' carrying result 'rv'
   (rv assignment to ms->m_result is elided in this view), plus the
   per-type wrappers for the four reply messages. */
3764 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3767 struct dlm_message *ms;
3768 struct dlm_mhandle *mh;
3769 int to_nodeid, error;
3771 to_nodeid = lkb->lkb_nodeid;
3773 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3777 send_args(r, lkb, ms);
3781 error = send_message(mh, ms);
3786 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3788 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3791 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3793 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3796 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3798 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3801 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3803 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
/* Directory node -> requester: answer a lookup with the master nodeid
   (ret_nodeid); echoes the requester's lkid so it can match the reply.
   Uses the lockspace's stub rsb since no real rsb is involved here. */
3806 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3807 int ret_nodeid, int rv)
3809 struct dlm_rsb *r = &ls->ls_stub_rsb;
3810 struct dlm_message *ms;
3811 struct dlm_mhandle *mh;
3812 int error, nodeid = ms_in->m_header.h_nodeid;
3814 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3818 ms->m_lkid = ms_in->m_lkid;
3820 ms->m_nodeid = ret_nodeid;
3822 error = send_message(mh, ms);
3827 /* which args we save from a received message depends heavily on the type
3828 of message, unlike the send side where we can safely send everything about
3829 the lkb for any type of message */
/* Adopt flags from a received message: exflags/sbflags wholesale, but
   only the low 16 bits of lkb_flags (the high bits are local-only). */
3831 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3833 lkb->lkb_exflags = ms->m_exflags;
3834 lkb->lkb_sbflags = ms->m_sbflags;
3835 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3836 (ms->m_flags & 0x0000FFFF);
/* reply variant: skips the locally fabricated stub message (see
   send_convert) and does not overwrite exflags */
3839 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3841 if (ms->m_flags == DLM_IFL_STUB_MS)
3844 lkb->lkb_sbflags = ms->m_sbflags;
3845 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3846 (ms->m_flags & 0x0000FFFF);
/* Length of the variable part (m_extra) of a received message. */
3849 static int receive_extralen(struct dlm_message *ms)
3851 return (ms->m_header.h_length - sizeof(struct dlm_message));
/* Copy a received lvb into the lkb (allocating the lvb buffer on first
   use) when the lock uses VALBLK; the copy length is clamped to
   DLM_RESNAME_MAXLEN as a bound on untrusted m_extra data. */
3854 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3855 struct dlm_message *ms)
3859 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3860 if (!lkb->lkb_lvbptr)
3861 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3862 if (!lkb->lkb_lvbptr)
3864 len = receive_extralen(ms);
3865 if (len > DLM_RESNAME_MAXLEN)
3866 len = DLM_RESNAME_MAXLEN;
3867 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
/* Placeholder callbacks installed on master-copy lkbs: a master copy
   only records THAT the owner has callbacks (see receive_request_args);
   the real callbacks run on the owning node, so these must never fire. */
3872 static void fake_bastfn(void *astparam, int mode)
3874 log_print("fake_bastfn should not be called");
3877 static void fake_astfn(void *astparam)
3879 log_print("fake_astfn should not be called");
/* Initialize a freshly created master-copy lkb from a received request:
   owner node/pid, the owner's lkid (stored as remid here), modes, and
   fake callback markers mirroring which callbacks the owner has. */
3882 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3883 struct dlm_message *ms)
3885 lkb->lkb_nodeid = ms->m_header.h_nodeid;
3886 lkb->lkb_ownpid = ms->m_pid;
3887 lkb->lkb_remid = ms->m_lkid;
3888 lkb->lkb_grmode = DLM_LOCK_IV;
3889 lkb->lkb_rqmode = ms->m_rqmode;
3891 lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3892 lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3894 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3895 /* lkb was just created so there won't be an lvb yet */
3896 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3897 if (!lkb->lkb_lvbptr)
/* Update a master-copy lkb from a received convert: must currently be
   granted; takes the new rqmode, lvb and lvb sequence. */
3904 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3905 struct dlm_message *ms)
3907 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3910 if (receive_lvb(ls, lkb, ms))
3913 lkb->lkb_rqmode = ms->m_rqmode;
3914 lkb->lkb_lvbseq = ms->m_lvbseq;
/* unlock only needs to take a possibly updated lvb */
3919 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3920 struct dlm_message *ms)
3922 if (receive_lvb(ls, lkb, ms))
3927 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3928 uses to send a reply and that the remote end uses to process the reply. */
/* Prepare ls_stub_lkb so an error reply can be sent for a message whose
   real lkb could not be found/used: the sender's nodeid and lkid are all
   the reply path needs. */
3930 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3932 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3933 lkb->lkb_nodeid = ms->m_header.h_nodeid;
3934 lkb->lkb_remid = ms->m_lkid;
3937 /* This is called after the rsb is locked so that we can safely inspect
3938 fields in the lkb. */
/* Sanity-check that a received message is consistent with the lkb it
   resolved to: operations must target a master copy owned by the sender,
   replies must target a process copy from the expected master (a request
   reply may arrive while lkb_nodeid is still -1 during lookup).
   Called with the rsb locked; bad messages are logged and ignored. */
3940 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3942 int from = ms->m_header.h_nodeid;
3945 switch (ms->m_type) {
3946 case DLM_MSG_CONVERT:
3947 case DLM_MSG_UNLOCK:
3948 case DLM_MSG_CANCEL:
3949 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3953 case DLM_MSG_CONVERT_REPLY:
3954 case DLM_MSG_UNLOCK_REPLY:
3955 case DLM_MSG_CANCEL_REPLY:
3958 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3962 case DLM_MSG_REQUEST_REPLY:
3963 if (!is_process_copy(lkb))
3965 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3974 log_error(lkb->lkb_resource->res_ls,
3975 "ignore invalid message %d from %d %x %x %x %d",
3976 ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3977 lkb->lkb_flags, lkb->lkb_nodeid);
/* Master side of send_request(): create a master-copy lkb, find (or
   recreate) the rsb, verify we really are the master, run do_request()
   and reply.  On failure a stub reply carries the error back. */
3981 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3983 struct dlm_lkb *lkb;
3988 from_nodeid = ms->m_header.h_nodeid;
3990 error = create_lkb(ls, &lkb);
3994 receive_flags(lkb, ms);
3995 lkb->lkb_flags |= DLM_IFL_MSTCPY;
3996 error = receive_request_args(ls, lkb, ms);
4002 /* The dir node is the authority on whether we are the master
4003 for this rsb or not, so if the master sends us a request, we should
4004 recreate the rsb if we've destroyed it. This race happens when we
4005 send a remove message to the dir node at the same time that the dir
4006 node sends us a request for the rsb. */
4008 namelen = receive_extralen(ms);
4010 error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4011 R_RECEIVE_REQUEST, &r);
/* refuse to act as master if the sender's view of the master
   doesn't match ours */
4019 if (r->res_master_nodeid != dlm_our_nodeid()) {
4020 error = validate_master_nodeid(ls, r, from_nodeid);
/* reply is sent between do_request and its effects, matching the
   ordering documented in _request_lock() */
4030 error = do_request(r, lkb);
4031 send_request_reply(r, lkb, error);
4032 do_request_effects(r, lkb, error);
4037 if (error == -EINPROGRESS)
4044 /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4045 and do this receive_request again from process_lookup_list once
4046 we get the lookup reply. This would avoid a many repeated
4047 ENOTBLK request failures when the lookup reply designating us
4048 as master is delayed. */
4050 /* We could repeatedly return -EBADR here if our send_remove() is
4051 delayed in being sent/arriving/being processed on the dir node.
4052 Another node would repeatedly lookup up the master, and the dir
4053 node would continue returning our nodeid until our send_remove
4056 if (error != -ENOTBLK) {
4057 log_limit(ls, "receive_request %x from %d %d",
4058 ms->m_lkid, from_nodeid, error);
/* error path: reply via the stub lkb/rsb so the sender unblocks */
4061 setup_stub_lkb(ls, ms);
4062 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
/* Master side of send_convert(): locate the master-copy lkb, verify the
   sender's lkid matches our recorded remid, run do_convert() and reply
   -- except for down-conversions, which the owner completes without a
   reply (see send_convert()). */
4066 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
4068 struct dlm_lkb *lkb;
4070 int error, reply = 1;
4072 error = find_lkb(ls, ms->m_remid, &lkb);
/* stale/mismatched lkid, e.g. after recovery: ignore the message */
4076 if (lkb->lkb_remid != ms->m_lkid) {
4077 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4078 "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4079 (unsigned long long)lkb->lkb_recover_seq,
4080 ms->m_header.h_nodeid, ms->m_lkid);
4085 r = lkb->lkb_resource;
4090 error = validate_message(lkb, ms);
4094 receive_flags(lkb, ms);
4096 error = receive_convert_args(ls, lkb, ms);
4098 send_convert_reply(r, lkb, error);
4102 reply = !down_conversion(lkb);
4104 error = do_convert(r, lkb);
4106 send_convert_reply(r, lkb, error);
4107 do_convert_effects(r, lkb, error);
4115 setup_stub_lkb(ls, ms);
4116 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
/*
 * Master-side handler for DLM_MSG_UNLOCK.  Mirrors receive_convert():
 * find the master-copy lkb by remote id, reject stale remid/lkid
 * mismatches, validate the message, then run do_unlock() on the rsb and
 * send an unlock reply.  On find_lkb() failure a stub reply is sent so
 * the requesting node is not left waiting.
 */
4120 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
4122 struct dlm_lkb *lkb;
4126 error = find_lkb(ls, ms->m_remid, &lkb);
4130 if (lkb->lkb_remid != ms->m_lkid) {
4131 log_error(ls, "receive_unlock %x remid %x remote %d %x",
4132 lkb->lkb_id, lkb->lkb_remid,
4133 ms->m_header.h_nodeid, ms->m_lkid);
4138 r = lkb->lkb_resource;
4143 error = validate_message(lkb, ms);
4147 receive_flags(lkb, ms);
4149 error = receive_unlock_args(ls, lkb, ms);
/* arg processing failed: reply with the error before unlocking anything */
4151 send_unlock_reply(r, lkb, error);
4155 error = do_unlock(r, lkb);
4156 send_unlock_reply(r, lkb, error);
4157 do_unlock_effects(r, lkb, error);
4165 setup_stub_lkb(ls, ms);
4166 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
/*
 * Master-side handler for DLM_MSG_CANCEL.  Like receive_unlock() but
 * simpler: no remid cross-check is visible here, and no extra args are
 * parsed beyond receive_flags().  do_cancel() is applied to the rsb and
 * a cancel reply is sent; a stub reply covers the find_lkb() failure
 * path.
 */
4170 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
4172 struct dlm_lkb *lkb;
4176 error = find_lkb(ls, ms->m_remid, &lkb);
4180 receive_flags(lkb, ms);
4182 r = lkb->lkb_resource;
4187 error = validate_message(lkb, ms);
4191 error = do_cancel(r, lkb);
4192 send_cancel_reply(r, lkb, error);
4193 do_cancel_effects(r, lkb, error);
4201 setup_stub_lkb(ls, ms);
4202 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
/*
 * Process-copy side handler for DLM_MSG_GRANT: the remote master has
 * granted our lock asynchronously.  Update flags from the reply, munge
 * the mode if the lock was granted in an alternate mode (altmode), move
 * the lkb to the grant state via grant_lock_pc(), and queue a successful
 * completion ast (status 0) for the caller.
 */
4206 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
4208 struct dlm_lkb *lkb;
4212 error = find_lkb(ls, ms->m_remid, &lkb);
4216 r = lkb->lkb_resource;
4221 error = validate_message(lkb, ms);
4225 receive_flags_reply(lkb, ms);
4226 if (is_altmode(lkb))
4227 munge_altmode(lkb, ms);
4228 grant_lock_pc(r, lkb, ms);
4229 queue_cast(r, lkb, 0);
/*
 * Process-copy side handler for DLM_MSG_BAST: the master asks us to run
 * a blocking ast because our lock blocks a request at m_bastmode.
 * lkb_highbast records the delivered mode — presumably to suppress
 * duplicate basts for the same mode; the suppression logic is not
 * visible in this excerpt.
 */
4237 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
4239 struct dlm_lkb *lkb;
4243 error = find_lkb(ls, ms->m_remid, &lkb);
4247 r = lkb->lkb_resource;
4252 error = validate_message(lkb, ms);
4256 queue_bast(r, lkb, ms->m_bastmode);
4257 lkb->lkb_highbast = ms->m_bastmode;
/*
 * Directory-node handler for DLM_MSG_LOOKUP: resolve the resource name
 * in ms->m_extra to its master nodeid via dlm_master_lookup() and reply.
 * If the lookup says we are ourselves the master, skip the reply and
 * process the message directly as a request (the sender's
 * receive_request_reply handles getting a request reply where it
 * expected a lookup reply).
 */
4265 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
4267 int len, error, ret_nodeid, from_nodeid, our_nodeid;
4269 from_nodeid = ms->m_header.h_nodeid;
4270 our_nodeid = dlm_our_nodeid();
4272 len = receive_extralen(ms);
4274 error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4277 /* Optimization: we're master so treat lookup as a request */
4278 if (!error && ret_nodeid == our_nodeid) {
4279 receive_request(ls, ms);
4282 send_lookup_reply(ls, ms, ret_nodeid, error);
/*
 * Directory-node handler for DLM_MSG_REMOVE: the (former) master tells
 * us it has dropped its copy of an rsb, so the directory can forget it.
 * The name length and our dir-node ownership (dlm_hash2nodeid) are
 * validated first.  The rsb is then looked up under the per-bucket
 * rsbtbl lock: a hit on the toss tree from the sending master is
 * released with kref_put(kill_rsb) and erased from the tree; a hit on
 * the keep tree means the expected remove/request race described below
 * and the message is ignored.  All exits drop ls_rsbtbl[b].lock.
 */
4285 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
4287 char name[DLM_RESNAME_MAXLEN+1];
4290 int rv, len, dir_nodeid, from_nodeid;
4292 from_nodeid = ms->m_header.h_nodeid;
4294 len = receive_extralen(ms);
4296 if (len > DLM_RESNAME_MAXLEN) {
4297 log_error(ls, "receive_remove from %d bad len %d",
4302 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
4303 if (dir_nodeid != dlm_our_nodeid()) {
4304 log_error(ls, "receive_remove from %d bad nodeid %d",
4305 from_nodeid, dir_nodeid);
4309 /* Look for name on rsbtbl.toss, if it's there, kill it.
4310 If it's on rsbtbl.keep, it's being used, and we should ignore this
4311 message. This is an expected race between the dir node sending a
4312 request to the master node at the same time as the master node sends
4313 a remove to the dir node. The resolution to that race is for the
4314 dir node to ignore the remove message, and the master node to
4315 recreate the master rsb when it gets a request from the dir node for
4316 an rsb it doesn't have. */
4318 memset(name, 0, sizeof(name));
4319 memcpy(name, ms->m_extra, len);
/* same hash/bucket computation used when the rsb was inserted */
4321 hash = jhash(name, len, 0);
4322 b = hash & (ls->ls_rsbtbl_size - 1);
4324 spin_lock(&ls->ls_rsbtbl[b].lock);
4326 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4328 /* verify the rsb is on keep list per comment above */
4329 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4331 /* should not happen */
4332 log_error(ls, "receive_remove from %d not found %s",
4334 spin_unlock(&ls->ls_rsbtbl[b].lock);
4337 if (r->res_master_nodeid != from_nodeid) {
4338 /* should not happen */
4339 log_error(ls, "receive_remove keep from %d master %d",
4340 from_nodeid, r->res_master_nodeid);
4342 spin_unlock(&ls->ls_rsbtbl[b].lock);
4346 log_debug(ls, "receive_remove from %d master %d first %x %s",
4347 from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4349 spin_unlock(&ls->ls_rsbtbl[b].lock);
4353 if (r->res_master_nodeid != from_nodeid) {
4354 log_error(ls, "receive_remove toss from %d master %d",
4355 from_nodeid, r->res_master_nodeid);
4357 spin_unlock(&ls->ls_rsbtbl[b].lock);
/* last ref dropped: kill_rsb ran, remove from the toss tree */
4361 if (kref_put(&r->res_ref, kill_rsb)) {
4362 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4363 spin_unlock(&ls->ls_rsbtbl[b].lock);
4366 log_error(ls, "receive_remove from %d rsb ref error",
4369 spin_unlock(&ls->ls_rsbtbl[b].lock);
/* DLM_MSG_PURGE: drop orphan locks for the given remote node/pid. */
4373 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
4375 do_purge(ls, ms->m_nodeid, ms->m_pid);
/*
 * Requesting-node handler for DLM_MSG_REQUEST_REPLY.  Removes the lkb
 * from the waiters list, then dispatches on m_result (the do_request()
 * return from the master):
 *   -EAGAIN       request would block and NOQUEUE was set: cast -EAGAIN
 *                 and drop the create_lkb() reference.
 *   -EINPROGRESS  queued on the master: record remid, add to WAITING.
 *   0             granted: grant_lock_pc() and cast success.
 *   -EBADR/-ENOTBLK  master mismatch on remote side: clear our idea of
 *                 the master (unless we are dir or master ourselves) and
 *                 retry via _request_lock(), which redoes the lookup.
 * If the waiter type was DLM_MSG_LOOKUP, the dir node was also the
 * master and answered our lookup with a request reply, so the reply
 * sender becomes master/owner for rsb and lkb.  The trailing flag
 * clearing handles unlock/cancel ops that overlapped the request:
 * a completed/in-progress request with overlap-unlock pending triggers
 * send_unlock(); overlap-cancel on an in-progress request triggers
 * send_cancel(); otherwise both overlap flags are simply cleared.
 */
4378 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
4380 struct dlm_lkb *lkb;
4382 int error, mstype, result;
4383 int from_nodeid = ms->m_header.h_nodeid;
4385 error = find_lkb(ls, ms->m_remid, &lkb);
4389 r = lkb->lkb_resource;
4393 error = validate_message(lkb, ms);
4397 mstype = lkb->lkb_wait_type;
4398 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4400 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4401 lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result);
4406 /* Optimization: the dir node was also the master, so it took our
4407 lookup as a request and sent request reply instead of lookup reply */
4408 if (mstype == DLM_MSG_LOOKUP) {
4409 r->res_master_nodeid = from_nodeid;
4410 r->res_nodeid = from_nodeid;
4411 lkb->lkb_nodeid = from_nodeid;
4414 /* this is the value returned from do_request() on the master */
4415 result = ms->m_result;
4419 /* request would block (be queued) on remote master */
4420 queue_cast(r, lkb, -EAGAIN);
4421 confirm_master(r, -EAGAIN);
4422 unhold_lkb(lkb); /* undoes create_lkb() */
4427 /* request was queued or granted on remote master */
4428 receive_flags_reply(lkb, ms);
4429 lkb->lkb_remid = ms->m_lkid;
4430 if (is_altmode(lkb))
4431 munge_altmode(lkb, ms);
4433 add_lkb(r, lkb, DLM_LKSTS_WAITING);
4436 grant_lock_pc(r, lkb, ms);
4437 queue_cast(r, lkb, 0);
4439 confirm_master(r, result);
4444 /* find_rsb failed to find rsb or rsb wasn't master */
4445 log_limit(ls, "receive_request_reply %x from %d %d "
4446 "master %d dir %d first %x %s", lkb->lkb_id,
4447 from_nodeid, result, r->res_master_nodeid,
4448 r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4450 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4451 r->res_master_nodeid != dlm_our_nodeid()) {
4452 /* cause _request_lock->set_master->send_lookup */
4453 r->res_master_nodeid = 0;
4455 lkb->lkb_nodeid = -1;
4458 if (is_overlap(lkb)) {
4459 /* we'll ignore error in cancel/unlock reply */
4460 queue_cast_overlap(r, lkb);
4461 confirm_master(r, result);
4462 unhold_lkb(lkb); /* undoes create_lkb() */
4464 _request_lock(r, lkb);
4466 if (r->res_master_nodeid == dlm_our_nodeid())
4467 confirm_master(r, 0);
4472 log_error(ls, "receive_request_reply %x error %d",
4473 lkb->lkb_id, result);
4476 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
4477 log_debug(ls, "receive_request_reply %x result %d unlock",
4478 lkb->lkb_id, result);
4479 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4480 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4481 send_unlock(r, lkb);
4482 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
4483 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4484 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4485 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4486 send_cancel(r, lkb);
4488 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4489 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
/*
 * Core of convert-reply handling, shared by the real reply path and the
 * recovery stub path.  Dispatches on m_result (do_convert() on master):
 *   -EAGAIN      would block, NOQUEUE: cast -EAGAIN to the caller.
 *   -EDEADLK     conversion deadlock: lock reverted on master; revert
 *                the process copy too and cast -EDEADLK.
 *   -EINPROGRESS queued on master: move lkb to the convert queue (with
 *                demote munging if the master demoted it).
 *   0            granted: grant_lock_pc() and cast success.
 * Anything else is logged as an error with the rsb dumped.
 */
4498 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4499 struct dlm_message *ms)
4501 /* this is the value returned from do_convert() on the master */
4502 switch (ms->m_result) {
4504 /* convert would block (be queued) on remote master */
4505 queue_cast(r, lkb, -EAGAIN);
4509 receive_flags_reply(lkb, ms);
4510 revert_lock_pc(r, lkb);
4511 queue_cast(r, lkb, -EDEADLK);
4515 /* convert was queued on remote master */
4516 receive_flags_reply(lkb, ms);
4517 if (is_demoted(lkb))
4520 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4525 /* convert was granted on remote master */
4526 receive_flags_reply(lkb, ms);
4527 if (is_demoted(lkb))
4529 grant_lock_pc(r, lkb, ms);
4530 queue_cast(r, lkb, 0);
4534 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4535 lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
/*
 * Validate and dequeue a convert reply, then hand off to
 * __receive_convert_reply().  remove_from_waiters_ms() is used instead
 * of remove_from_waiters() because this path can also run on a stub
 * reply while ls_waiters_mutex is already held (recovery).
 */
4542 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4544 struct dlm_rsb *r = lkb->lkb_resource;
4550 error = validate_message(lkb, ms);
4554 /* stub reply can happen with waiters_mutex held */
4555 error = remove_from_waiters_ms(lkb, ms);
4559 __receive_convert_reply(r, lkb, ms);
/* DLM_MSG_CONVERT_REPLY entry point: look up the lkb and delegate. */
4565 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
4567 struct dlm_lkb *lkb;
4570 error = find_lkb(ls, ms->m_remid, &lkb);
4574 _receive_convert_reply(lkb, ms);
/*
 * Handle an unlock reply (real or recovery stub).  On -DLM_EUNLOCK the
 * process copy is removed and the caller gets a -DLM_EUNLOCK cast; any
 * other result is unexpected and logged.  Uses remove_from_waiters_ms()
 * because stub replies arrive with ls_waiters_mutex already held.
 */
4579 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4581 struct dlm_rsb *r = lkb->lkb_resource;
4587 error = validate_message(lkb, ms);
4591 /* stub reply can happen with waiters_mutex held */
4592 error = remove_from_waiters_ms(lkb, ms);
4596 /* this is the value returned from do_unlock() on the master */
4598 switch (ms->m_result) {
4600 receive_flags_reply(lkb, ms);
4601 remove_lock_pc(r, lkb);
4602 queue_cast(r, lkb, -DLM_EUNLOCK);
4607 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4608 lkb->lkb_id, ms->m_result);
/* DLM_MSG_UNLOCK_REPLY entry point: look up the lkb and delegate. */
4615 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
4617 struct dlm_lkb *lkb;
4620 error = find_lkb(ls, ms->m_remid, &lkb);
4624 _receive_unlock_reply(lkb, ms);
/*
 * Handle a cancel reply (real or recovery stub).  On -DLM_ECANCEL the
 * process copy reverts to its granted mode and the caller gets a
 * -DLM_ECANCEL cast; other results are unexpected and logged.  Uses
 * remove_from_waiters_ms() for the stub-under-waiters_mutex case.
 */
4629 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4631 struct dlm_rsb *r = lkb->lkb_resource;
4637 error = validate_message(lkb, ms);
4641 /* stub reply can happen with waiters_mutex held */
4642 error = remove_from_waiters_ms(lkb, ms);
4646 /* this is the value returned from do_cancel() on the master */
4648 switch (ms->m_result) {
4650 receive_flags_reply(lkb, ms);
4651 revert_lock_pc(r, lkb);
4652 queue_cast(r, lkb, -DLM_ECANCEL);
4657 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4658 lkb->lkb_id, ms->m_result);
/* DLM_MSG_CANCEL_REPLY entry point: look up the lkb and delegate. */
4665 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
4667 struct dlm_lkb *lkb;
4670 error = find_lkb(ls, ms->m_remid, &lkb);
4674 _receive_cancel_reply(lkb, ms);
/*
 * Requesting-node handler for DLM_MSG_LOOKUP_REPLY.  The lkb is found
 * by our own lkid (ms->m_lkid, not m_remid) since the dir node never
 * held a copy.  After dequeueing the waiter, ret_nodeid from the dir
 * node sets the rsb's master: ourselves (clears res_first_lkid and later
 * triggers process_lookup_list), -1 (dir node disagreement — reset
 * master and retry), or a remote node (set_master() picks it up).  If an
 * unlock/cancel overlapped the lookup, the op is abandoned via
 * queue_cast_overlap() instead of being resent.
 */
4679 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4681 struct dlm_lkb *lkb;
4683 int error, ret_nodeid;
4684 int do_lookup_list = 0;
4686 error = find_lkb(ls, ms->m_lkid, &lkb);
4688 log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
4692 /* ms->m_result is the value returned by dlm_master_lookup on dir node
4693 FIXME: will a non-zero error ever be returned? */
4695 r = lkb->lkb_resource;
4699 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4703 ret_nodeid = ms->m_nodeid;
4705 /* We sometimes receive a request from the dir node for this
4706 rsb before we've received the dir node's lookup_reply for it.
4707 The request from the dir node implies we're the master, so we set
4708 ourself as master in receive_request_reply, and verify here that
4709 we are indeed the master. */
4711 if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4712 /* This should never happen */
4713 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4714 "master %d dir %d our %d first %x %s",
4715 lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid,
4716 r->res_master_nodeid, r->res_dir_nodeid,
4717 dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4720 if (ret_nodeid == dlm_our_nodeid()) {
4721 r->res_master_nodeid = ret_nodeid;
4724 r->res_first_lkid = 0;
4725 } else if (ret_nodeid == -1) {
4726 /* the remote node doesn't believe it's the dir node */
4727 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4728 lkb->lkb_id, ms->m_header.h_nodeid);
4729 r->res_master_nodeid = 0;
4731 lkb->lkb_nodeid = -1;
4733 /* set_master() will set lkb_nodeid from r */
4734 r->res_master_nodeid = ret_nodeid;
4735 r->res_nodeid = ret_nodeid;
4738 if (is_overlap(lkb)) {
4739 log_debug(ls, "receive_lookup_reply %x unlock %x",
4740 lkb->lkb_id, lkb->lkb_flags);
4741 queue_cast_overlap(r, lkb);
4742 unhold_lkb(lkb); /* undoes create_lkb() */
4746 _request_lock(r, lkb);
/* we became master above: retry queued lookups waiting on this rsb */
4750 process_lookup_list(r);
/*
 * Central dispatch for all DLM lock messages.  Messages from nodes that
 * are not (or are no longer) lockspace members are dropped with a
 * rate-limited log.  The switch routes each m_type to its handler; the
 * handlers that return a value feed the -ENOENT / -EINVAL diagnostics
 * at the bottom.  'noent' marks message types (cancel and the async
 * grant/bast — per the comment block near the end) for which a missing
 * lkb is a normal race and only worth log_debug.  saved_seq is the
 * recovery sequence attached by the saved-message path, included in
 * logs to correlate with recovery.
 */
4757 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4760 int error = 0, noent = 0;
4762 if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
4763 log_limit(ls, "receive %d from non-member %d %x %x %d",
4764 ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4765 ms->m_remid, ms->m_result);
4769 switch (ms->m_type) {
4771 /* messages sent to a master node */
4773 case DLM_MSG_REQUEST:
4774 error = receive_request(ls, ms);
4777 case DLM_MSG_CONVERT:
4778 error = receive_convert(ls, ms);
4781 case DLM_MSG_UNLOCK:
4782 error = receive_unlock(ls, ms);
4785 case DLM_MSG_CANCEL:
4787 error = receive_cancel(ls, ms);
4790 /* messages sent from a master node (replies to above) */
4792 case DLM_MSG_REQUEST_REPLY:
4793 error = receive_request_reply(ls, ms);
4796 case DLM_MSG_CONVERT_REPLY:
4797 error = receive_convert_reply(ls, ms);
4800 case DLM_MSG_UNLOCK_REPLY:
4801 error = receive_unlock_reply(ls, ms);
4804 case DLM_MSG_CANCEL_REPLY:
4805 error = receive_cancel_reply(ls, ms);
4808 /* messages sent from a master node (only two types of async msg) */
4812 error = receive_grant(ls, ms);
4817 error = receive_bast(ls, ms);
4820 /* messages sent to a dir node */
4822 case DLM_MSG_LOOKUP:
4823 receive_lookup(ls, ms);
4826 case DLM_MSG_REMOVE:
4827 receive_remove(ls, ms);
4830 /* messages sent from a dir node (remove has no reply) */
4832 case DLM_MSG_LOOKUP_REPLY:
4833 receive_lookup_reply(ls, ms);
4836 /* other messages */
4839 receive_purge(ls, ms);
4843 log_error(ls, "unknown message type %d", ms->m_type);
4847 * When checking for ENOENT, we're checking the result of
4848 * find_lkb(m_remid):
4850 * The lock id referenced in the message wasn't found. This may
4851 * happen in normal usage for the async messages and cancel, so
4852 * only use log_debug for them.
4854 * Some errors are expected and normal.
4857 if (error == -ENOENT && noent) {
4858 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4859 ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4860 ms->m_lkid, saved_seq);
4861 } else if (error == -ENOENT) {
4862 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4863 ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4864 ms->m_lkid, saved_seq);
4866 if (ms->m_type == DLM_MSG_CONVERT)
4867 dlm_dump_rsb_hash(ls, ms->m_hash);
4870 if (error == -EINVAL) {
4871 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4873 ms->m_type, ms->m_header.h_nodeid,
4874 ms->m_lkid, ms->m_remid, saved_seq);
4878 /* If the lockspace is in recovery mode (locking stopped), then normal
4879 messages are saved on the requestqueue for processing after recovery is
4880 done. When not in recovery mode, we wait for dlm_recoverd to drain saved
4881 messages off the requestqueue before we process new ones. This occurs right
4882 after recovery completes when we transition from saving all messages on
4883 requestqueue, to processing all the saved messages, to processing new
4884 messages as they arrive. */
/*
 * Entry for freshly-arrived (non-saved) messages.  While locking is
 * stopped for recovery, messages are queued on the requestqueue (unless
 * ls_generation is 0, meaning they predate our rejoin and are dropped).
 * Otherwise we wait for dlm_recoverd to drain the requestqueue and then
 * process the message immediately with saved_seq 0.
 */
4886 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4889 if (dlm_locking_stopped(ls)) {
4890 /* If we were a member of this lockspace, left, and rejoined,
4891 other nodes may still be sending us messages from the
4892 lockspace generation before we left. */
4893 if (!ls->ls_generation) {
4894 log_limit(ls, "receive %d from %d ignore old gen",
4895 ms->m_type, nodeid);
4899 dlm_add_requestqueue(ls, nodeid, ms);
4901 dlm_wait_requestqueue(ls);
4902 _receive_message(ls, ms, 0);
4906 /* This is called by dlm_recoverd to process messages that were saved on
4907 the requestqueue. */
/* Called by dlm_recoverd to replay a message saved on the requestqueue;
   saved_seq identifies the recovery sequence for diagnostics. */
4909 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
4912 _receive_message(ls, ms, saved_seq);
4915 /* This is called by the midcomms layer when something is received for
4916 the lockspace. It could be either a MSG (normal message sent as part of
4917 standard locking activity) or an RCOM (recovery message sent as part of
4918 lockspace recovery). */
/*
 * midcomms entry point for a received packet: byte-swap it in place
 * (dlm_message_in/dlm_rcom_in), sanity-check h_cmd and that h_nodeid
 * matches the connection's nodeid, and resolve the target lockspace
 * from h_lockspace.  An unknown lockspace normally just gets logged
 * (rate-limited, debug-gated), except that an RCOM_STATUS probe gets a
 * "not ready" reply so the recovering node doesn't stall.  Delivery to
 * the lock/rcom paths happens under ls_recv_active (read side) so
 * dlm_ls_stop() can quiesce receivers before recovery.
 */
4920 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4922 struct dlm_header *hd = &p->header;
4926 switch (hd->h_cmd) {
4928 dlm_message_in(&p->message);
4929 type = p->message.m_type;
4932 dlm_rcom_in(&p->rcom);
4933 type = p->rcom.rc_type;
4936 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4940 if (hd->h_nodeid != nodeid) {
4941 log_print("invalid h_nodeid %d from %d lockspace %x",
4942 hd->h_nodeid, nodeid, hd->h_lockspace);
4946 ls = dlm_find_lockspace_global(hd->h_lockspace);
4948 if (dlm_config.ci_log_debug) {
4949 printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4950 "%u from %d cmd %d type %d\n",
4951 hd->h_lockspace, nodeid, hd->h_cmd, type);
4954 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4955 dlm_send_ls_not_ready(nodeid, &p->rcom);
4959 /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4960 be inactive (in this ls) before transitioning to recovery mode */
4962 down_read(&ls->ls_recv_active);
4963 if (hd->h_cmd == DLM_MSG)
4964 dlm_receive_message(ls, &p->message, nodeid);
4966 dlm_receive_rcom(ls, &p->rcom, nodeid);
4967 up_read(&ls->ls_recv_active);
4969 dlm_put_lockspace(ls);
/*
 * Recovery for an lkb waiting on a convert reply from a dead master.
 * Middle (PR<->CW) conversions get a faked -EINPROGRESS convert reply
 * (DLM_IFL_STUB_MS marks the stub) and the same grmode=IV +
 * RSB_RECOVER_CONVERT treatment as receive_rcom_lock_args(), since the
 * true granted mode can't be known until all locks are rebuilt.
 * Up-conversions (rqmode >= grmode) are flagged DLM_IFL_RESEND to be
 * reissued after recovery; down-conversions can't be waiting here, per
 * the trailing comment.
 */
4972 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4973 struct dlm_message *ms_stub)
4975 if (middle_conversion(lkb)) {
4977 memset(ms_stub, 0, sizeof(struct dlm_message));
4978 ms_stub->m_flags = DLM_IFL_STUB_MS;
4979 ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
4980 ms_stub->m_result = -EINPROGRESS;
4981 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4982 _receive_convert_reply(lkb, ms_stub);
4984 /* Same special case as in receive_rcom_lock_args() */
4985 lkb->lkb_grmode = DLM_LOCK_IV;
4986 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4989 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4990 lkb->lkb_flags |= DLM_IFL_RESEND;
4993 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4994 conversions are async; there's no reply from the remote master */
4997 /* A waiting lkb needs recovery if the master node has failed, or
4998 the master node is changing (only when no directory is used) */
/* True if the waiter's target node (lkb_wait_nodeid) has been removed,
   or unconditionally when no directory is used (master may change). */
5000 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5003 if (dlm_no_directory(ls))
5006 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5012 /* Recovery for locks that are waiting for replies from nodes that are now
5013 gone. We can just complete unlocks and cancels by faking a reply from the
5014 dead node. Requests and up-conversions we flag to be resent after
5015 recovery. Down-conversions can just be completed with a fake reply like
5016 unlocks. Conversions between PR and CW need special attention. */
/*
 * Pre-recovery pass over ls_waiters.  For each lkb waiting on a reply
 * from a failed node: lookups and requests are flagged DLM_IFL_RESEND
 * for dlm_recover_waiters_post(); converts go through
 * recover_convert_waiter(); unlocks and cancels are completed locally
 * by injecting stub replies (DLM_IFL_STUB_MS) as if the dead master had
 * answered.  When the main reply already arrived but an overlapping
 * unlock/cancel reply did not, wait_type is reconstructed from the
 * overlap flags, with the stub result adjusted for grmode==IV (never
 * granted: cancel succeeds with 0, unlock reports -ENOENT).  Runs under
 * ls_waiters_mutex; the stub message buffer is heap-allocated because
 * struct dlm_message is presumably too large for the stack — TODO
 * confirm.
 */
5018 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5020 struct dlm_lkb *lkb, *safe;
5021 struct dlm_message *ms_stub;
5022 int wait_type, stub_unlock_result, stub_cancel_result;
5025 ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
5027 log_error(ls, "dlm_recover_waiters_pre no mem");
5031 mutex_lock(&ls->ls_waiters_mutex);
5033 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5035 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5037 /* exclude debug messages about unlocks because there can be so
5038 many and they aren't very interesting */
5040 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5041 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5042 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5046 lkb->lkb_resource->res_nodeid,
5048 lkb->lkb_wait_nodeid,
5052 /* all outstanding lookups, regardless of destination will be
5053 resent after recovery is done */
5055 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5056 lkb->lkb_flags |= DLM_IFL_RESEND;
5060 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5063 wait_type = lkb->lkb_wait_type;
5064 stub_unlock_result = -DLM_EUNLOCK;
5065 stub_cancel_result = -DLM_ECANCEL;
5067 /* Main reply may have been received leaving a zero wait_type,
5068 but a reply for the overlapping op may not have been
5069 received. In that case we need to fake the appropriate
5070 reply for the overlap op. */
5073 if (is_overlap_cancel(lkb)) {
5074 wait_type = DLM_MSG_CANCEL;
5075 if (lkb->lkb_grmode == DLM_LOCK_IV)
5076 stub_cancel_result = 0;
5078 if (is_overlap_unlock(lkb)) {
5079 wait_type = DLM_MSG_UNLOCK;
5080 if (lkb->lkb_grmode == DLM_LOCK_IV)
5081 stub_unlock_result = -ENOENT;
5084 log_debug(ls, "rwpre overlap %x %x %d %d %d",
5085 lkb->lkb_id, lkb->lkb_flags, wait_type,
5086 stub_cancel_result, stub_unlock_result);
5089 switch (wait_type) {
5091 case DLM_MSG_REQUEST:
5092 lkb->lkb_flags |= DLM_IFL_RESEND;
5095 case DLM_MSG_CONVERT:
5096 recover_convert_waiter(ls, lkb, ms_stub);
5099 case DLM_MSG_UNLOCK:
5101 memset(ms_stub, 0, sizeof(struct dlm_message));
5102 ms_stub->m_flags = DLM_IFL_STUB_MS;
5103 ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
5104 ms_stub->m_result = stub_unlock_result;
5105 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5106 _receive_unlock_reply(lkb, ms_stub);
5110 case DLM_MSG_CANCEL:
5112 memset(ms_stub, 0, sizeof(struct dlm_message));
5113 ms_stub->m_flags = DLM_IFL_STUB_MS;
5114 ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
5115 ms_stub->m_result = stub_cancel_result;
5116 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5117 _receive_cancel_reply(lkb, ms_stub);
5122 log_error(ls, "invalid lkb wait_type %d %d",
5123 lkb->lkb_wait_type, wait_type);
5127 mutex_unlock(&ls->ls_waiters_mutex);
/* Return the first waiter flagged DLM_IFL_RESEND by _pre, or NULL.
   Scans ls_waiters under ls_waiters_mutex. */
5131 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5133 struct dlm_lkb *lkb;
5136 mutex_lock(&ls->ls_waiters_mutex);
5137 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
5138 if (lkb->lkb_flags & DLM_IFL_RESEND) {
5144 mutex_unlock(&ls->ls_waiters_mutex);
5151 /* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
5152 master or dir-node for r. Processing the lkb may result in it being placed
5155 /* We do this after normal locking has been enabled and any saved messages
5156 (in requestqueue) have been processed. We should be confident that at
5157 this point we won't get or process a reply to any of these waiting
5158 operations. But, new ops may be coming in on the rsbs/locks here from
5159 userspace or remotely. */
5161 /* there may have been an overlap unlock/cancel prior to recovery or after
5162 recovery. if before, the lkb may still have a pos wait_count; if after, the
5163 overlap flag would just have been set and nothing new sent. we can be
5164 confident here that any replies to either the initial op or overlap ops
5165 prior to recovery have been received. */
/*
 * Post-recovery pass: for each RESEND-flagged waiter, fully clear its
 * waiter state (flags, wait_type/count, list membership and the waiters
 * reference), then either resend the op (_request_lock / _convert_lock
 * on the possibly-new master) or, if an unlock/cancel overlapped it,
 * complete it locally instead: an overlapped request/lookup is cast
 * -DLM_EUNLOCK or -DLM_ECANCEL and its create_lkb() ref dropped; an
 * overlapped convert is either cancelled (cast -DLM_ECANCEL) or forced
 * through _unlock_lock() with DLM_LKF_FORCEUNLOCK.  Aborts with an
 * error if locking is stopped again (another recovery started).
 */
5167 int dlm_recover_waiters_post(struct dlm_ls *ls)
5169 struct dlm_lkb *lkb;
5171 int error = 0, mstype, err, oc, ou;
5174 if (dlm_locking_stopped(ls)) {
5175 log_debug(ls, "recover_waiters_post aborted");
5180 lkb = find_resend_waiter(ls);
5184 r = lkb->lkb_resource;
5188 mstype = lkb->lkb_wait_type;
5189 oc = is_overlap_cancel(lkb);
5190 ou = is_overlap_unlock(lkb);
5193 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5194 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5195 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5196 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5197 dlm_dir_nodeid(r), oc, ou);
5199 /* At this point we assume that we won't get a reply to any
5200 previous op or overlap op on this lock. First, do a big
5201 remove_from_waiters() for all previous ops. */
5203 lkb->lkb_flags &= ~DLM_IFL_RESEND;
5204 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
5205 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
5206 lkb->lkb_wait_type = 0;
5207 lkb->lkb_wait_count = 0;
5208 mutex_lock(&ls->ls_waiters_mutex);
5209 list_del_init(&lkb->lkb_wait_reply);
5210 mutex_unlock(&ls->ls_waiters_mutex);
5211 unhold_lkb(lkb); /* for waiters list */
5214 /* do an unlock or cancel instead of resending */
5216 case DLM_MSG_LOOKUP:
5217 case DLM_MSG_REQUEST:
5218 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5220 unhold_lkb(lkb); /* undoes create_lkb() */
5222 case DLM_MSG_CONVERT:
5224 queue_cast(r, lkb, -DLM_ECANCEL);
5226 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5227 _unlock_lock(r, lkb);
/* no overlap: resend the original op to the (new) master */
5235 case DLM_MSG_LOOKUP:
5236 case DLM_MSG_REQUEST:
5237 _request_lock(r, lkb);
5239 confirm_master(r, 0);
5241 case DLM_MSG_CONVERT:
5242 _convert_lock(r, lkb);
5250 log_error(ls, "waiter %x msg %d r_nodeid %d "
5251 "dir_nodeid %d overlap %d %d",
5252 lkb->lkb_id, mstype, r->res_nodeid,
5253 dlm_dir_nodeid(r), oc, ou);
/*
 * Remove master-copy lkbs from one rsb state queue during recovery,
 * skipping any that were (re)added in the current recovery sequence by
 * recover_master_copy().  The final dlm_put_lkb() is expected to be the
 * last reference; a survivor is logged as an error.
 */
5263 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5264 struct list_head *list)
5266 struct dlm_lkb *lkb, *safe;
5268 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5269 if (!is_master_copy(lkb))
5272 /* don't purge lkbs we've added in recover_master_copy for
5273 the current recovery seq */
5275 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5280 /* this put should free the lkb */
5281 if (!dlm_put_lkb(lkb))
5282 log_error(ls, "purged mstcpy lkb not released");
/* Purge stale master-copy lkbs from all three state queues of an rsb. */
5286 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5288 struct dlm_ls *ls = r->res_ls;
5290 purge_mstcpy_list(ls, r, &r->res_grantqueue);
5291 purge_mstcpy_list(ls, r, &r->res_convertqueue);
5292 purge_mstcpy_list(ls, r, &r->res_waitqueue);
/*
 * Drop master-copy lkbs owned by departed nodes from one rsb queue.
 * nodeid_gone is a cached single removed nodeid (fast path); other
 * nodeids fall back to dlm_is_removed().  Purging marks the rsb
 * RSB_RECOVER_GRANT so dlm_recover_grant() will retry granting waiters,
 * and bumps *count for the caller's statistics.
 */
5295 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5296 struct list_head *list,
5297 int nodeid_gone, unsigned int *count)
5299 struct dlm_lkb *lkb, *safe;
5301 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5302 if (!is_master_copy(lkb))
5305 if ((lkb->lkb_nodeid == nodeid_gone) ||
5306 dlm_is_removed(ls, lkb->lkb_nodeid)) {
5310 /* this put should free the lkb */
5311 if (!dlm_put_lkb(lkb))
5312 log_error(ls, "purged dead lkb not released");
5314 rsb_set_flag(r, RSB_RECOVER_GRANT);
5321 /* Get rid of locks held by nodes that are gone. */
/*
 * Recovery step: walk every rsb on ls_root_list (under ls_root_sem,
 * write side) and purge master-copy locks held by nodes that left the
 * lockspace.  A single removed nodeid is cached (nodeid_gone) to avoid
 * dlm_is_removed() lookups in the common one-node-failure case.  Totals
 * are reported via log_debug.
 */
5323 void dlm_recover_purge(struct dlm_ls *ls)
5326 struct dlm_member *memb;
5327 int nodes_count = 0;
5328 int nodeid_gone = 0;
5329 unsigned int lkb_count = 0;
5331 /* cache one removed nodeid to optimize the common
5332 case of a single node removed */
5334 list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5336 nodeid_gone = memb->nodeid;
5342 down_write(&ls->ls_root_sem);
5343 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5347 purge_dead_list(ls, r, &r->res_grantqueue,
5348 nodeid_gone, &lkb_count);
5349 purge_dead_list(ls, r, &r->res_convertqueue,
5350 nodeid_gone, &lkb_count);
5351 purge_dead_list(ls, r, &r->res_waitqueue,
5352 nodeid_gone, &lkb_count);
5358 up_write(&ls->ls_root_sem);
5361 log_debug(ls, "dlm_recover_purge %u locks for %u nodes",
5362 lkb_count, nodes_count);
/*
 * Scan one rsbtbl bucket's keep tree for an rsb flagged
 * RSB_RECOVER_GRANT; clear the flag and return the rsb (presumably with
 * a reference taken on a line omitted from this excerpt — TODO confirm),
 * or NULL if the bucket has none.  Runs under the bucket spinlock.
 */
5365 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5370 spin_lock(&ls->ls_rsbtbl[bucket].lock);
5371 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5372 r = rb_entry(n, struct dlm_rsb, res_hashnode);
5374 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5376 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5380 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5383 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5388 * Attempt to grant locks on resources that we are the master of.
5389 * Locks may have become grantable during recovery because locks
5390 * from departed nodes have been purged (or not rebuilt), allowing
5391 * previously blocked locks to now be granted. The subset of rsb's
5392 * we are interested in are those with lkb's on either the convert or
5395 * Simplest would be to go through each master rsb and check for non-empty
5396 * convert or waiting queues, and attempt to grant on those rsbs.
5397 * Checking the queues requires lock_rsb, though, for which we'd need
5398 * to release the rsbtbl lock. This would make iterating through all
5399 * rsb's very inefficient. So, we rely on earlier recovery routines
5400 * to set RECOVER_GRANT on any rsb's that we should attempt to grant
/*
 * Recovery step: iterate RSB_RECOVER_GRANT-flagged rsbs bucket by
 * bucket (via find_grant_rsb) and attempt to grant now-unblocked
 * pending locks with grant_pending_locks(); confirm_master() settles
 * the rsb's master state afterwards.  Totals reported via log_debug.
 */
5404 void dlm_recover_grant(struct dlm_ls *ls)
5408 unsigned int count = 0;
5409 unsigned int rsb_count = 0;
5410 unsigned int lkb_count = 0;
5413 r = find_grant_rsb(ls, bucket);
5415 if (bucket == ls->ls_rsbtbl_size - 1)
5423 grant_pending_locks(r, &count);
5425 confirm_master(r, 0);
5432 log_debug(ls, "dlm_recover_grant %u locks on %u resources",
5433 lkb_count, rsb_count);
/* Find an lkb on one state queue matching (nodeid, remid), or NULL. */
5436 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5439 struct dlm_lkb *lkb;
5441 list_for_each_entry(lkb, head, lkb_statequeue) {
5442 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
/* Search all three rsb queues (grant, convert, wait) for an lkb with
   the given remote nodeid/lkid; NULL if absent from all of them. */
5448 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5451 struct dlm_lkb *lkb;
5453 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5456 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5459 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
/* needs at least dlm_rcom + rcom_lock */
/*
 * Populate a freshly-created master-copy lkb from the rcom_lock payload
 * sent by the lock holder during recovery: identity (sender nodeid,
 * remote lkid, ownpid), flags (low 16 bits of rl_flags plus
 * DLM_IFL_MSTCPY), lvb sequence and modes.  Ast/bast callbacks are
 * replaced with local fake stubs since the real callbacks live on the
 * holder's node.  If VALBLK is set, the lvb is copied from the tail of
 * the rcom buffer (length bounded by ls_lvblen).  An in-flight PR<->CW
 * conversion cannot have its true granted mode determined until all
 * locks are rebuilt, so it is forced onto the convert queue with
 * grmode=IV and the rsb flagged RSB_RECOVER_CONVERT.
 */
5466 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5467 struct dlm_rsb *r, struct dlm_rcom *rc)
5469 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5471 lkb->lkb_nodeid = rc->rc_header.h_nodeid;
5472 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5473 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5474 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5475 lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
5476 lkb->lkb_flags |= DLM_IFL_MSTCPY;
5477 lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5478 lkb->lkb_rqmode = rl->rl_rqmode;
5479 lkb->lkb_grmode = rl->rl_grmode;
5480 /* don't set lkb_status because add_lkb wants to itself */
5482 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5483 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5485 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5486 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
5487 sizeof(struct rcom_lock);
5488 if (lvblen > ls->ls_lvblen)
5490 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5491 if (!lkb->lkb_lvbptr)
5493 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5496 /* Conversions between PR and CW (middle modes) need special handling.
5497 The real granted mode of these converting locks cannot be determined
5498 until all locks have been rebuilt on the rsb (recover_conversion) */
5500 if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5501 middle_conversion(lkb)) {
5502 rl->rl_status = DLM_LKSTS_CONVERT;
5503 lkb->lkb_grmode = DLM_LOCK_IV;
5504 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5510 /* This lkb may have been recovered in a previous aborted recovery so we need
5511 to check if the rsb already has an lkb with the given remote nodeid/lkid.
5512 If so we just send back a standard reply. If not, we create a new lkb with
5513 the given values and send back our lkid. We send back our lkid by sending
5514 back the rcom_lock struct we got but with the remid field filled in. */
/* needs at least dlm_rcom + rcom_lock */
/*
 * New-master side of lock rebuild: install (or re-find, after an
 * aborted recovery) a master-copy lkb described by the rcom_lock from
 * the holder node.  Parent locks are unsupported (-EOPNOTSUPP).  After
 * find_rsb(R_RECEIVE_RECOVER) — see the comment about MSTCPY locks
 * arriving before we formally become master — an existing
 * (nodeid,remid) match short-circuits to a standard reply; otherwise a
 * new lkb is created, filled by receive_rcom_lock_args(), added with
 * the sender's rl_status, and stamped with the current recover_seq so
 * purge_mstcpy_list() won't remove it.  RSB_RECOVER_GRANT is set when
 * waiters exist so grants are retried.  Our lkid goes back in
 * rl_remid, and any error (except benign -EEXIST) in rl_result.
 */
5517 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5519 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5521 struct dlm_lkb *lkb;
5523 int from_nodeid = rc->rc_header.h_nodeid;
5526 if (rl->rl_parent_lkid) {
5527 error = -EOPNOTSUPP;
5531 remid = le32_to_cpu(rl->rl_lkid);
5533 /* In general we expect the rsb returned to be R_MASTER, but we don't
5534 have to require it. Recovery of masters on one node can overlap
5535 recovery of locks on another node, so one node can send us MSTCPY
5536 locks before we've made ourselves master of this rsb. We can still
5537 add new MSTCPY locks that we receive here without any harm; when
5538 we make ourselves master, dlm_recover_masters() won't touch the
5539 MSTCPY locks we've received early. */
5541 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5542 from_nodeid, R_RECEIVE_RECOVER, &r);
5548 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5549 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5550 from_nodeid, remid);
5555 lkb = search_remid(r, from_nodeid, remid);
5561 error = create_lkb(ls, &lkb);
5565 error = receive_rcom_lock_args(ls, lkb, r, rc);
5572 add_lkb(r, lkb, rl->rl_status);
5574 ls->ls_recover_locks_in++;
5576 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5577 rsb_set_flag(r, RSB_RECOVER_GRANT);
5580 /* this is the new value returned to the lock holder for
5581 saving in its process-copy lkb */
5582 rl->rl_remid = cpu_to_le32(lkb->lkb_id);
5584 lkb->lkb_recover_seq = ls->ls_recover_seq;
5590 if (error && error != -EEXIST)
5591 log_debug(ls, "dlm_recover_master_copy remote %d %x error %d",
5592 from_nodeid, remid, error);
5593 rl->rl_result = cpu_to_le32(error);
5597 /* needs at least dlm_rcom + rcom_lock */
/*
 * dlm_recover_process_copy - handle the reply from the new master for a
 * lock this node sent it (via dlm_send_rcom_lock) during recovery.
 * @ls: lockspace; @rc: received rcom whose rc_buf holds a struct rcom_lock.
 *
 * rl_result carries the master's outcome; on success the master's lkid
 * (rl_remid) is saved in our process-copy lkb.  NOTE(review): the dispatch
 * on `result` (likely a switch) and the error gotos are elided in this
 * excerpt; the visible branches are reconstructed below from the log text.
 */
5598 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5600 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5602 struct dlm_lkb *lkb;
5603 uint32_t lkid, remid;
5606 lkid = le32_to_cpu(rl->rl_lkid);
5607 remid = le32_to_cpu(rl->rl_remid);
5608 result = le32_to_cpu(rl->rl_result);
/* lkid is our own id for the lock; the lkb must still exist locally */
5610 error = find_lkb(ls, lkid, &lkb);
5612 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5613 lkid, rc->rc_header.h_nodeid, remid, result);
5617 r = lkb->lkb_resource;
/* only process-copy lkbs expect a master reply; anything else is bogus */
5621 if (!is_process_copy(lkb)) {
5622 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5623 lkid, rc->rc_header.h_nodeid, remid, result);
5633 /* There's a chance the new master received our lock before
5634 dlm_recover_master_reply(), this wouldn't happen if we did
5635 a barrier between recover_masters and recover_locks. */
5637 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5638 lkid, rc->rc_header.h_nodeid, remid, result);
/* master didn't know the rsb yet: resend the lock to it */
5640 dlm_send_rcom_lock(r, lkb);
/* success path: remember the master's lkid for future messages */
5644 lkb->lkb_remid = remid;
/* unrecognized result code from the master */
5647 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5648 lkid, rc->rc_header.h_nodeid, remid, result);
5651 /* an ack for dlm_recover_locks() which waits for replies from
5652 all the locks it sends to new masters */
5653 dlm_recovered_lock(r);
/*
 * dlm_user_request - acquire a new lock on behalf of a userspace caller
 * (reached via the dlm misc device write path).
 * @ls: lockspace; @ua: userspace args, attached to the lkb and later freed
 *      by dlm_free_lkb(); @mode/@flags/@name/@namelen/@timeout_cs mirror
 *      the dlm_lock() parameters.
 *
 * NOTE(review): error gotos and the final return are elided in this
 * excerpt; presumably returns 0 or a negative errno — confirm upstream.
 */
5662 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5663 int mode, uint32_t flags, void *name, unsigned int namelen,
5664 unsigned long timeout_cs)
5666 struct dlm_lkb *lkb;
5667 struct dlm_args args;
/* block out recovery for the duration of the operation */
5670 dlm_lock_recovery(ls);
5672 error = create_lkb(ls, &lkb);
/* userspace lvb lives kernel-side; allocate it on first use */
5678 if (flags & DLM_LKF_VALBLK) {
5679 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5680 if (!ua->lksb.sb_lvbptr) {
5688 /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5689 When DLM_IFL_USER is set, the dlm knows that this is a userspace
5690 lock and that lkb_astparam is the dlm_user_args structure. */
/* fake_astfn/fake_bastfn stand in for in-kernel callbacks; completion is
   delivered to userspace through the callback queue instead */
5692 error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
5693 fake_astfn, ua, fake_bastfn, &args);
5694 lkb->lkb_flags |= DLM_IFL_USER;
5701 error = request_lock(ls, lkb, name, namelen, &args);
5717 /* add this new lkb to the per-process list of locks */
5718 spin_lock(&ua->proc->locks_spin);
5720 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5721 spin_unlock(&ua->proc->locks_spin);
5723 dlm_unlock_recovery(ls);
/*
 * dlm_user_convert - convert an existing userspace lock to @mode.
 * @ls: lockspace; @ua_tmp: fresh args from this syscall, copied into the
 * lkb's long-lived dlm_user_args; @lkid: the caller's lock id;
 * @lvb_in: optional new lvb contents from userspace.
 *
 * NOTE(review): error gotos, the final return and the lkb put are elided
 * in this excerpt.
 */
5727 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5728 int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
5729 unsigned long timeout_cs)
5731 struct dlm_lkb *lkb;
5732 struct dlm_args args;
5733 struct dlm_user_args *ua;
/* block out recovery for the duration of the operation */
5736 dlm_lock_recovery(ls);
5738 error = find_lkb(ls, lkid, &lkb);
5742 /* user can change the params on its lock when it converts it, or
5743 add an lvb that didn't exist before */
5747 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5748 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5749 if (!ua->lksb.sb_lvbptr) {
5754 if (lvb_in && ua->lksb.sb_lvbptr)
5755 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
/* refresh callback params/addresses from the new request */
5757 ua->xid = ua_tmp->xid;
5758 ua->castparam = ua_tmp->castparam;
5759 ua->castaddr = ua_tmp->castaddr;
5760 ua->bastparam = ua_tmp->bastparam;
5761 ua->bastaddr = ua_tmp->bastaddr;
5762 ua->user_lksb = ua_tmp->user_lksb;
/* namelen 0: the resource name is fixed for an existing lock */
5764 error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
5765 fake_astfn, ua, fake_bastfn, &args);
5769 error = convert_lock(ls, lkb, &args);
/* these are expected in-progress/contention results, not failures */
5771 if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5776 dlm_unlock_recovery(ls);
/*
 * dlm_user_unlock - unlock a userspace lock identified by @lkid.
 * @lvb_in: optional final lvb contents to store before unlocking.
 *
 * -DLM_EUNLOCK is the normal success result from unlock_lock(); with
 * DLM_LKF_FORCEUNLOCK an -EBUSY from validate_unlock_args() is also
 * tolerated.  NOTE(review): error gotos, the final return and the lkb
 * put are elided in this excerpt.
 */
5781 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5782 uint32_t flags, uint32_t lkid, char *lvb_in)
5784 struct dlm_lkb *lkb;
5785 struct dlm_args args;
5786 struct dlm_user_args *ua;
/* block out recovery for the duration of the operation */
5789 dlm_lock_recovery(ls);
5791 error = find_lkb(ls, lkid, &lkb);
5797 if (lvb_in && ua->lksb.sb_lvbptr)
5798 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
/* only overwrite the completion param if the caller supplied one */
5799 if (ua_tmp->castparam)
5800 ua->castparam = ua_tmp->castparam;
5801 ua->user_lksb = ua_tmp->user_lksb;
5803 error = set_unlock_args(flags, ua, &args);
5807 error = unlock_lock(ls, lkb, &args);
5809 if (error == -DLM_EUNLOCK)
5811 /* from validate_unlock_args() */
5812 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5817 spin_lock(&ua->proc->locks_spin);
5818 /* dlm_user_add_cb() may have already taken lkb off the proc list */
5819 if (!list_empty(&lkb->lkb_ownqueue))
5820 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5821 spin_unlock(&ua->proc->locks_spin);
5825 dlm_unlock_recovery(ls);
/*
 * dlm_user_cancel - cancel an in-progress request/convert on a userspace
 * lock identified by @lkid.
 *
 * -DLM_ECANCEL is the normal success result from cancel_lock(); -EBUSY
 * from validate_unlock_args() is also tolerated.  NOTE(review): error
 * gotos, the final return and the lkb put are elided in this excerpt.
 */
5830 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5831 uint32_t flags, uint32_t lkid)
5833 struct dlm_lkb *lkb;
5834 struct dlm_args args;
5835 struct dlm_user_args *ua;
/* block out recovery for the duration of the operation */
5838 dlm_lock_recovery(ls);
5840 error = find_lkb(ls, lkid, &lkb);
/* only overwrite the completion param if the caller supplied one */
5845 if (ua_tmp->castparam)
5846 ua->castparam = ua_tmp->castparam;
5847 ua->user_lksb = ua_tmp->user_lksb;
5849 error = set_unlock_args(flags, ua, &args);
5853 error = cancel_lock(ls, lkb, &args);
5855 if (error == -DLM_ECANCEL)
5857 /* from validate_unlock_args() */
5858 if (error == -EBUSY)
5863 dlm_unlock_recovery(ls);
/*
 * dlm_user_deadlock - cancel a userspace conversion to break a deadlock.
 * Open-codes cancel_lock() so DLM_IFL_DEADLOCK_CANCEL can be set after
 * the rsb is locked but before _cancel_lock() runs.
 *
 * NOTE(review): lock_rsb/unlock_rsb around the flag set, error gotos and
 * the final return are elided in this excerpt.
 */
5868 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
5870 struct dlm_lkb *lkb;
5871 struct dlm_args args;
5872 struct dlm_user_args *ua;
/* block out recovery for the duration of the operation */
5876 dlm_lock_recovery(ls);
5878 error = find_lkb(ls, lkid, &lkb);
5884 error = set_unlock_args(flags, ua, &args);
5888 /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
5890 r = lkb->lkb_resource;
5894 error = validate_unlock_args(lkb, &args);
/* mark this cancel as deadlock-induced so completion is reported as such */
5897 lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
5899 error = _cancel_lock(r, lkb);
5904 if (error == -DLM_ECANCEL)
5906 /* from validate_unlock_args() */
5907 if (error == -EBUSY)
5912 dlm_unlock_recovery(ls);
5916 /* lkb's that are removed from the waiters list by revert are just left on the
5917 orphans list with the granted orphan locks, to be freed by purge */
/*
 * orphan_proc_lock - move a PERSISTENT lock of an exiting process onto the
 * lockspace orphans list and cancel any in-flight operation on it.  The
 * orphan survives until claimed or purged (see do_purge()).
 */
5919 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5921 struct dlm_args args;
5925 mutex_lock(&ls->ls_orphans_mutex);
5926 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
5927 mutex_unlock(&ls->ls_orphans_mutex);
/* flags 0: a plain cancel of any pending request/convert */
5929 set_unlock_args(0, lkb->lkb_ua, &args);
5931 error = cancel_lock(ls, lkb, &args);
/* -DLM_ECANCEL is the success result for a cancel */
5932 if (error == -DLM_ECANCEL)
5937 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
5938 Regardless of what rsb queue the lock is on, it's removed and freed. */
/*
 * unlock_proc_lock - force-unlock one lock belonging to a dead/exiting
 * process, whatever state it is in.
 */
5940 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5942 struct dlm_args args;
5945 set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
5947 error = unlock_lock(ls, lkb, &args);
/* -DLM_EUNLOCK is the success result for an unlock */
5948 if (error == -DLM_EUNLOCK)
5953 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
5954 (which does lock_rsb) due to deadlock with receiving a message that does
5955 lock_rsb followed by dlm_user_add_cb() */
/*
 * del_proc_lock - detach and return the first lock on @proc's list, or
 * NULL when the list is empty.  Marks the lkb ORPHAN (persistent locks)
 * or DEAD under ls_clear_proc_locks so dlm_user_add_cb() can't race with
 * the teardown done by the caller.
 */
5957 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
5958 struct dlm_user_proc *proc)
5960 struct dlm_lkb *lkb = NULL;
5962 mutex_lock(&ls->ls_clear_proc_locks);
5963 if (list_empty(&proc->locks))
5966 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
5967 list_del_init(&lkb->lkb_ownqueue);
/* persistent locks are kept as orphans; everything else is dead */
5969 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5970 lkb->lkb_flags |= DLM_IFL_ORPHAN;
5972 lkb->lkb_flags |= DLM_IFL_DEAD;
5974 mutex_unlock(&ls->ls_clear_proc_locks);
5978 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
5979 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
5980 which we clear here. */
5982 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
5983 list, and no more device_writes should add lkb's to proc->locks list; so we
5984 shouldn't need to take asts_spin or locks_spin here. this assumes that
5985 device reads/writes/closes are serialized -- FIXME: we may need to serialize
/*
 * dlm_clear_proc_locks - tear down all locks owned by a userspace process
 * when its dlm device fd is closed: orphan the persistent locks,
 * force-unlock the rest, then clear the in-progress unlock list and the
 * pending-callback list.  NOTE(review): the loop construct around
 * del_proc_lock(), reference puts, and some closing braces are elided in
 * this excerpt.
 */
5988 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5990 struct dlm_lkb *lkb, *safe;
5992 dlm_lock_recovery(ls);
/* drain proc->locks one lkb at a time (mutex dropped between iterations,
   see comment above about lock_rsb deadlock) */
5995 lkb = del_proc_lock(ls, proc);
5999 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6000 orphan_proc_lock(ls, lkb);
6002 unlock_proc_lock(ls, lkb);
6004 /* this removes the reference for the proc->locks list
6005 added by dlm_user_request, it may result in the lkb
6011 mutex_lock(&ls->ls_clear_proc_locks);
6013 /* in-progress unlocks */
6014 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6015 list_del_init(&lkb->lkb_ownqueue);
6016 lkb->lkb_flags |= DLM_IFL_DEAD;
/* discard callbacks queued for delivery to the (now closing) process */
6020 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6021 memset(&lkb->lkb_callbacks, 0,
6022 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6023 list_del_init(&lkb->lkb_cb_list);
6027 mutex_unlock(&ls->ls_clear_proc_locks);
6028 dlm_unlock_recovery(ls);
/*
 * purge_proc_locks - force-unlock every lock on @proc's lists (the purge
 * path for a still-open device, cf. dlm_clear_proc_locks for fd close).
 * Unlike the close path, the proc is live here, so locks_spin/asts_spin
 * are taken around each list manipulation.  NOTE(review): the outer loop
 * construct around the first list drain and some closing braces are
 * elided in this excerpt.
 */
6031 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6033 struct dlm_lkb *lkb, *safe;
/* pop one lkb at a time; the spinlock is dropped before unlock_proc_lock()
   (which takes rsb locks) */
6037 spin_lock(&proc->locks_spin);
6038 if (!list_empty(&proc->locks)) {
6039 lkb = list_entry(proc->locks.next, struct dlm_lkb,
6041 list_del_init(&lkb->lkb_ownqueue);
6043 spin_unlock(&proc->locks_spin);
6048 lkb->lkb_flags |= DLM_IFL_DEAD;
6049 unlock_proc_lock(ls, lkb);
6050 dlm_put_lkb(lkb); /* ref from proc->locks list */
/* in-progress unlocks: just mark dead and drop them */
6053 spin_lock(&proc->locks_spin);
6054 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6055 list_del_init(&lkb->lkb_ownqueue);
6056 lkb->lkb_flags |= DLM_IFL_DEAD;
6059 spin_unlock(&proc->locks_spin);
/* discard callbacks queued for delivery to the process */
6061 spin_lock(&proc->asts_spin);
6062 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6063 memset(&lkb->lkb_callbacks, 0,
6064 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6065 list_del_init(&lkb->lkb_cb_list);
6068 spin_unlock(&proc->asts_spin);
6071 /* pid of 0 means purge all orphans */
/*
 * do_purge - force-unlock and free orphaned locks on this node's
 * ls_orphans list, restricted to a single owner pid when @pid != 0.
 * NOTE(review): the reference put per lkb and the loop's closing brace
 * are elided in this excerpt; @nodeid is unused in the visible lines.
 */
6073 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6075 struct dlm_lkb *lkb, *safe;
6077 mutex_lock(&ls->ls_orphans_mutex);
6078 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
/* skip orphans belonging to other pids when a specific pid was given */
6079 if (pid && lkb->lkb_ownpid != pid)
6081 unlock_proc_lock(ls, lkb);
6082 list_del_init(&lkb->lkb_ownqueue);
6085 mutex_unlock(&ls->ls_orphans_mutex);
/*
 * send_purge - ask @nodeid to run do_purge() for @pid by sending it a
 * DLM_MSG_PURGE message.  Returns 0 or the error from message
 * creation/sending.  NOTE(review): the error check after _create_message
 * and the line setting the pid field appear elided in this excerpt.
 */
6088 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6090 struct dlm_message *ms;
6091 struct dlm_mhandle *mh;
6094 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6095 DLM_MSG_PURGE, &ms, &mh);
6098 ms->m_nodeid = nodeid;
6101 return send_message(mh, ms);
6104 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6105 int nodeid, int pid)
6109 if (nodeid != dlm_our_nodeid()) {
6110 error = send_purge(ls, nodeid, pid);
6112 dlm_lock_recovery(ls);
6113 if (pid == current->pid)
6114 purge_proc_locks(ls, proc);
6116 do_purge(ls, nodeid, pid);
6117 dlm_unlock_recovery(ls);