/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
/* Central locking logic has four stages:

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

   dlm_lock          = request_lock
   dlm_lock+CONVERT  = convert_lock
   dlm_unlock        = unlock_lock
   dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
#include <linux/types.h>
#include <linux/rbtree.h>
#include <linux/slab.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"
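
/* Illustrative example (not part of the original file): how a kernel caller
 * enters stage 1.  Per the mapping in the comment above, dlm_lock() with no
 * flags becomes request_lock(), dlm_lock()+DLM_LKF_CONVERT becomes
 * convert_lock(), and dlm_unlock() becomes unlock_lock() (or cancel_lock()
 * with DLM_LKF_CANCEL).  The lockspace, lksb, callbacks and resource name
 * here are hypothetical. */

static void example_ast(void *astarg)
{
	/* completion callback; the lksb carries the result */
}

static void example_bast(void *astarg, int mode)
{
	/* blocking callback; another node wants "mode" */
}

static inline int example_stage1(dlm_lockspace_t *ls, struct dlm_lksb *lksb)
{
	int error;

	/* dlm_lock = request_lock */
	error = dlm_lock(ls, DLM_LOCK_CR, lksb, 0, "example", 7, 0,
			 example_ast, lksb, example_bast);
	if (error)
		return error;

	/* dlm_lock+CONVERT = convert_lock; sb_lkid was set by the request */
	error = dlm_lock(ls, DLM_LOCK_EX, lksb, DLM_LKF_CONVERT, NULL, 0, 0,
			 example_ast, lksb, example_bast);
	if (error)
		return error;

	/* dlm_unlock = unlock_lock */
	return dlm_unlock(ls, lksb->sb_lkid, 0, lksb, lksb);
}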
static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void del_timeout(struct dlm_lkb *lkb);
/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{1, 1, 1, 1, 1, 1, 1, 0},	/* UN */
	{1, 1, 1, 1, 1, 1, 1, 0},	/* NL */
	{1, 1, 1, 1, 1, 1, 0, 0},	/* CR */
	{1, 1, 1, 1, 0, 0, 0, 0},	/* CW */
	{1, 1, 1, 0, 1, 0, 0, 0},	/* PR */
	{1, 1, 1, 0, 0, 0, 0, 0},	/* PW */
	{1, 1, 0, 0, 0, 0, 0, 0},	/* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}	/* PD */
};
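
/* Illustrative sketch (not part of the original file): how the matrix is
 * read.  Indexing is mode+1 because DLM_LOCK_IV (-1) maps to the UN
 * row/column. */

static inline int example_compat_lookups(void)
{
	/* PR coexists with PR ... */
	int pr_pr = __dlm_compat_matrix[DLM_LOCK_PR + 1][DLM_LOCK_PR + 1]; /* 1 */
	/* ... but a granted PR blocks a requested EX */
	int pr_ex = __dlm_compat_matrix[DLM_LOCK_PR + 1][DLM_LOCK_EX + 1]; /* 0 */

	return pr_pr && !pr_ex;
}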
/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
	/* UN   NL  CR  CW  PR  PW  EX  PD*/
	{  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
	{  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
	{  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
	{  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
	{  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
	{  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
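
/* Illustrative sketch (not part of the original file): direction of LVB
 * transfer.  set_lvb_lock() below uses exactly this lookup. */

static inline int example_lvb_direction(void)
{
	/* acquiring EX from NL: the rsb's LVB is returned to the caller (1) */
	int nl_to_ex = dlm_lvb_operations[DLM_LOCK_NL + 1][DLM_LOCK_EX + 1];
	/* dropping EX to NL: the caller's LVB is written to the rsb (0) */
	int ex_to_nl = dlm_lvb_operations[DLM_LOCK_EX + 1][DLM_LOCK_NL + 1];

	return nl_to_ex == 1 && ex_to_nl == 0;
}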
#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{0, 0, 0, 0, 0, 0, 0, 0},	/* UN */
	{0, 0, 1, 1, 1, 1, 1, 0},	/* NL */
	{0, 0, 0, 1, 1, 1, 1, 0},	/* CR */
	{0, 0, 0, 0, 1, 1, 1, 0},	/* CW */
	{0, 0, 0, 1, 0, 1, 1, 0},	/* PR */
	{0, 0, 0, 0, 0, 0, 1, 0},	/* PW */
	{0, 0, 0, 0, 0, 0, 0, 0},	/* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}	/* PD */
};
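
/* Illustrative sketch (not part of the original file): value 1 marks the
 * strict up-conversions for which QUECVT is accepted; validate_lock_args()
 * later in this file rejects a QUECVT convert when this lookup is 0. */

static inline int example_quecvt_allowed(int grmode, int rqmode)
{
	return __quecvt_compat_matrix[grmode + 1][rqmode + 1];
}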
void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
	       (unsigned long long)lkb->lkb_recover_seq);
}

static void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}

/* Threads cannot use the lockspace while it's being recovered */

static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

int dlm_lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
		return 1;
	return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}

static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}
static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	del_timeout(lkb);

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	/* if the operation was a cancel, then return -DLM_ECANCEL, if a
	   timeout caused the cancel then return -ETIMEDOUT */
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
		rv = -ETIMEDOUT;
	}

	/* if the operation was a cancel, then return -DLM_ECANCEL, if a
	   deadlock caused the cancel then return -EDEADLK */
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
		rv = -EDEADLK;
	}

	dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
}

static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	queue_cast(r, lkb,
		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb)) {
		send_bast(r, lkb, rqmode);
	} else {
		dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
	}
}

/*
 * Basic operations on rsb's and lkb's
 */
static int pre_rsb_struct(struct dlm_ls *ls)
{
	struct dlm_rsb *r1, *r2;
	int count = 0;

	spin_lock(&ls->ls_new_rsb_spin);
	if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
		spin_unlock(&ls->ls_new_rsb_spin);
		return 0;
	}
	spin_unlock(&ls->ls_new_rsb_spin);

	r1 = dlm_allocate_rsb(ls);
	r2 = dlm_allocate_rsb(ls);

	spin_lock(&ls->ls_new_rsb_spin);
	if (r1) {
		list_add(&r1->res_hashchain, &ls->ls_new_rsb);
		ls->ls_new_rsb_count++;
	}
	if (r2) {
		list_add(&r2->res_hashchain, &ls->ls_new_rsb);
		ls->ls_new_rsb_count++;
	}
	count = ls->ls_new_rsb_count;
	spin_unlock(&ls->ls_new_rsb_spin);

	if (!count)
		return -ENOMEM;
	return 0;
}

/* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
   unlock any spinlocks, go back and call pre_rsb_struct again.
   Otherwise, take an rsb off the list and return it. */

static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
			  struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int count;

	spin_lock(&ls->ls_new_rsb_spin);
	if (list_empty(&ls->ls_new_rsb)) {
		count = ls->ls_new_rsb_count;
		spin_unlock(&ls->ls_new_rsb_spin);
		log_debug(ls, "find_rsb retry %d %d %s",
			  count, dlm_config.ci_new_rsb_count, name);
		return -EAGAIN;
	}

	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
	list_del(&r->res_hashchain);
	/* Convert the empty list_head to a NULL rb_node for tree usage: */
	memset(&r->res_hashnode, 0, sizeof(struct rb_node));
	ls->ls_new_rsb_count--;
	spin_unlock(&ls->ls_new_rsb_spin);

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	*r_ret = r;
	return 0;
}
static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
{
	char maxname[DLM_RESNAME_MAXLEN];

	memset(maxname, 0, DLM_RESNAME_MAXLEN);
	memcpy(maxname, name, nlen);
	return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
}

int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
			unsigned int flags, struct dlm_rsb **r_ret)
{
	struct rb_node *node = tree->rb_node;
	struct dlm_rsb *r;
	int error = 0;
	int rc;

	while (node) {
		r = rb_entry(node, struct dlm_rsb, res_hashnode);
		rc = rsb_cmp(r, name, len);
		if (rc < 0)
			node = node->rb_left;
		else if (rc > 0)
			node = node->rb_right;
		else
			goto found;
	}
	*r_ret = NULL;
	return -EBADR;

 found:
	if (r->res_nodeid && (flags & R_MASTER))
		error = -ENOTBLK;
	*r_ret = r;
	return error;
}

static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
{
	struct rb_node **newn = &tree->rb_node;
	struct rb_node *parent = NULL;
	int rc;

	while (*newn) {
		struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
					       res_hashnode);

		parent = *newn;
		rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
		if (rc < 0)
			newn = &parent->rb_left;
		else if (rc > 0)
			newn = &parent->rb_right;
		else {
			log_print("rsb_insert match");
			dlm_dump_rsb(rsb);
			dlm_dump_rsb(cur);
			return -EEXIST;
		}
	}

	rb_link_node(&rsb->res_hashnode, parent, newn);
	rb_insert_color(&rsb->res_hashnode, tree);
	return 0;
}

static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	if (error == -ENOTBLK)
		goto out;

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
	if (error)
		return error;

	if (dlm_no_directory(ls))
		goto out;

	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}
/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r = NULL;
	uint32_t hash, bucket;
	int error;

	if (namelen > DLM_RESNAME_MAXLEN) {
		error = -EINVAL;
		goto out;
	}

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

 retry:
	if (flags & R_CREATE) {
		error = pre_rsb_struct(ls);
		if (error < 0)
			goto out;
	}

	spin_lock(&ls->ls_rsbtbl[bucket].lock);

	error = _search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out_unlock;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out_unlock;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out_unlock;

	error = get_rsb_struct(ls, name, namelen, &r);
	if (error == -EAGAIN) {
		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
		goto retry;
	}
	if (error)
		goto out_unlock;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}
	error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep);
 out_unlock:
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 out:
	*r_ret = r;
	return error;
}
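
/* Illustrative sketch (not shown in this excerpt): the stage 2 pattern built
 * on find_rsb().  The real request_lock() appears later in this file and
 * also validates the caller's args first; roughly: */

static inline int example_stage2_request(struct dlm_ls *ls,
					 struct dlm_lkb *lkb,
					 char *name, int len)
{
	struct dlm_rsb *r;
	int error;

	error = find_rsb(ls, name, len, R_CREATE, &r);
	if (error)
		return error;

	lock_rsb(r);

	attach_lkb(r, lkb);
	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;

	error = _request_lock(r, lkb);	/* stage 3 */

	unlock_rsb(r);
	put_rsb(r);
	return error;
}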
static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
{
	struct rb_node *n;
	struct dlm_rsb *r;
	int i;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		spin_lock(&ls->ls_rsbtbl[i].lock);
		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
			r = rb_entry(n, struct dlm_rsb, res_hashnode);
			if (r->res_hash == hash)
				dlm_dump_rsb(r);
		}
		spin_unlock(&ls->ls_rsbtbl[i].lock);
	}
}

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}

static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
	rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		dlm_free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	spin_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}

/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}
static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	int rv, id;

	lkb = dlm_allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
	INIT_LIST_HEAD(&lkb->lkb_time_list);
	INIT_LIST_HEAD(&lkb->lkb_cb_list);
	mutex_init(&lkb->lkb_cb_mutex);
	INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);

 retry:
	rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
	if (!rv)
		return -ENOMEM;

	spin_lock(&ls->ls_lkbidr_spin);
	rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
	if (!rv)
		lkb->lkb_id = id;
	spin_unlock(&ls->ls_lkbidr_spin);

	if (rv == -EAGAIN)
		goto retry;

	if (rv < 0) {
		log_error(ls, "create_lkb idr error %d", rv);
		return rv;
	}

	*lkb_ret = lkb;
	return 0;
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;

	spin_lock(&ls->ls_lkbidr_spin);
	lkb = idr_find(&ls->ls_lkbidr, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	spin_unlock(&ls->ls_lkbidr_spin);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint32_t lkid = lkb->lkb_id;

	spin_lock(&ls->ls_lkbidr_spin);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		idr_remove(&ls->ls_lkbidr, lkid);
		spin_unlock(&ls->ls_lkbidr_spin);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			dlm_free_lvb(lkb->lkb_lvbptr);
		dlm_free_lkb(lkb);
		return 1;
	} else {
		spin_unlock(&ls->ls_lkbidr_spin);
		return 0;
	}
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL;

	list_for_each_entry(lkb, head, lkb_statequeue)
		if (lkb->lkb_rqmode < mode)
			break;

	__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}
/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_timestamp = ktime_get();

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}

static int msg_reply_type(int mstype)
{
	switch (mstype) {
	case DLM_MSG_REQUEST:
		return DLM_MSG_REQUEST_REPLY;
	case DLM_MSG_CONVERT:
		return DLM_MSG_CONVERT_REPLY;
	case DLM_MSG_UNLOCK:
		return DLM_MSG_UNLOCK_REPLY;
	case DLM_MSG_CANCEL:
		return DLM_MSG_CANCEL_REPLY;
	case DLM_MSG_LOOKUP:
		return DLM_MSG_LOOKUP_REPLY;
	}
	return -1;
}

static int nodeid_warned(int nodeid, int num_nodes, int *warned)
{
	int i;

	for (i = 0; i < num_nodes; i++) {
		if (!warned[i]) {
			warned[i] = nodeid;
			return 0;
		}
		if (warned[i] == nodeid)
			return 1;
	}
	return 0;
}
void dlm_scan_waiters(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	ktime_t zero = ktime_set(0, 0);
	s64 us;
	s64 debug_maxus = 0;
	u32 debug_scanned = 0;
	u32 debug_expired = 0;
	int num_nodes = 0;
	int *warned = NULL;

	if (!dlm_config.ci_waitwarn_us)
		return;

	mutex_lock(&ls->ls_waiters_mutex);

	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		if (ktime_equal(lkb->lkb_wait_time, zero))
			continue;

		debug_scanned++;

		us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));

		if (us < dlm_config.ci_waitwarn_us)
			continue;

		lkb->lkb_wait_time = zero;

		debug_expired++;
		if (us > debug_maxus)
			debug_maxus = us;

		if (!num_nodes) {
			num_nodes = ls->ls_num_nodes;
			warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
		}
		if (!warned)
			continue;
		if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
			continue;

		log_error(ls, "waitwarn %x %lld %d us check connection to "
			  "node %d", lkb->lkb_id, (long long)us,
			  dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
	}
	mutex_unlock(&ls->ls_waiters_mutex);
	kfree(warned);

	if (debug_expired)
		log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
			  debug_scanned, debug_expired,
			  dlm_config.ci_waitwarn_us, (long long)debug_maxus);
}
/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	lkb->lkb_wait_time = ktime_get();
	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "addwait error %x %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
				struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* Cancel state was preemptively cleared by a successful convert,
	   see next comment, nothing to do. */

	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
		log_debug(ls, "remwait %x cancel_reply wait_type %d",
			  lkb->lkb_id, lkb->lkb_wait_type);
		return -1;
	}

	/* Remove for the convert reply, and preemptively remove for the
	   cancel reply.  A convert has been granted while there's still
	   an outstanding cancel on it (the cancel is moot and the result
	   in the cancel reply should be 0).  We preempt the cancel reply
	   because the app gets the convert result and then can follow up
	   with another op, like convert.  This subsequent op would see the
	   lingering state of the cancel and fail with -EBUSY. */

	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
	    is_overlap_cancel(lkb) && ms && !ms->m_result) {
		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
			  lkb->lkb_id);
		lkb->lkb_wait_type = 0;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_count--;
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
		  lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
		  mstype, lkb->lkb_flags);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't recvd a reply
	   to the op that was in progress prior to the unlock/cancel; we
	   give up on any reply to the earlier op.  FIXME: not sure when/how
	   this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}
static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, mstype, NULL);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again. */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	if (ms->m_flags != DLM_IFL_STUB_MS)
		mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, ms->m_type, ms);
	if (ms->m_flags != DLM_IFL_STUB_MS)
		mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
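
/* Illustrative sketch (not shown in this excerpt): how the waiters list
 * brackets a remote operation.  send_common(), later in this file, follows
 * this pattern; message construction and sending are elided here, with
 * "send_error" standing in for the result of the send. */

static inline int example_waiters_bracket(struct dlm_rsb *r,
					  struct dlm_lkb *lkb,
					  int mstype, int send_error)
{
	int error;

	/* register the expected reply before sending so the reply
	   cannot race past us */
	error = add_to_waiters(lkb, mstype, r->res_nodeid);
	if (error)
		return error;

	/* ... create and send the DLM_MSG_* message here ... */

	if (send_error)
		/* nothing was sent; back the lkb off the waiters list */
		remove_from_waiters(lkb, msg_reply_type(mstype));
	return send_error;
}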
static void dir_remove(struct dlm_rsb *r)
{
	int to_nodeid;

	if (dlm_no_directory(r->res_ls))
		return;

	to_nodeid = dlm_dir_nodeid(r);
	if (to_nodeid != dlm_our_nodeid())
		send_remove(r);
	else
		dlm_dir_remove_entry(r->res_ls, to_nodeid,
				     r->res_name, r->res_length);
}
/* FIXME: make this more efficient */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct rb_node *n;
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		spin_lock(&ls->ls_rsbtbl[b].lock);
		for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
			r = rb_entry(n, struct dlm_rsb, res_hashnode);
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
			spin_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			dlm_free_rsb(r);
			count++;
		} else {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}

void dlm_scan_rsbs(struct dlm_ls *ls)
{
	int i;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		shrink_bucket(ls, i);
		if (dlm_locking_stopped(ls))
			return;
		cond_resched();
	}
}
static void add_timeout(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	if (is_master_copy(lkb))
		return;

	if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
	    !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
		lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
		goto add_it;
	}
	if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
		goto add_it;
	return;

 add_it:
	DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
	mutex_lock(&ls->ls_timeout_mutex);
	hold_lkb(lkb);
	list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
	mutex_unlock(&ls->ls_timeout_mutex);
}

static void del_timeout(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	mutex_lock(&ls->ls_timeout_mutex);
	if (!list_empty(&lkb->lkb_time_list)) {
		list_del_init(&lkb->lkb_time_list);
		unhold_lkb(lkb);
	}
	mutex_unlock(&ls->ls_timeout_mutex);
}
/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
   lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
   and then lock rsb because of lock ordering in add_timeout.  We may need
   to specify some special timeout-related bits in the lkb that are just to
   be accessed under the timeout_mutex. */

void dlm_scan_timeout(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int do_cancel, do_warn;
	s64 wait_us;

	for (;;) {
		if (dlm_locking_stopped(ls))
			break;

		do_cancel = 0;
		do_warn = 0;
		mutex_lock(&ls->ls_timeout_mutex);
		list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {

			wait_us = ktime_to_us(ktime_sub(ktime_get(),
							lkb->lkb_timestamp));

			if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
			    wait_us >= (lkb->lkb_timeout_cs * 10000))
				do_cancel = 1;

			if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
			    wait_us >= dlm_config.ci_timewarn_cs * 10000)
				do_warn = 1;

			if (!do_cancel && !do_warn)
				continue;
			hold_lkb(lkb);
			break;
		}
		mutex_unlock(&ls->ls_timeout_mutex);

		if (!do_cancel && !do_warn)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		if (do_warn) {
			/* clear flag so we only warn once */
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
				del_timeout(lkb);
			dlm_timeout_warn(lkb);
		}

		if (do_cancel) {
			log_debug(ls, "timeout cancel %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
			del_timeout(lkb);
			_cancel_lock(r, lkb);
		}

		unlock_rsb(r);
		unhold_rsb(r);
		dlm_put_lkb(lkb);
	}
}

/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
   dlm_recoverd before checking/setting ls_recover_begin. */

void dlm_adjust_timeouts(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);

	ls->ls_recover_begin = 0;
	mutex_lock(&ls->ls_timeout_mutex);
	list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
		lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
	mutex_unlock(&ls->ls_timeout_mutex);

	if (!dlm_config.ci_waitwarn_us)
		return;

	mutex_lock(&ls->ls_waiters_mutex);
	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		if (ktime_to_us(lkb->lkb_wait_time))
			lkb->lkb_wait_time = ktime_get();
	}
	mutex_unlock(&ls->ls_waiters_mutex);
}
/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}

static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode < DLM_LOCK_PW)
		return;

	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	if (!r->res_lvbptr)
		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

	if (!r->res_lvbptr)
		return;

	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
	r->res_lvbseq++;
	rsb_clear_flag(r, RSB_VALNOTVALID);
}

/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			    struct dlm_message *ms)
{
	int b;

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
	if (b == 1) {
		int len = receive_extralen(ms);
		if (len > DLM_RESNAME_MAXLEN)
			len = DLM_RESNAME_MAXLEN;
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
		lkb->lkb_lvbseq = ms->m_lvbseq;
	}
}
/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}
/* returns: 0 did nothing
	    1 moved lock to granted
	   -1 removed lock */

static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv = 0;

	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		break;
	case DLM_LKSTS_CONVERT:
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		rv = 1;
		break;
	case DLM_LKSTS_WAITING:
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		rv = -1;
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
	return rv;
}

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}
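
/* Illustrative sketch (not shown in this excerpt): stage 4's cancel path is
 * built on revert_lock().  do_cancel(), later in this file, is essentially
 * this: a WAITING or CONVERTing lkb is reverted and the caller's ast gets
 * -DLM_ECANCEL; a fully granted lkb is left alone. */

static inline int example_do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;

	if (revert_lock(r, lkb)) {
		queue_cast(r, lkb, -DLM_ECANCEL);
		error = -DLM_ECANCEL;
	}
	return error;
}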
static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
		lkb->lkb_grmode = lkb->lkb_rqmode;
		if (lkb->lkb_status)
			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		else
			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
	}

	lkb->lkb_rqmode = DLM_LOCK_IV;
	lkb->lkb_highbast = 0;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}

/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (is_master_copy(lkb))
		send_grant(r, lkb);
	else
		queue_cast(r, lkb, 0);
}

/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
   change the granted/requested modes.  We're munging things accordingly in
   the process copy.
   CONVDEADLK: our grmode may have been forced down to NL to resolve a
   conversion deadlock
   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
   compatible with other granted locks */

static void munge_demoted(struct dlm_lkb *lkb)
{
	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
		log_print("munge_demoted %x invalid modes gr %d rq %d",
			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
		return;
	}

	lkb->lkb_grmode = DLM_LOCK_NL;
}

static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
	    ms->m_type != DLM_MSG_GRANT) {
		log_print("munge_altmode %x invalid reply type %d",
			  lkb->lkb_id, ms->m_type);
		return;
	}

	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
		lkb->lkb_rqmode = DLM_LOCK_PR;
	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
		lkb->lkb_rqmode = DLM_LOCK_CW;
	else {
		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
		dlm_print_lkb(lkb);
	}
}

static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
					   lkb_statequeue);
	if (lkb->lkb_id == first->lkb_id)
		return 1;
	return 0;
}

/* Check if the given lkb conflicts with another lkb on the queue. */

static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this;

	list_for_each_entry(this, head, lkb_statequeue) {
		if (this == lkb)
			continue;
		if (!modes_compat(this, lkb))
			return 1;
	}
	return 0;
}
1614 * "A conversion deadlock arises with a pair of lock requests in the converting
1615 * queue for one resource. The granted mode of each lock blocks the requested
1616 * mode of the other lock."
1618 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1619 * convert queue from being granted, then deadlk/demote lkb.
1622 * Granted Queue: empty
1623 * Convert Queue: NL->EX (first lock)
1624 * PR->EX (second lock)
1626 * The first lock can't be granted because of the granted mode of the second
1627 * lock and the second lock can't be granted because it's not first in the
1628 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1629 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1630 * flag set and return DEMOTED in the lksb flags.
1632 * Originally, this function detected conv-deadlk in a more limited scope:
1633 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1634 * - if lkb1 was the first entry in the queue (not just earlier), and was
1635 * blocked by the granted mode of lkb2, and there was nothing on the
1636 * granted queue preventing lkb1 from being granted immediately, i.e.
1637 * lkb2 was the only thing preventing lkb1 from being granted.
1639 * That second condition meant we'd only say there was conv-deadlk if
1640 * resolving it (by demotion) would lead to the first lock on the convert
1641 * queue being granted right away. It allowed conversion deadlocks to exist
1642 * between locks on the convert queue while they couldn't be granted anyway.
1644 * Now, we detect and take action on conversion deadlocks immediately when
1645 * they're created, even if they may not be immediately consequential. If
1646 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1647 * mode that would prevent lkb1's conversion from being granted, we do a
1648 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1649 * I think this means that the lkb_is_ahead condition below should always
1650 * be zero, i.e. there will never be conv-deadlk between two locks that are
1651 * both already on the convert queue.
1654 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1656 struct dlm_lkb *lkb1;
1657 int lkb_is_ahead = 0;
1659 list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1665 if (!lkb_is_ahead) {
1666 if (!modes_compat(lkb2, lkb1))
1669 if (!modes_compat(lkb2, lkb1) &&
1670 !modes_compat(lkb1, lkb2))
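
/* Illustrative sketch (not part of the original file): the Part 2 example
 * from the comment above, in matrix terms.  The first conversion (NL->EX) is
 * mode-blocked by the second lock's granted PR, while the second (PR->EX) is
 * only queue-blocked; demoting the second's PR (CONVDEADLK) unblocks the
 * first. */

static inline int example_part2_deadlock(void)
{
	int first_rq_blocked =
		!__dlm_compat_matrix[DLM_LOCK_PR + 1][DLM_LOCK_EX + 1];
	int second_rq_mode_ok =
		__dlm_compat_matrix[DLM_LOCK_NL + 1][DLM_LOCK_EX + 1];

	return first_rq_blocked && second_rq_mode_ok;
}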
/*
 * Returns 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */

static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted.  In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues...  As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks.  This flag is not valid for conversion requests.
	 *
	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode.  We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1.  The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		return 0;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks.
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		return 0;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis.  This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion.  This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource.  Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order.  The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * Even if the convert is compat with all granted locks,
	 * QUECVT forces it behind other locks on the convert queue.
	 */

	if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
		if (list_empty(&r->res_convertqueue))
			return 1;
		return 0;
	}

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;

	return 0;
}
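
/* Illustrative sketch (not part of the original file): for a brand new
 * request (now=1) without EXPEDITE/NOORDER, the rules above collapse to the
 * 6-4 test: no conflict with granted locks and both queues empty.  (An empty
 * convert queue makes its conflict check vacuous.) */

static inline int example_new_request_grantable(struct dlm_rsb *r,
						struct dlm_lkb *lkb)
{
	return !queue_conflict(&r->res_grantqueue, lkb) &&
	       list_empty(&r->res_convertqueue) &&
	       list_empty(&r->res_waitqueue);
}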
static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
			  int *err)
{
	int rv;
	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);

	if (err)
		*err = 0;

	rv = _can_be_granted(r, lkb, now);
	if (rv)
		goto out;

	/*
	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
	 * cancels one of the locks.
	 */

	if (is_convert && can_be_queued(lkb) &&
	    conversion_deadlock_detect(r, lkb)) {
		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
			lkb->lkb_grmode = DLM_LOCK_NL;
			lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
		} else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
			if (err)
				*err = -EDEADLK;
			else {
				log_print("can_be_granted deadlock %x now %d",
					  lkb->lkb_id, now);
				dlm_dump_rsb(r);
			}
		}
		goto out;
	}

	/*
	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
	 * to grant a request in a mode other than the normal rqmode.  It's a
	 * simple way to provide a big optimization to applications that can
	 * use them.
	 */

	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
		alt = DLM_LOCK_PR;
	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
		alt = DLM_LOCK_CW;

	if (alt) {
		lkb->lkb_rqmode = alt;
		rv = _can_be_granted(r, lkb, now);
		if (rv)
			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
		else
			lkb->lkb_rqmode = rqmode;
	}
 out:
	return rv;
}
/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
   for locks pending on the convert list.  Once verified (watch for these
   log_prints), we should be able to just call _can_be_granted() and not
   bother with the demote/deadlk cases here (and there's no easy way to deal
   with a deadlk here, we'd have to generate something like grant_lock with
   the deadlk error.) */

/* Returns the highest requested mode of all blocked conversions; sets
   cw if there's a blocked conversion to DLM_LOCK_CW. */

static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
				 unsigned int *count)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;
	int deadlk;

	quit = 0;
 restart:
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		demoted = is_demoted(lkb);
		deadlk = 0;

		if (can_be_granted(r, lkb, 0, &deadlk)) {
			grant_lock_pending(r, lkb);
			grant_restart = 1;
			if (count)
				(*count)++;
			continue;
		}

		if (!demoted && is_demoted(lkb)) {
			log_print("WARN: pending demoted %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			demote_restart = 1;
			continue;
		}

		if (deadlk) {
			log_print("WARN: pending deadlock %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			dlm_dump_rsb(r);
			continue;
		}

		hi = max_t(int, lkb->lkb_rqmode, hi);

		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
			*cw = 1;
	}

	if (grant_restart)
		goto restart;
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}

static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
			      unsigned int *count)
{
	struct dlm_lkb *lkb, *s;

	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
		if (can_be_granted(r, lkb, 0, NULL)) {
			grant_lock_pending(r, lkb);
			if (count)
				(*count)++;
		} else {
			high = max_t(int, lkb->lkb_rqmode, high);
			if (lkb->lkb_rqmode == DLM_LOCK_CW)
				*cw = 1;
		}
	}

	return high;
}

/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
   on either the convert or waiting queue.
   high is the largest rqmode of all locks blocked on the convert or
   waiting queue. */

static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
{
	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
		if (gr->lkb_highbast < DLM_LOCK_EX)
			return 1;
		return 0;
	}

	if (gr->lkb_highbast < high &&
	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
		return 1;
	return 0;
}
static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
{
	struct dlm_lkb *lkb, *s;
	int high = DLM_LOCK_IV;
	int cw = 0;

	if (!is_master(r)) {
		log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
		dlm_dump_rsb(r);
		return;
	}

	high = grant_pending_convert(r, high, &cw, count);
	high = grant_pending_wait(r, high, &cw, count);

	if (high == DLM_LOCK_IV)
		return;

	/*
	 * If there are locks left on the wait/convert queue then send blocking
	 * ASTs to granted locks based on the largest requested mode (high)
	 * found above.
	 */

	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
			if (cw && high == DLM_LOCK_PR &&
			    lkb->lkb_grmode == DLM_LOCK_PR)
				queue_bast(r, lkb, DLM_LOCK_CW);
			else
				queue_bast(r, lkb, high);
			lkb->lkb_highbast = high;
		}
	}
}

static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
{
	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
		if (gr->lkb_highbast < DLM_LOCK_EX)
			return 1;
		return 0;
	}

	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
		return 1;
	return 0;
}

static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
			    struct dlm_lkb *lkb)
{
	struct dlm_lkb *gr;

	list_for_each_entry(gr, head, lkb_statequeue) {
		/* skip self when sending basts to convertqueue */
		if (gr == lkb)
			continue;
		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
			queue_bast(r, gr, lkb->lkb_rqmode);
			gr->lkb_highbast = lkb->lkb_rqmode;
		}
	}
}

static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
}

static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
	send_bast_queue(r, &r->res_convertqueue, lkb);
}
/* set_master(r, lkb) -- set the master nodeid of a resource

   The purpose of this function is to set the nodeid field in the given
   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
   known, it can just be copied to the lkb and the function will return
   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
   before it can be copied to the lkb.

   When the rsb nodeid is being looked up remotely, the initial lkb
   causing the lookup is kept on the ls_waiters list waiting for the
   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
   on the rsb's res_lookup list until the master is verified.

   Return values:
   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
   1: the rsb master is not available and the lkb has been placed on
      a wait queue
*/

static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	if (dir_nodeid != our_nodeid) {
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	for (i = 0; i < 2; i++) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
		   already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}
	if (error && error != -EEXIST)
		return error;

	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}
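
/* Illustrative sketch (not shown in this excerpt): how stage 3 consumes
 * set_master()'s result.  _request_lock(), declared near the top of this
 * file, does roughly this; the local do_request() branch is elided here. */

static inline int example_stage3_request(struct dlm_rsb *r,
					 struct dlm_lkb *lkb)
{
	int error = set_master(r, lkb);

	if (error < 0)
		return error;	/* directory lookup failed */
	if (error)
		return 0;	/* lkb queued, waiting for a lookup reply */

	/* error == 0: r->res_nodeid is usable now */
	if (is_remote(r))
		return send_request(r, lkb); /* remote master runs do_request() */
	return 0;		/* local: call do_request() here */
}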
static void process_lookup_list(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
		list_del_init(&lkb->lkb_rsb_lookup);
		_request_lock(r, lkb);
		schedule();
	}
}

/* confirm_master -- confirm (or deny) an rsb's master nodeid */

static void confirm_master(struct dlm_rsb *r, int error)
{
	struct dlm_lkb *lkb;

	if (!r->res_first_lkid)
		return;

	switch (error) {
	case 0:
	case -EINPROGRESS:
		r->res_first_lkid = 0;
		process_lookup_list(r);
		break;

	case -EAGAIN:
	case -EBADR:
	case -ENOTBLK:
		/* the remote request failed and won't be retried (it was
		   a NOQUEUE, or has been canceled/unlocked); make a waiting
		   lkb the first_lkid */

		r->res_first_lkid = 0;

		if (!list_empty(&r->res_lookup)) {
			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
					 lkb_rsb_lookup);
			list_del_init(&lkb->lkb_rsb_lookup);
			r->res_first_lkid = lkb->lkb_id;
			_request_lock(r, lkb);
		}
		break;

	default:
		log_error(r->res_ls, "confirm_master unknown error %d", error);
	}
}
static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
			 int namelen, unsigned long timeout_cs,
			 void (*ast) (void *astparam),
			 void *astparam,
			 void (*bast) (void *astparam, int mode),
			 struct dlm_args *args)
{
	int rv = -EINVAL;

	/* check for invalid arg usage */

	if (mode < 0 || mode > DLM_LOCK_EX)
		goto out;

	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
		goto out;

	if (flags & DLM_LKF_CANCEL)
		goto out;

	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
		goto out;

	if (!ast || !lksb)
		goto out;

	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
		goto out;

	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
		goto out;

	/* these args will be copied to the lkb in validate_lock_args,
	   it cannot be done now because when converting locks, fields in
	   an active lkb cannot be modified before locking the rsb */

	args->flags = flags;
	args->astfn = ast;
	args->astparam = astparam;
	args->bastfn = bast;
	args->timeout = timeout_cs;
	args->mode = mode;
	args->lksb = lksb;
	rv = 0;
 out:
	return rv;
}

static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
{
	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
		      DLM_LKF_FORCEUNLOCK))
		return -EINVAL;

	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
		return -EINVAL;

	args->flags = flags;
	args->astparam = astarg;
	return 0;
}
static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			      struct dlm_args *args)
{
	int rv = -EINVAL;

	if (args->flags & DLM_LKF_CONVERT) {
		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
			goto out;

		if (args->flags & DLM_LKF_QUECVT &&
		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
			goto out;

		rv = -EBUSY;
		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
			goto out;

		if (lkb->lkb_wait_type)
			goto out;

		if (is_overlap(lkb))
			goto out;
	}

	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astfn = args->astfn;
	lkb->lkb_astparam = args->astparam;
	lkb->lkb_bastfn = args->bastfn;
	lkb->lkb_rqmode = args->mode;
	lkb->lkb_lksb = args->lksb;
	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
	lkb->lkb_ownpid = (int) current->pid;
	lkb->lkb_timeout_cs = args->timeout;
	rv = 0;
 out:
	if (rv)
		log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
			  rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
			  lkb->lkb_status, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
	return rv;
}
2319 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2322 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2323 because there may be a lookup in progress and it's valid to do
2324 cancel/unlockf on it */
2326 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2328 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2331 if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2332 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2337 /* an lkb may still exist even though the lock is EOL'ed due to a
2338 cancel, unlock or failed noqueue request; an app can't use these
2339 locks; return same error as if the lkid had not been found at all */
2341 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2342 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2347 /* an lkb may be waiting for an rsb lookup to complete where the
2348 lookup was initiated by another lock */
2350 if (!list_empty(&lkb->lkb_rsb_lookup)) {
2351 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2352 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2353 list_del_init(&lkb->lkb_rsb_lookup);
2354 queue_cast(lkb->lkb_resource, lkb,
2355 args->flags & DLM_LKF_CANCEL ?
2356 -DLM_ECANCEL : -DLM_EUNLOCK);
2357 unhold_lkb(lkb); /* undoes create_lkb() */
2359 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2364 /* cancel not allowed with another cancel/unlock in progress */
2366 if (args->flags & DLM_LKF_CANCEL) {
2367 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2370 if (is_overlap(lkb))
2373 /* don't let scand try to do a cancel */
2376 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2377 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2382 /* there's nothing to cancel */
2383 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2384 !lkb->lkb_wait_type) {
2389 switch (lkb->lkb_wait_type) {
2390 case DLM_MSG_LOOKUP:
2391 case DLM_MSG_REQUEST:
2392 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2395 case DLM_MSG_UNLOCK:
2396 case DLM_MSG_CANCEL:
2399 /* add_to_waiters() will set OVERLAP_CANCEL */
2403 /* do we need to allow a force-unlock if there's a normal unlock
2404 already in progress? under what conditions could the normal unlock
2405 fail such that we'd want to send a force-unlock to be sure? */
2407 if (args->flags & DLM_LKF_FORCEUNLOCK) {
2408 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2411 if (is_overlap_unlock(lkb))
2414 /* don't let scand try to do a cancel */
2417 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2418 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2423 switch (lkb->lkb_wait_type) {
2424 case DLM_MSG_LOOKUP:
2425 case DLM_MSG_REQUEST:
2426 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2429 case DLM_MSG_UNLOCK:
2432 /* add_to_waiters() will set OVERLAP_UNLOCK */
2436 /* normal unlock not allowed if there's any op in progress */
2438 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2442 /* an overlapping op shouldn't blow away exflags from other op */
2443 lkb->lkb_exflags |= args->flags;
2444 lkb->lkb_sbflags = 0;
2445 lkb->lkb_astparam = args->astparam;
2449 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2450 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2451 args->flags, lkb->lkb_wait_type,
2452 lkb->lkb_resource->res_name);
2457 * Four stage 4 varieties:
2458 * do_request(), do_convert(), do_unlock(), do_cancel()
2459 * These are called on the master node for the given lock and
2460 * from the central locking logic.
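/* returns: 0 granted, -EINPROGRESS queued on the waitqueue,
   -EAGAIN not grantable and not queueable (e.g. NOQUEUE) */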
2463 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2467 if (can_be_granted(r, lkb, 1, NULL)) {
2469 queue_cast(r, lkb, 0);
2473 if (can_be_queued(lkb)) {
2474 error = -EINPROGRESS;
2475 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2481 queue_cast(r, lkb, -EAGAIN);
2486 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2491 if (force_blocking_asts(lkb))
2492 send_blocking_asts_all(r, lkb);
2495 send_blocking_asts(r, lkb);
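/* returns: 0 granted, -EDEADLK conversion deadlock, -EINPROGRESS
   queued on the convertqueue, -EAGAIN not queueable */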
2500 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2505 /* changing an existing lock may allow others to be granted */
2507 if (can_be_granted(r, lkb, 1, &deadlk)) {
2509 queue_cast(r, lkb, 0);
2513 /* can_be_granted() detected that this lock would block in a conversion
2514 deadlock, so we leave it on the granted queue and return EDEADLK in
2515 the ast for the convert. */
2518 /* it's left on the granted queue */
2519 revert_lock(r, lkb);
2520 queue_cast(r, lkb, -EDEADLK);
2525 /* is_demoted() means the can_be_granted() above set the grmode
2526 to NL, and left us on the granted queue. This auto-demotion
2527 (due to CONVDEADLK) might mean other locks, and/or this lock, are
2528 now grantable. We have to try to grant other converting locks
2529 before we try again to grant this one. */
2531 if (is_demoted(lkb)) {
2532 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
2533 if (_can_be_granted(r, lkb, 1)) {
2535 queue_cast(r, lkb, 0);
2538 /* else fall through and move to convert queue */
2541 if (can_be_queued(lkb)) {
2542 error = -EINPROGRESS;
2544 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2550 queue_cast(r, lkb, -EAGAIN);
2555 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2560 grant_pending_locks(r, NULL);
2561 /* grant_pending_locks also sends basts */
2564 if (force_blocking_asts(lkb))
2565 send_blocking_asts_all(r, lkb);
2568 send_blocking_asts(r, lkb);
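/* returns: -DLM_EUNLOCK unconditionally; the lock is removed and the
   unlock ast queued */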
2573 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2575 remove_lock(r, lkb);
2576 queue_cast(r, lkb, -DLM_EUNLOCK);
2577 return -DLM_EUNLOCK;
2580 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2583 grant_pending_locks(r, NULL);
2586 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2588 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2592 error = revert_lock(r, lkb);
2594 queue_cast(r, lkb, -DLM_ECANCEL);
2595 return -DLM_ECANCEL;
2600 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2604 grant_pending_locks(r, NULL);
2608 * Four stage 3 varieties:
2609 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
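/* All four share the same shape; a condensed sketch (is_remote(),
   defined elsewhere in this file, tests whether the rsb is mastered
   on another node):

	if (is_remote(r))
		error = send_xxxx(r, lkb);	-- reply handled later
	else {
		error = do_xxxx(r, lkb);
		do_xxxx_effects(r, lkb, error);
	}
*/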
2612 /* add a new lkb to a possibly new rsb, called by requesting process */
2614 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2618 /* set_master: sets lkb nodeid from r */
2620 error = set_master(r, lkb);
2629 /* receive_request() calls do_request() on remote node */
2630 error = send_request(r, lkb);
2632 error = do_request(r, lkb);
2633 /* for remote locks the request_reply is sent
2634 between do_request and do_request_effects */
2635 do_request_effects(r, lkb, error);
2641 /* change some property of an existing lkb, e.g. mode */
2643 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2648 /* receive_convert() calls do_convert() on remote node */
2649 error = send_convert(r, lkb);
2651 error = do_convert(r, lkb);
2652 /* for remote locks the convert_reply is sent
2653 between do_convert and do_convert_effects */
2654 do_convert_effects(r, lkb, error);
2660 /* remove an existing lkb from the granted queue */
2662 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2667 /* receive_unlock() calls do_unlock() on remote node */
2668 error = send_unlock(r, lkb);
2670 error = do_unlock(r, lkb);
2671 /* for remote locks the unlock_reply is sent
2672 between do_unlock and do_unlock_effects */
2673 do_unlock_effects(r, lkb, error);
2679 /* remove an existing lkb from the convert or wait queue */
2681 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2686 /* receive_cancel() calls do_cancel() on remote node */
2687 error = send_cancel(r, lkb);
2689 error = do_cancel(r, lkb);
2690 /* for remote locks the cancel_reply is sent
2691 between do_cancel and do_cancel_effects */
2692 do_cancel_effects(r, lkb, error);
2699 * Four stage 2 varieties:
2700 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2703 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2704 int len, struct dlm_args *args)
2709 error = validate_lock_args(ls, lkb, args);
2713 error = find_rsb(ls, name, len, R_CREATE, &r);
2720 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2722 error = _request_lock(r, lkb);
2731 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2732 struct dlm_args *args)
2737 r = lkb->lkb_resource;
2742 error = validate_lock_args(ls, lkb, args);
2746 error = _convert_lock(r, lkb);
2753 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2754 struct dlm_args *args)
2759 r = lkb->lkb_resource;
2764 error = validate_unlock_args(lkb, args);
2768 error = _unlock_lock(r, lkb);
2775 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2776 struct dlm_args *args)
2781 r = lkb->lkb_resource;
2786 error = validate_unlock_args(lkb, args);
2790 error = _cancel_lock(r, lkb);
2798 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
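/* Illustrative caller sketch only; the lockspace handle "ls" and the
   my_ast/my_arg/my_bast callbacks are hypothetical, not part of this
   file:

	struct dlm_lksb lksb;
	int error;

	memset(&lksb, 0, sizeof(lksb));
	error = dlm_lock(ls, DLM_LOCK_EX, &lksb, DLM_LKF_NOQUEUE,
			 "example", 7, 0, my_ast, my_arg, my_bast);
	-- on 0, wait for my_ast, then check lksb.sb_status

	error = dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, my_arg);
	-- on 0, wait for my_ast; sb_status is -DLM_EUNLOCK on success
*/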
2801 int dlm_lock(dlm_lockspace_t *lockspace,
2803 struct dlm_lksb *lksb,
2806 unsigned int namelen,
2807 uint32_t parent_lkid,
2808 void (*ast) (void *astarg),
2810 void (*bast) (void *astarg, int mode))
2813 struct dlm_lkb *lkb;
2814 struct dlm_args args;
2815 int error, convert = flags & DLM_LKF_CONVERT;
2817 ls = dlm_find_lockspace_local(lockspace);
2821 dlm_lock_recovery(ls);
2824 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2826 error = create_lkb(ls, &lkb);
2831 error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2832 astarg, bast, &args);
2837 error = convert_lock(ls, lkb, &args);
2839 error = request_lock(ls, lkb, name, namelen, &args);
2841 if (error == -EINPROGRESS)
2844 if (convert || error)
2846 if (error == -EAGAIN || error == -EDEADLK)
2849 dlm_unlock_recovery(ls);
2850 dlm_put_lockspace(ls);
2854 int dlm_unlock(dlm_lockspace_t *lockspace,
2857 struct dlm_lksb *lksb,
2861 struct dlm_lkb *lkb;
2862 struct dlm_args args;
2865 ls = dlm_find_lockspace_local(lockspace);
2869 dlm_lock_recovery(ls);
2871 error = find_lkb(ls, lkid, &lkb);
2875 error = set_unlock_args(flags, astarg, &args);
2879 if (flags & DLM_LKF_CANCEL)
2880 error = cancel_lock(ls, lkb, &args);
2882 error = unlock_lock(ls, lkb, &args);
2884 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2886 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2891 dlm_unlock_recovery(ls);
2892 dlm_put_lockspace(ls);
2897 * send/receive routines for remote operations and replies
2901 * send_request receive_request
2902 * send_convert receive_convert
2903 * send_unlock receive_unlock
2904 * send_cancel receive_cancel
2905 * send_grant receive_grant
2906 * send_bast receive_bast
2907 * send_lookup receive_lookup
2908 * send_remove receive_remove
2911 * receive_request_reply send_request_reply
2912 * receive_convert_reply send_convert_reply
2913 * receive_unlock_reply send_unlock_reply
2914 * receive_cancel_reply send_cancel_reply
2915 * receive_lookup_reply send_lookup_reply
2918 static int _create_message(struct dlm_ls *ls, int mb_len,
2919 int to_nodeid, int mstype,
2920 struct dlm_message **ms_ret,
2921 struct dlm_mhandle **mh_ret)
2923 struct dlm_message *ms;
2924 struct dlm_mhandle *mh;
2927 /* get_buffer gives us a message handle (mh) that we need to
2928 pass into lowcomms_commit and a message buffer (mb) that we
2929 write our data into */
2931 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2935 memset(mb, 0, mb_len);
2937 ms = (struct dlm_message *) mb;
2939 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2940 ms->m_header.h_lockspace = ls->ls_global_id;
2941 ms->m_header.h_nodeid = dlm_our_nodeid();
2942 ms->m_header.h_length = mb_len;
2943 ms->m_header.h_cmd = DLM_MSG;
2945 ms->m_type = mstype;
2952 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2953 int to_nodeid, int mstype,
2954 struct dlm_message **ms_ret,
2955 struct dlm_mhandle **mh_ret)
2957 int mb_len = sizeof(struct dlm_message);
2960 case DLM_MSG_REQUEST:
2961 case DLM_MSG_LOOKUP:
2962 case DLM_MSG_REMOVE:
2963 mb_len += r->res_length;
2965 case DLM_MSG_CONVERT:
2966 case DLM_MSG_UNLOCK:
2967 case DLM_MSG_REQUEST_REPLY:
2968 case DLM_MSG_CONVERT_REPLY:
2970 if (lkb && lkb->lkb_lvbptr)
2971 mb_len += r->res_ls->ls_lvblen;
2975 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2979 /* further lowcomms enhancements or alternate implementations may make
2980 the return value from this function useful at some point */
2982 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2984 dlm_message_out(ms);
2985 dlm_lowcomms_commit_buffer(mh);
2989 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2990 struct dlm_message *ms)
2992 ms->m_nodeid = lkb->lkb_nodeid;
2993 ms->m_pid = lkb->lkb_ownpid;
2994 ms->m_lkid = lkb->lkb_id;
2995 ms->m_remid = lkb->lkb_remid;
2996 ms->m_exflags = lkb->lkb_exflags;
2997 ms->m_sbflags = lkb->lkb_sbflags;
2998 ms->m_flags = lkb->lkb_flags;
2999 ms->m_lvbseq = lkb->lkb_lvbseq;
3000 ms->m_status = lkb->lkb_status;
3001 ms->m_grmode = lkb->lkb_grmode;
3002 ms->m_rqmode = lkb->lkb_rqmode;
3003 ms->m_hash = r->res_hash;
3005 /* m_result and m_bastmode are set from function args,
3006 not from lkb fields */
3008 if (lkb->lkb_bastfn)
3009 ms->m_asts |= DLM_CB_BAST;
3011 ms->m_asts |= DLM_CB_CAST;
3013 /* compare with switch in create_message; send_remove() doesn't use send_args() */
3016 switch (ms->m_type) {
3017 case DLM_MSG_REQUEST:
3018 case DLM_MSG_LOOKUP:
3019 memcpy(ms->m_extra, r->res_name, r->res_length);
3021 case DLM_MSG_CONVERT:
3022 case DLM_MSG_UNLOCK:
3023 case DLM_MSG_REQUEST_REPLY:
3024 case DLM_MSG_CONVERT_REPLY:
3026 if (!lkb->lkb_lvbptr)
3028 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
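/* common send path for request/convert/unlock/cancel: put the lkb on
   the waiters list (a reply is expected), then build and send the
   message; on failure the waiters entry is removed again */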
3033 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3035 struct dlm_message *ms;
3036 struct dlm_mhandle *mh;
3037 int to_nodeid, error;
3039 to_nodeid = r->res_nodeid;
3041 error = add_to_waiters(lkb, mstype, to_nodeid);
3045 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3049 send_args(r, lkb, ms);
3051 error = send_message(mh, ms);
3057 remove_from_waiters(lkb, msg_reply_type(mstype));
3061 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3063 return send_common(r, lkb, DLM_MSG_REQUEST);
3066 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3070 error = send_common(r, lkb, DLM_MSG_CONVERT);
3072 /* down conversions go without a reply from the master */
3073 if (!error && down_conversion(lkb)) {
3074 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3075 r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
3076 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3077 r->res_ls->ls_stub_ms.m_result = 0;
3078 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3084 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3085 MASTER_UNCERTAIN to force the next request on the rsb to confirm
3086 that the master is still correct. */
3088 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3090 return send_common(r, lkb, DLM_MSG_UNLOCK);
3093 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3095 return send_common(r, lkb, DLM_MSG_CANCEL);
3098 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3100 struct dlm_message *ms;
3101 struct dlm_mhandle *mh;
3102 int to_nodeid, error;
3104 to_nodeid = lkb->lkb_nodeid;
3106 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3110 send_args(r, lkb, ms);
3114 error = send_message(mh, ms);
3119 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3121 struct dlm_message *ms;
3122 struct dlm_mhandle *mh;
3123 int to_nodeid, error;
3125 to_nodeid = lkb->lkb_nodeid;
3127 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3131 send_args(r, lkb, ms);
3133 ms->m_bastmode = mode;
3135 error = send_message(mh, ms);
3140 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3142 struct dlm_message *ms;
3143 struct dlm_mhandle *mh;
3144 int to_nodeid, error;
3146 to_nodeid = dlm_dir_nodeid(r);
3148 error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3152 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3156 send_args(r, lkb, ms);
3158 error = send_message(mh, ms);
3164 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3168 static int send_remove(struct dlm_rsb *r)
3170 struct dlm_message *ms;
3171 struct dlm_mhandle *mh;
3172 int to_nodeid, error;
3174 to_nodeid = dlm_dir_nodeid(r);
3176 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3180 memcpy(ms->m_extra, r->res_name, r->res_length);
3181 ms->m_hash = r->res_hash;
3183 error = send_message(mh, ms);
3188 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3191 struct dlm_message *ms;
3192 struct dlm_mhandle *mh;
3193 int to_nodeid, error;
3195 to_nodeid = lkb->lkb_nodeid;
3197 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3201 send_args(r, lkb, ms);
3205 error = send_message(mh, ms);
3210 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3212 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3215 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3217 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3220 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3222 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3225 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3227 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3230 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3231 int ret_nodeid, int rv)
3233 struct dlm_rsb *r = &ls->ls_stub_rsb;
3234 struct dlm_message *ms;
3235 struct dlm_mhandle *mh;
3236 int error, nodeid = ms_in->m_header.h_nodeid;
3238 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3242 ms->m_lkid = ms_in->m_lkid;
3244 ms->m_nodeid = ret_nodeid;
3246 error = send_message(mh, ms);
3251 /* which args we save from a received message depends heavily on the type
3252 of message, unlike the send side where we can safely send everything about
3253 the lkb for any type of message */
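/* only the low 16 bits of lkb_flags travel in messages; the high 16
   bits are node-local state and are preserved across a receive */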
3255 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3257 lkb->lkb_exflags = ms->m_exflags;
3258 lkb->lkb_sbflags = ms->m_sbflags;
3259 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3260 (ms->m_flags & 0x0000FFFF);
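/* stub replies are faked locally (down-conversion and recovery paths)
   and carry no usable flag values, so they are ignored here */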
3263 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3265 if (ms->m_flags == DLM_IFL_STUB_MS)
3268 lkb->lkb_sbflags = ms->m_sbflags;
3269 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3270 (ms->m_flags & 0x0000FFFF);
3273 static int receive_extralen(struct dlm_message *ms)
3275 return (ms->m_header.h_length - sizeof(struct dlm_message));
3278 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3279 struct dlm_message *ms)
3283 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3284 if (!lkb->lkb_lvbptr)
3285 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3286 if (!lkb->lkb_lvbptr)
3288 len = receive_extralen(ms);
3289 if (len > DLM_RESNAME_MAXLEN)
3290 len = DLM_RESNAME_MAXLEN;
3291 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
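/* a master copy's asts are never called locally; these placeholders
   just record that the process copy registered ast/bast callbacks */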
3296 static void fake_bastfn(void *astparam, int mode)
3298 log_print("fake_bastfn should not be called");
3301 static void fake_astfn(void *astparam)
3303 log_print("fake_astfn should not be called");
3306 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3307 struct dlm_message *ms)
3309 lkb->lkb_nodeid = ms->m_header.h_nodeid;
3310 lkb->lkb_ownpid = ms->m_pid;
3311 lkb->lkb_remid = ms->m_lkid;
3312 lkb->lkb_grmode = DLM_LOCK_IV;
3313 lkb->lkb_rqmode = ms->m_rqmode;
3315 lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3316 lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3318 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3319 /* lkb was just created so there won't be an lvb yet */
3320 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3321 if (!lkb->lkb_lvbptr)
3328 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3329 struct dlm_message *ms)
3331 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3334 if (receive_lvb(ls, lkb, ms))
3337 lkb->lkb_rqmode = ms->m_rqmode;
3338 lkb->lkb_lvbseq = ms->m_lvbseq;
3343 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3344 struct dlm_message *ms)
3346 if (receive_lvb(ls, lkb, ms))
3351 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3352 uses to send a reply and that the remote end uses to process the reply. */
3354 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3356 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3357 lkb->lkb_nodeid = ms->m_header.h_nodeid;
3358 lkb->lkb_remid = ms->m_lkid;
3361 /* This is called after the rsb is locked so that we can safely inspect
3362 fields in the lkb. */
3364 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3366 int from = ms->m_header.h_nodeid;
3369 switch (ms->m_type) {
3370 case DLM_MSG_CONVERT:
3371 case DLM_MSG_UNLOCK:
3372 case DLM_MSG_CANCEL:
3373 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3377 case DLM_MSG_CONVERT_REPLY:
3378 case DLM_MSG_UNLOCK_REPLY:
3379 case DLM_MSG_CANCEL_REPLY:
3382 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3386 case DLM_MSG_REQUEST_REPLY:
3387 if (!is_process_copy(lkb))
3389 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3398 log_error(lkb->lkb_resource->res_ls,
3399 "ignore invalid message %d from %d %x %x %x %d",
3400 ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3401 lkb->lkb_flags, lkb->lkb_nodeid);
3405 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3407 struct dlm_lkb *lkb;
3411 error = create_lkb(ls, &lkb);
3415 receive_flags(lkb, ms);
3416 lkb->lkb_flags |= DLM_IFL_MSTCPY;
3417 error = receive_request_args(ls, lkb, ms);
3423 namelen = receive_extralen(ms);
3425 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3434 error = do_request(r, lkb);
3435 send_request_reply(r, lkb, error);
3436 do_request_effects(r, lkb, error);
3441 if (error == -EINPROGRESS)
3448 setup_stub_lkb(ls, ms);
3449 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3453 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3455 struct dlm_lkb *lkb;
3457 int error, reply = 1;
3459 error = find_lkb(ls, ms->m_remid, &lkb);
3463 if (lkb->lkb_remid != ms->m_lkid) {
3464 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
3465 "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
3466 (unsigned long long)lkb->lkb_recover_seq,
3467 ms->m_header.h_nodeid, ms->m_lkid);
3472 r = lkb->lkb_resource;
3477 error = validate_message(lkb, ms);
3481 receive_flags(lkb, ms);
3483 error = receive_convert_args(ls, lkb, ms);
3485 send_convert_reply(r, lkb, error);
3489 reply = !down_conversion(lkb);
3491 error = do_convert(r, lkb);
3493 send_convert_reply(r, lkb, error);
3494 do_convert_effects(r, lkb, error);
3502 setup_stub_lkb(ls, ms);
3503 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3507 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3509 struct dlm_lkb *lkb;
3513 error = find_lkb(ls, ms->m_remid, &lkb);
3517 if (lkb->lkb_remid != ms->m_lkid) {
3518 log_error(ls, "receive_unlock %x remid %x remote %d %x",
3519 lkb->lkb_id, lkb->lkb_remid,
3520 ms->m_header.h_nodeid, ms->m_lkid);
3525 r = lkb->lkb_resource;
3530 error = validate_message(lkb, ms);
3534 receive_flags(lkb, ms);
3536 error = receive_unlock_args(ls, lkb, ms);
3538 send_unlock_reply(r, lkb, error);
3542 error = do_unlock(r, lkb);
3543 send_unlock_reply(r, lkb, error);
3544 do_unlock_effects(r, lkb, error);
3552 setup_stub_lkb(ls, ms);
3553 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3557 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3559 struct dlm_lkb *lkb;
3563 error = find_lkb(ls, ms->m_remid, &lkb);
3567 receive_flags(lkb, ms);
3569 r = lkb->lkb_resource;
3574 error = validate_message(lkb, ms);
3578 error = do_cancel(r, lkb);
3579 send_cancel_reply(r, lkb, error);
3580 do_cancel_effects(r, lkb, error);
3588 setup_stub_lkb(ls, ms);
3589 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3593 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3595 struct dlm_lkb *lkb;
3599 error = find_lkb(ls, ms->m_remid, &lkb);
3603 r = lkb->lkb_resource;
3608 error = validate_message(lkb, ms);
3612 receive_flags_reply(lkb, ms);
3613 if (is_altmode(lkb))
3614 munge_altmode(lkb, ms);
3615 grant_lock_pc(r, lkb, ms);
3616 queue_cast(r, lkb, 0);
3624 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3626 struct dlm_lkb *lkb;
3630 error = find_lkb(ls, ms->m_remid, &lkb);
3634 r = lkb->lkb_resource;
3639 error = validate_message(lkb, ms);
3643 queue_bast(r, lkb, ms->m_bastmode);
3644 lkb->lkb_highbast = ms->m_bastmode;
3652 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3654 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3656 from_nodeid = ms->m_header.h_nodeid;
3657 our_nodeid = dlm_our_nodeid();
3659 len = receive_extralen(ms);
3661 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3662 if (dir_nodeid != our_nodeid) {
3663 log_error(ls, "lookup dir_nodeid %d from %d",
3664 dir_nodeid, from_nodeid);
3670 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3672 /* Optimization: we're master so treat lookup as a request */
3673 if (!error && ret_nodeid == our_nodeid) {
3674 receive_request(ls, ms);
3678 send_lookup_reply(ls, ms, ret_nodeid, error);
3681 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3683 int len, dir_nodeid, from_nodeid;
3685 from_nodeid = ms->m_header.h_nodeid;
3687 len = receive_extralen(ms);
3689 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3690 if (dir_nodeid != dlm_our_nodeid()) {
3691 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3692 dir_nodeid, from_nodeid);
3696 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3699 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3701 do_purge(ls, ms->m_nodeid, ms->m_pid);
3704 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3706 struct dlm_lkb *lkb;
3708 int error, mstype, result;
3710 error = find_lkb(ls, ms->m_remid, &lkb);
3714 r = lkb->lkb_resource;
3718 error = validate_message(lkb, ms);
3722 mstype = lkb->lkb_wait_type;
3723 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3725 log_error(ls, "receive_request_reply %x remote %d %x result %d",
3726 lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
3732 /* Optimization: the dir node was also the master, so it took our
3733 lookup as a request and sent request reply instead of lookup reply */
3734 if (mstype == DLM_MSG_LOOKUP) {
3735 r->res_nodeid = ms->m_header.h_nodeid;
3736 lkb->lkb_nodeid = r->res_nodeid;
3739 /* this is the value returned from do_request() on the master */
3740 result = ms->m_result;
3744 /* request would block (be queued) on remote master */
3745 queue_cast(r, lkb, -EAGAIN);
3746 confirm_master(r, -EAGAIN);
3747 unhold_lkb(lkb); /* undoes create_lkb() */
3752 /* request was queued or granted on remote master */
3753 receive_flags_reply(lkb, ms);
3754 lkb->lkb_remid = ms->m_lkid;
3755 if (is_altmode(lkb))
3756 munge_altmode(lkb, ms);
3758 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3761 grant_lock_pc(r, lkb, ms);
3762 queue_cast(r, lkb, 0);
3764 confirm_master(r, result);
3769 /* find_rsb failed to find rsb or rsb wasn't master */
3770 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3771 lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3773 lkb->lkb_nodeid = -1;
3775 if (is_overlap(lkb)) {
3776 /* we'll ignore error in cancel/unlock reply */
3777 queue_cast_overlap(r, lkb);
3778 confirm_master(r, result);
3779 unhold_lkb(lkb); /* undoes create_lkb() */
3781 _request_lock(r, lkb);
3785 log_error(ls, "receive_request_reply %x error %d",
3786 lkb->lkb_id, result);
3789 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3790 log_debug(ls, "receive_request_reply %x result %d unlock",
3791 lkb->lkb_id, result);
3792 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3793 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3794 send_unlock(r, lkb);
3795 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3796 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3797 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3798 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3799 send_cancel(r, lkb);
3801 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3802 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3811 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3812 struct dlm_message *ms)
3814 /* this is the value returned from do_convert() on the master */
3815 switch (ms->m_result) {
3817 /* convert would block (be queued) on remote master */
3818 queue_cast(r, lkb, -EAGAIN);
3822 receive_flags_reply(lkb, ms);
3823 revert_lock_pc(r, lkb);
3824 queue_cast(r, lkb, -EDEADLK);
3828 /* convert was queued on remote master */
3829 receive_flags_reply(lkb, ms);
3830 if (is_demoted(lkb))
3833 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3838 /* convert was granted on remote master */
3839 receive_flags_reply(lkb, ms);
3840 if (is_demoted(lkb))
3842 grant_lock_pc(r, lkb, ms);
3843 queue_cast(r, lkb, 0);
3847 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
3848 lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
3855 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3857 struct dlm_rsb *r = lkb->lkb_resource;
3863 error = validate_message(lkb, ms);
3867 /* stub reply can happen with waiters_mutex held */
3868 error = remove_from_waiters_ms(lkb, ms);
3872 __receive_convert_reply(r, lkb, ms);
3878 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3880 struct dlm_lkb *lkb;
3883 error = find_lkb(ls, ms->m_remid, &lkb);
3887 _receive_convert_reply(lkb, ms);
3892 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3894 struct dlm_rsb *r = lkb->lkb_resource;
3900 error = validate_message(lkb, ms);
3904 /* stub reply can happen with waiters_mutex held */
3905 error = remove_from_waiters_ms(lkb, ms);
3909 /* this is the value returned from do_unlock() on the master */
3911 switch (ms->m_result) {
3913 receive_flags_reply(lkb, ms);
3914 remove_lock_pc(r, lkb);
3915 queue_cast(r, lkb, -DLM_EUNLOCK);
3920 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3921 lkb->lkb_id, ms->m_result);
3928 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3930 struct dlm_lkb *lkb;
3933 error = find_lkb(ls, ms->m_remid, &lkb);
3937 _receive_unlock_reply(lkb, ms);
3942 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3944 struct dlm_rsb *r = lkb->lkb_resource;
3950 error = validate_message(lkb, ms);
3954 /* stub reply can happen with waiters_mutex held */
3955 error = remove_from_waiters_ms(lkb, ms);
3959 /* this is the value returned from do_cancel() on the master */
3961 switch (ms->m_result) {
3963 receive_flags_reply(lkb, ms);
3964 revert_lock_pc(r, lkb);
3965 queue_cast(r, lkb, -DLM_ECANCEL);
3970 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3971 lkb->lkb_id, ms->m_result);
3978 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3980 struct dlm_lkb *lkb;
3983 error = find_lkb(ls, ms->m_remid, &lkb);
3987 _receive_cancel_reply(lkb, ms);
3992 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3994 struct dlm_lkb *lkb;
3996 int error, ret_nodeid;
3998 error = find_lkb(ls, ms->m_lkid, &lkb);
4000 log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
4004 /* ms->m_result is the value returned by dlm_dir_lookup on dir node
4005 FIXME: will a non-zero error ever be returned? */
4007 r = lkb->lkb_resource;
4011 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4015 ret_nodeid = ms->m_nodeid;
4016 if (ret_nodeid == dlm_our_nodeid()) {
4019 r->res_first_lkid = 0;
4021 /* set_master() will copy res_nodeid to lkb_nodeid */
4022 r->res_nodeid = ret_nodeid;
4025 if (is_overlap(lkb)) {
4026 log_debug(ls, "receive_lookup_reply %x unlock %x",
4027 lkb->lkb_id, lkb->lkb_flags);
4028 queue_cast_overlap(r, lkb);
4029 unhold_lkb(lkb); /* undoes create_lkb() */
4033 _request_lock(r, lkb);
4037 process_lookup_list(r);
4044 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4047 int error = 0, noent = 0;
4049 if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
4050 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
4051 ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4052 ms->m_remid, ms->m_result);
4056 switch (ms->m_type) {
4058 /* messages sent to a master node */
4060 case DLM_MSG_REQUEST:
4061 error = receive_request(ls, ms);
4064 case DLM_MSG_CONVERT:
4065 error = receive_convert(ls, ms);
4068 case DLM_MSG_UNLOCK:
4069 error = receive_unlock(ls, ms);
4072 case DLM_MSG_CANCEL:
4074 error = receive_cancel(ls, ms);
4077 /* messages sent from a master node (replies to above) */
4079 case DLM_MSG_REQUEST_REPLY:
4080 error = receive_request_reply(ls, ms);
4083 case DLM_MSG_CONVERT_REPLY:
4084 error = receive_convert_reply(ls, ms);
4087 case DLM_MSG_UNLOCK_REPLY:
4088 error = receive_unlock_reply(ls, ms);
4091 case DLM_MSG_CANCEL_REPLY:
4092 error = receive_cancel_reply(ls, ms);
4095 /* messages sent from a master node (only two types of async msg) */
4099 error = receive_grant(ls, ms);
4104 error = receive_bast(ls, ms);
4107 /* messages sent to a dir node */
4109 case DLM_MSG_LOOKUP:
4110 receive_lookup(ls, ms);
4113 case DLM_MSG_REMOVE:
4114 receive_remove(ls, ms);
4117 /* messages sent from a dir node (remove has no reply) */
4119 case DLM_MSG_LOOKUP_REPLY:
4120 receive_lookup_reply(ls, ms);
4123 /* other messages */
4126 receive_purge(ls, ms);
4130 log_error(ls, "unknown message type %d", ms->m_type);
4134 * When checking for ENOENT, we're checking the result of
4135 * find_lkb(m_remid):
4137 * The lock id referenced in the message wasn't found. This may
4138 * happen in normal usage for the async messages and cancel, so
4139 * only use log_debug for them.
4141 * Some errors are expected and normal.
4144 if (error == -ENOENT && noent) {
4145 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4146 ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4147 ms->m_lkid, saved_seq);
4148 } else if (error == -ENOENT) {
4149 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4150 ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4151 ms->m_lkid, saved_seq);
4153 if (ms->m_type == DLM_MSG_CONVERT)
4154 dlm_dump_rsb_hash(ls, ms->m_hash);
4157 if (error == -EINVAL) {
4158 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4160 ms->m_type, ms->m_header.h_nodeid,
4161 ms->m_lkid, ms->m_remid, saved_seq);
4165 /* If the lockspace is in recovery mode (locking stopped), then normal
4166 messages are saved on the requestqueue for processing after recovery is
4167 done. When not in recovery mode, we wait for dlm_recoverd to drain saved
4168 messages off the requestqueue before we process new ones. This occurs right
4169 after recovery completes when we transition from saving all messages on
4170 requestqueue, to processing all the saved messages, to processing new
4171 messages as they arrive. */
4173 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4176 if (dlm_locking_stopped(ls)) {
4177 dlm_add_requestqueue(ls, nodeid, ms);
4179 dlm_wait_requestqueue(ls);
4180 _receive_message(ls, ms, 0);
4184 /* This is called by dlm_recoverd to process messages that were saved on
4185 the requestqueue. */
4187 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
4190 _receive_message(ls, ms, saved_seq);
4193 /* This is called by the midcomms layer when something is received for
4194 the lockspace. It could be either a MSG (normal message sent as part of
4195 standard locking activity) or an RCOM (recovery message sent as part of
4196 lockspace recovery). */
4198 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4200 struct dlm_header *hd = &p->header;
4204 switch (hd->h_cmd) {
4206 dlm_message_in(&p->message);
4207 type = p->message.m_type;
4210 dlm_rcom_in(&p->rcom);
4211 type = p->rcom.rc_type;
4214 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4218 if (hd->h_nodeid != nodeid) {
4219 log_print("invalid h_nodeid %d from %d lockspace %x",
4220 hd->h_nodeid, nodeid, hd->h_lockspace);
4224 ls = dlm_find_lockspace_global(hd->h_lockspace);
4226 if (dlm_config.ci_log_debug) {
4227 printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4228 "%u from %d cmd %d type %d\n",
4229 hd->h_lockspace, nodeid, hd->h_cmd, type);
4232 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4233 dlm_send_ls_not_ready(nodeid, &p->rcom);
4237 /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4238 be inactive (in this ls) before transitioning to recovery mode */
4240 down_read(&ls->ls_recv_active);
4241 if (hd->h_cmd == DLM_MSG)
4242 dlm_receive_message(ls, &p->message, nodeid);
4244 dlm_receive_rcom(ls, &p->rcom, nodeid);
4245 up_read(&ls->ls_recv_active);
4247 dlm_put_lockspace(ls);
4250 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4251 struct dlm_message *ms_stub)
4253 if (middle_conversion(lkb)) {
4255 memset(ms_stub, 0, sizeof(struct dlm_message));
4256 ms_stub->m_flags = DLM_IFL_STUB_MS;
4257 ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
4258 ms_stub->m_result = -EINPROGRESS;
4259 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4260 _receive_convert_reply(lkb, ms_stub);
4262 /* Same special case as in receive_rcom_lock_args() */
4263 lkb->lkb_grmode = DLM_LOCK_IV;
4264 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4267 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4268 lkb->lkb_flags |= DLM_IFL_RESEND;
4271 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4272 conversions are async; there's no reply from the remote master */
4275 /* A waiting lkb needs recovery if the master node has failed, or
4276 the master node is changing (only when no directory is used) */
4278 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
4281 if (dlm_no_directory(ls))
4284 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
4290 /* Recovery for locks that are waiting for replies from nodes that are now
4291 gone. We can just complete unlocks and cancels by faking a reply from the
4292 dead node. Requests and up-conversions we flag to be resent after
4293 recovery. Down-conversions can just be completed with a fake reply like
4294 unlocks. Conversions between PR and CW need special attention. */
4296 void dlm_recover_waiters_pre(struct dlm_ls *ls)
4298 struct dlm_lkb *lkb, *safe;
4299 struct dlm_message *ms_stub;
4300 int wait_type, stub_unlock_result, stub_cancel_result;
4303 ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
4305 log_error(ls, "dlm_recover_waiters_pre no mem");
4309 mutex_lock(&ls->ls_waiters_mutex);
4311 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4313 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
4315 /* exclude debug messages about unlocks because there can be so
4316 many and they aren't very interesting */
4318 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4319 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
4320 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
4324 lkb->lkb_resource->res_nodeid,
4326 lkb->lkb_wait_nodeid,
4330 /* all outstanding lookups, regardless of destination, will be
4331 resent after recovery is done */
4333 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4334 lkb->lkb_flags |= DLM_IFL_RESEND;
4338 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
4341 wait_type = lkb->lkb_wait_type;
4342 stub_unlock_result = -DLM_EUNLOCK;
4343 stub_cancel_result = -DLM_ECANCEL;
4345 /* Main reply may have been received leaving a zero wait_type,
4346 but a reply for the overlapping op may not have been
4347 received. In that case we need to fake the appropriate
4348 reply for the overlap op. */
4351 if (is_overlap_cancel(lkb)) {
4352 wait_type = DLM_MSG_CANCEL;
4353 if (lkb->lkb_grmode == DLM_LOCK_IV)
4354 stub_cancel_result = 0;
4356 if (is_overlap_unlock(lkb)) {
4357 wait_type = DLM_MSG_UNLOCK;
4358 if (lkb->lkb_grmode == DLM_LOCK_IV)
4359 stub_unlock_result = -ENOENT;
4362 log_debug(ls, "rwpre overlap %x %x %d %d %d",
4363 lkb->lkb_id, lkb->lkb_flags, wait_type,
4364 stub_cancel_result, stub_unlock_result);
4367 switch (wait_type) {
4369 case DLM_MSG_REQUEST:
4370 lkb->lkb_flags |= DLM_IFL_RESEND;
4373 case DLM_MSG_CONVERT:
4374 recover_convert_waiter(ls, lkb, ms_stub);
4377 case DLM_MSG_UNLOCK:
4379 memset(ms_stub, 0, sizeof(struct dlm_message));
4380 ms_stub->m_flags = DLM_IFL_STUB_MS;
4381 ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
4382 ms_stub->m_result = stub_unlock_result;
4383 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4384 _receive_unlock_reply(lkb, ms_stub);
4388 case DLM_MSG_CANCEL:
4390 memset(ms_stub, 0, sizeof(struct dlm_message));
4391 ms_stub->m_flags = DLM_IFL_STUB_MS;
4392 ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
4393 ms_stub->m_result = stub_cancel_result;
4394 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4395 _receive_cancel_reply(lkb, ms_stub);
4400 log_error(ls, "invalid lkb wait_type %d %d",
4401 lkb->lkb_wait_type, wait_type);
4405 mutex_unlock(&ls->ls_waiters_mutex);
4409 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4411 struct dlm_lkb *lkb;
4414 mutex_lock(&ls->ls_waiters_mutex);
4415 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4416 if (lkb->lkb_flags & DLM_IFL_RESEND) {
4422 mutex_unlock(&ls->ls_waiters_mutex);
4429 /* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
4430 master or dir-node for r. Processing the lkb may result in it being placed back on the waiters list. */
4433 /* We do this after normal locking has been enabled and any saved messages
4434 (in requestqueue) have been processed. We should be confident that at
4435 this point we won't get or process a reply to any of these waiting
4436 operations. But, new ops may be coming in on the rsbs/locks here from
4437 userspace or remotely. */
4439 /* there may have been an overlap unlock/cancel prior to recovery or after
4440 recovery. if before, the lkb may still have a positive wait_count; if after,
4441 the overlap flag would just have been set and nothing new sent. we can be
4442 confident here that any replies to either the initial op or overlap ops
4443 prior to recovery have been received. */
4445 int dlm_recover_waiters_post(struct dlm_ls *ls)
4447 struct dlm_lkb *lkb;
4449 int error = 0, mstype, err, oc, ou;
4452 if (dlm_locking_stopped(ls)) {
4453 log_debug(ls, "recover_waiters_post aborted");
4458 lkb = find_resend_waiter(ls);
4462 r = lkb->lkb_resource;
4466 mstype = lkb->lkb_wait_type;
4467 oc = is_overlap_cancel(lkb);
4468 ou = is_overlap_unlock(lkb);
4471 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
4472 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
4473 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
4474 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
4475 dlm_dir_nodeid(r), oc, ou);
4477 /* At this point we assume that we won't get a reply to any
4478 previous op or overlap op on this lock. First, do a big
4479 remove_from_waiters() for all previous ops. */
4481 lkb->lkb_flags &= ~DLM_IFL_RESEND;
4482 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4483 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4484 lkb->lkb_wait_type = 0;
4485 lkb->lkb_wait_count = 0;
4486 mutex_lock(&ls->ls_waiters_mutex);
4487 list_del_init(&lkb->lkb_wait_reply);
4488 mutex_unlock(&ls->ls_waiters_mutex);
4489 unhold_lkb(lkb); /* for waiters list */
4492 /* do an unlock or cancel instead of resending */
4494 case DLM_MSG_LOOKUP:
4495 case DLM_MSG_REQUEST:
4496 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4498 unhold_lkb(lkb); /* undoes create_lkb() */
4500 case DLM_MSG_CONVERT:
4502 queue_cast(r, lkb, -DLM_ECANCEL);
4504 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4505 _unlock_lock(r, lkb);
4513 case DLM_MSG_LOOKUP:
4514 case DLM_MSG_REQUEST:
4515 _request_lock(r, lkb);
4517 confirm_master(r, 0);
4519 case DLM_MSG_CONVERT:
4520 _convert_lock(r, lkb);
4528 log_error(ls, "waiter %x msg %d r_nodeid %d "
4529 "dir_nodeid %d overlap %d %d",
4530 lkb->lkb_id, mstype, r->res_nodeid,
4531 dlm_dir_nodeid(r), oc, ou);
4541 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
4542 struct list_head *list)
4544 struct dlm_lkb *lkb, *safe;
4546 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
4547 if (!is_master_copy(lkb))
4550 /* don't purge lkbs we've added in recover_master_copy for
4551 the current recovery seq */
4553 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
4558 /* this put should free the lkb */
4559 if (!dlm_put_lkb(lkb))
4560 log_error(ls, "purged mstcpy lkb not released");
4564 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4566 struct dlm_ls *ls = r->res_ls;
4568 purge_mstcpy_list(ls, r, &r->res_grantqueue);
4569 purge_mstcpy_list(ls, r, &r->res_convertqueue);
4570 purge_mstcpy_list(ls, r, &r->res_waitqueue);
4573 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
4574 struct list_head *list,
4575 int nodeid_gone, unsigned int *count)
4577 struct dlm_lkb *lkb, *safe;
4579 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
4580 if (!is_master_copy(lkb))
4583 if ((lkb->lkb_nodeid == nodeid_gone) ||
4584 dlm_is_removed(ls, lkb->lkb_nodeid)) {
4588 /* this put should free the lkb */
4589 if (!dlm_put_lkb(lkb))
4590 log_error(ls, "purged dead lkb not released");
4592 rsb_set_flag(r, RSB_RECOVER_GRANT);
4599 /* Get rid of locks held by nodes that are gone. */
4601 void dlm_recover_purge(struct dlm_ls *ls)
4604 struct dlm_member *memb;
4605 int nodes_count = 0;
4606 int nodeid_gone = 0;
4607 unsigned int lkb_count = 0;
4609 /* cache one removed nodeid to optimize the common
4610 case of a single node removed */
4612 list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
4614 nodeid_gone = memb->nodeid;
4620 down_write(&ls->ls_root_sem);
4621 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4625 purge_dead_list(ls, r, &r->res_grantqueue,
4626 nodeid_gone, &lkb_count);
4627 purge_dead_list(ls, r, &r->res_convertqueue,
4628 nodeid_gone, &lkb_count);
4629 purge_dead_list(ls, r, &r->res_waitqueue,
4630 nodeid_gone, &lkb_count);
4636 up_write(&ls->ls_root_sem);
4639 log_debug(ls, "dlm_recover_purge %u locks for %u nodes",
4640 lkb_count, nodes_count);
4643 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
4648 spin_lock(&ls->ls_rsbtbl[bucket].lock);
4649 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
4650 r = rb_entry(n, struct dlm_rsb, res_hashnode);
4652 if (!rsb_flag(r, RSB_RECOVER_GRANT))
4654 rsb_clear_flag(r, RSB_RECOVER_GRANT);
4658 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4661 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4666 * Attempt to grant locks on resources that we are the master of.
4667 * Locks may have become grantable during recovery because locks
4668 * from departed nodes have been purged (or not rebuilt), allowing
4669 * previously blocked locks to now be granted. The subset of rsb's
4670 * we are interested in are those with lkb's on either the convert or
4673 * Simplest would be to go through each master rsb and check for non-empty
4674 * convert or waiting queues, and attempt to grant on those rsbs.
4675 * Checking the queues requires lock_rsb, though, for which we'd need
4676 * to release the rsbtbl lock. This would make iterating through all
4677 * rsb's very inefficient. So, we rely on earlier recovery routines
4678 * to set RECOVER_GRANT on any rsb's that we should attempt to grant locks on. */
4682 void dlm_recover_grant(struct dlm_ls *ls)
4686 unsigned int count = 0;
4687 unsigned int rsb_count = 0;
4688 unsigned int lkb_count = 0;
4691 r = find_grant_rsb(ls, bucket);
4693 if (bucket == ls->ls_rsbtbl_size - 1)
4701 grant_pending_locks(r, &count);
4703 confirm_master(r, 0);
4710 log_debug(ls, "dlm_recover_grant %u locks on %u resources",
4711 lkb_count, rsb_count);
4714 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4717 struct dlm_lkb *lkb;
4719 list_for_each_entry(lkb, head, lkb_statequeue) {
4720 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4726 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4729 struct dlm_lkb *lkb;
4731 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4734 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4737 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4743 /* needs at least dlm_rcom + rcom_lock */
4744 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4745 struct dlm_rsb *r, struct dlm_rcom *rc)
4747 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4749 lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4750 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4751 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4752 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4753 lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4754 lkb->lkb_flags |= DLM_IFL_MSTCPY;
4755 lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4756 lkb->lkb_rqmode = rl->rl_rqmode;
4757 lkb->lkb_grmode = rl->rl_grmode;
4758 /* don't set lkb_status because add_lkb wants to do that itself */
4760 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
4761 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
4763 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4764 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4765 sizeof(struct rcom_lock);
4766 if (lvblen > ls->ls_lvblen)
4768 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4769 if (!lkb->lkb_lvbptr)
4771 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4774 /* Conversions between PR and CW (middle modes) need special handling.
4775 The real granted mode of these converting locks cannot be determined
4776 until all locks have been rebuilt on the rsb (recover_conversion) */
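/* (PR and CW are mutually incompatible but neither strictly includes
   the other, so a PR<->CW convert is neither a pure up- nor
   down-conversion) */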
4778 if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4779 middle_conversion(lkb)) {
4780 rl->rl_status = DLM_LKSTS_CONVERT;
4781 lkb->lkb_grmode = DLM_LOCK_IV;
4782 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4788 /* This lkb may have been recovered in a previous aborted recovery so we need
4789 to check if the rsb already has an lkb with the given remote nodeid/lkid.
4790 If so we just send back a standard reply. If not, we create a new lkb with
4791 the given values and send back our lkid. We send back our lkid by sending
4792 back the rcom_lock struct we got but with the remid field filled in. */
4794 /* needs at least dlm_rcom + rcom_lock */
4795 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4797 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4799 struct dlm_lkb *lkb;
4803 if (rl->rl_parent_lkid) {
4804 error = -EOPNOTSUPP;
4808 remid = le32_to_cpu(rl->rl_lkid);
4810 /* In general we expect the rsb returned to be R_MASTER, but we don't
4811 have to require it. Recovery of masters on one node can overlap
4812 recovery of locks on another node, so one node can send us MSTCPY
4813 locks before we've made ourselves master of this rsb. We can still
4814 add new MSTCPY locks that we receive here without any harm; when
4815 we make ourselves master, dlm_recover_masters() won't touch the
4816 MSTCPY locks we've received early. */
4818 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 0, &r);
4822 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
4823 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
4824 rc->rc_header.h_nodeid, remid);
4832 lkb = search_remid(r, rc->rc_header.h_nodeid, remid);
4838 error = create_lkb(ls, &lkb);
4842 error = receive_rcom_lock_args(ls, lkb, r, rc);
4849 add_lkb(r, lkb, rl->rl_status);
4851 ls->ls_recover_locks_in++;
4853 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
4854 rsb_set_flag(r, RSB_RECOVER_GRANT);
4857 /* this is the new value returned to the lock holder for
4858 saving in its process-copy lkb */
4859 rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4861 lkb->lkb_recover_seq = ls->ls_recover_seq;
4867 if (error && error != -EEXIST)
4868 log_debug(ls, "dlm_recover_master_copy remote %d %x error %d",
4869 rc->rc_header.h_nodeid, remid, error);
4870 rl->rl_result = cpu_to_le32(error);
4874 /* needs at least dlm_rcom + rcom_lock */
4875 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4877 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4879 struct dlm_lkb *lkb;
4880 uint32_t lkid, remid;
4883 lkid = le32_to_cpu(rl->rl_lkid);
4884 remid = le32_to_cpu(rl->rl_remid);
4885 result = le32_to_cpu(rl->rl_result);
4887 error = find_lkb(ls, lkid, &lkb);
4889 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
4890 lkid, rc->rc_header.h_nodeid, remid, result);
4894 r = lkb->lkb_resource;
4898 if (!is_process_copy(lkb)) {
4899 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
4900 lkid, rc->rc_header.h_nodeid, remid, result);
4910 /* There's a chance the new master received our lock before
4911 dlm_recover_master_reply(); this wouldn't happen if we did
4912 a barrier between recover_masters and recover_locks. */
4914 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
4915 lkid, rc->rc_header.h_nodeid, remid, result);
4917 dlm_send_rcom_lock(r, lkb);
4921 lkb->lkb_remid = remid;
4924 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
4925 lkid, rc->rc_header.h_nodeid, remid, result);
4928 /* an ack for dlm_recover_locks() which waits for replies from
4929 all the locks it sends to new masters */
4930 dlm_recovered_lock(r);
4939 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4940 int mode, uint32_t flags, void *name, unsigned int namelen,
4941 unsigned long timeout_cs)
4943 struct dlm_lkb *lkb;
4944 struct dlm_args args;
4947 dlm_lock_recovery(ls);
4949 error = create_lkb(ls, &lkb);
4955 if (flags & DLM_LKF_VALBLK) {
4956 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4957 if (!ua->lksb.sb_lvbptr) {
4965 /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4966 When DLM_IFL_USER is set, the dlm knows that this is a userspace
4967 lock and that lkb_astparam is the dlm_user_args structure. */
4969 error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4970 fake_astfn, ua, fake_bastfn, &args);
4971 lkb->lkb_flags |= DLM_IFL_USER;
4978 error = request_lock(ls, lkb, name, namelen, &args);
4994 /* add this new lkb to the per-process list of locks */
4995 spin_lock(&ua->proc->locks_spin);
4997 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4998 spin_unlock(&ua->proc->locks_spin);
5000 dlm_unlock_recovery(ls);
5004 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5005 int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
5006 unsigned long timeout_cs)
5008 struct dlm_lkb *lkb;
5009 struct dlm_args args;
5010 struct dlm_user_args *ua;
5013 dlm_lock_recovery(ls);
5015 error = find_lkb(ls, lkid, &lkb);
5019 /* user can change the params on its lock when it converts it, or
5020 add an lvb that didn't exist before */
5024 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5025 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5026 if (!ua->lksb.sb_lvbptr) {
5031 if (lvb_in && ua->lksb.sb_lvbptr)
5032 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5034 ua->xid = ua_tmp->xid;
5035 ua->castparam = ua_tmp->castparam;
5036 ua->castaddr = ua_tmp->castaddr;
5037 ua->bastparam = ua_tmp->bastparam;
5038 ua->bastaddr = ua_tmp->bastaddr;
5039 ua->user_lksb = ua_tmp->user_lksb;
5041 error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
5042 fake_astfn, ua, fake_bastfn, &args);
5046 error = convert_lock(ls, lkb, &args);
5048 if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5053 dlm_unlock_recovery(ls);
5058 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5059 uint32_t flags, uint32_t lkid, char *lvb_in)
5061 struct dlm_lkb *lkb;
5062 struct dlm_args args;
5063 struct dlm_user_args *ua;
5066 dlm_lock_recovery(ls);
5068 error = find_lkb(ls, lkid, &lkb);
5074 if (lvb_in && ua->lksb.sb_lvbptr)
5075 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5076 if (ua_tmp->castparam)
5077 ua->castparam = ua_tmp->castparam;
5078 ua->user_lksb = ua_tmp->user_lksb;
5080 error = set_unlock_args(flags, ua, &args);
5084 error = unlock_lock(ls, lkb, &args);
5086 if (error == -DLM_EUNLOCK)
5088 /* from validate_unlock_args() */
5089 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5094 spin_lock(&ua->proc->locks_spin);
5095 /* dlm_user_add_cb() may have already taken lkb off the proc list */
5096 if (!list_empty(&lkb->lkb_ownqueue))
5097 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5098 spin_unlock(&ua->proc->locks_spin);
5102 dlm_unlock_recovery(ls);
5107 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5108 uint32_t flags, uint32_t lkid)
5110 struct dlm_lkb *lkb;
5111 struct dlm_args args;
5112 struct dlm_user_args *ua;
5115 dlm_lock_recovery(ls);
5117 error = find_lkb(ls, lkid, &lkb);
5122 if (ua_tmp->castparam)
5123 ua->castparam = ua_tmp->castparam;
5124 ua->user_lksb = ua_tmp->user_lksb;
5126 error = set_unlock_args(flags, ua, &args);
5130 error = cancel_lock(ls, lkb, &args);
5132 if (error == -DLM_ECANCEL)
5134 /* from validate_unlock_args() */
5135 if (error == -EBUSY)
5140 dlm_unlock_recovery(ls);
5145 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
5147 struct dlm_lkb *lkb;
5148 struct dlm_args args;
5149 struct dlm_user_args *ua;
5153 dlm_lock_recovery(ls);
5155 error = find_lkb(ls, lkid, &lkb);
5161 error = set_unlock_args(flags, ua, &args);
5165 /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
5167 r = lkb->lkb_resource;
5171 error = validate_unlock_args(lkb, &args);
5174 lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
5176 error = _cancel_lock(r, lkb);
5181 if (error == -DLM_ECANCEL)
5183 /* from validate_unlock_args() */
5184 if (error == -EBUSY)
5189 dlm_unlock_recovery(ls);
5193 /* lkb's that are removed from the waiters list by revert are just left on the
5194 orphans list with the granted orphan locks, to be freed by purge */
static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_args args;
	int error;

	hold_lkb(lkb); /* reference for the ls_orphans list */
	mutex_lock(&ls->ls_orphans_mutex);
	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
	mutex_unlock(&ls->ls_orphans_mutex);

	set_unlock_args(0, lkb->lkb_ua, &args);

	error = cancel_lock(ls, lkb, &args);
	if (error == -DLM_ECANCEL)
		error = 0;
	return error;
}

/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
   Regardless of what rsb queue the lock is on, it's removed and freed. */
static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_args args;
	int error;

	set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);

	error = unlock_lock(ls, lkb, &args);
	if (error == -DLM_EUNLOCK)
		error = 0;
	return error;
}

/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
   (which does lock_rsb) due to deadlock with receiving a message that does
   lock_rsb followed by dlm_user_add_cb() */
static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
				     struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb = NULL;

	mutex_lock(&ls->ls_clear_proc_locks);
	if (list_empty(&proc->locks))
		goto out;

	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
	list_del_init(&lkb->lkb_ownqueue);

	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
		lkb->lkb_flags |= DLM_IFL_ORPHAN;
	else
		lkb->lkb_flags |= DLM_IFL_DEAD;
 out:
	mutex_unlock(&ls->ls_clear_proc_locks);
	return lkb;
}

/* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
   which we clear here. */

/* proc CLOSING flag is set so no more device_reads should look at proc->asts
   list, and no more device_writes should add lkb's to proc->locks list; so we
   shouldn't need to take asts_spin or locks_spin here.  this assumes that
   device reads/writes/closes are serialized -- FIXME: we may need to serialize
   them ourself if they are not */

void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	dlm_lock_recovery(ls);

	while (1) {
		lkb = del_proc_lock(ls, proc);
		if (!lkb)
			break;
		del_timeout(lkb);
		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
			orphan_proc_lock(ls, lkb);
		else
			unlock_proc_lock(ls, lkb);

		/* this removes the reference for the proc->locks list
		   added by dlm_user_request, it may result in the lkb
		   being freed */

		dlm_put_lkb(lkb);
	}

	mutex_lock(&ls->ls_clear_proc_locks);

	/* in-progress unlocks */
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}

	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
		memset(&lkb->lkb_callbacks, 0,
		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
		list_del_init(&lkb->lkb_cb_list);
		dlm_put_lkb(lkb);
	}

	mutex_unlock(&ls->ls_clear_proc_locks);
	dlm_unlock_recovery(ls);
}
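
/* Counterpart to dlm_clear_proc_locks() for a process purging its own
   locks while it is still running: everything on the proc's locks list is
   force-unlocked and pending callbacks are dropped, taking the proc
   spinlocks since the device may still be in use. */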
static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	while (1) {
		lkb = NULL;
		spin_lock(&proc->locks_spin);
		if (!list_empty(&proc->locks)) {
			lkb = list_entry(proc->locks.next, struct dlm_lkb,
					 lkb_ownqueue);
			list_del_init(&lkb->lkb_ownqueue);
		}
		spin_unlock(&proc->locks_spin);

		if (!lkb)
			break;

		lkb->lkb_flags |= DLM_IFL_DEAD;
		unlock_proc_lock(ls, lkb);
		dlm_put_lkb(lkb); /* ref from proc->locks list */
	}

	spin_lock(&proc->locks_spin);
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->locks_spin);

	spin_lock(&proc->asts_spin);
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
		memset(&lkb->lkb_callbacks, 0,
		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
		list_del_init(&lkb->lkb_cb_list);
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->asts_spin);
}

/* pid of 0 means purge all orphans */

static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_lkb *lkb, *safe;

	mutex_lock(&ls->ls_orphans_mutex);
	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
		if (pid && lkb->lkb_ownpid != pid)
			continue;
		unlock_proc_lock(ls, lkb);
		list_del_init(&lkb->lkb_ownqueue);
		dlm_put_lkb(lkb); /* ref from orphan_proc_lock() */
	}
	mutex_unlock(&ls->ls_orphans_mutex);
}
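
/* Ask another node to purge orphans for the given pid; the remote node
   handles DLM_MSG_PURGE in receive_purge(), which in turn calls
   do_purge(). */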
static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int error;

	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
				DLM_MSG_PURGE, &ms, &mh);
	if (error)
		return error;
	ms->m_nodeid = nodeid;
	ms->m_pid = pid;

	return send_message(mh, ms);
}
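
/* Entry point for a userspace purge request: route to send_purge() for a
   remote node, purge_proc_locks() for the caller's own pid, or do_purge()
   for another pid's orphans on this node. */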
int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
		   int nodeid, int pid)
{
	int error = 0;

	if (nodeid != dlm_our_nodeid()) {
		error = send_purge(ls, nodeid, pid);
	} else {
		dlm_lock_recovery(ls);
		if (pid == current->pid)
			purge_proc_locks(ls, proc);
		else
			do_purge(ls, nodeid, pid);
		dlm_unlock_recovery(ls);
	}
	return error;
}