1 /******************************************************************************
2 *******************************************************************************
4 ** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved.
6 ** This copyrighted material is made available to anyone wishing to use,
7 ** modify, copy, or redistribute it subject to the terms and conditions
8 ** of the GNU General Public License v.2.
10 *******************************************************************************
11 ******************************************************************************/
13 /* Central locking logic has four stages:
33 Stage 1 (lock, unlock) is mainly about checking input args and
34 splitting into one of the four main operations:
36 dlm_lock = request_lock
37 dlm_lock+CONVERT = convert_lock
38 dlm_unlock = unlock_lock
39 dlm_unlock+CANCEL = cancel_lock
41 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42 provided to the next stage.
44 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45 When remote, it calls send_xxxx(), when local it calls do_xxxx().
47 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
48 given rsb and lkb and queues callbacks.
50 For remote operations, send_xxxx() results in the corresponding do_xxxx()
51 function being executed on the remote node. The connecting send/receive
52 calls on local (L) and remote (R) nodes:
54 L: send_xxxx() -> R: receive_xxxx()
56 L: receive_xxxx_reply() <- R: send_xxxx_reply()
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
63 #include "requestqueue.h"
67 #include "lockspace.h"
72 #include "lvb_table.h"
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87 struct dlm_message *ms);
88 static int receive_extralen(struct dlm_message *ms);
89 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90 static void del_timeout(struct dlm_lkb *lkb);
93 * Lock compatibility matrix - thanks Steve
94 * UN = Unlocked state. Not really a state, used as a flag
95 * PD = Padding. Used to make the matrix a nice power of two in size
96 * Other states are the same as the VMS DLM.
97 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{1, 1, 1, 1, 1, 1, 1, 0},	/* UN */
	{1, 1, 1, 1, 1, 1, 1, 0},	/* NL */
	{1, 1, 1, 1, 1, 1, 0, 0},	/* CR */
	{1, 1, 1, 1, 0, 0, 0, 0},	/* CW */
	{1, 1, 1, 0, 1, 0, 0, 0},	/* PR */
	{1, 1, 1, 0, 0, 0, 0, 0},	/* PW */
	{1, 1, 0, 0, 0, 0, 0, 0},	/* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}	/* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 *  1 = LVB is returned to the caller
 *  0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};

/* is lkb gr's granted mode compatible with lkb rq's requested mode? */
#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

/* mode arguments are DLM_LOCK_* values; IV/-1 selects the UN row/column */
int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
142 * Compatibility matrix for conversions with QUECVT set.
143 * Granted mode is the row; requested mode is the column.
144 * Usage: matrix[grmode+1][rqmode+1]
/* conversion-with-QUECVT compatibility; indexed like __dlm_compat_matrix */
static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{0, 0, 0, 0, 0, 0, 0, 0},	/* UN */
	{0, 0, 1, 1, 1, 1, 1, 0},	/* NL */
	{0, 0, 0, 1, 1, 1, 1, 0},	/* CR */
	{0, 0, 0, 0, 1, 1, 1, 0},	/* CW */
	{0, 0, 0, 1, 0, 1, 1, 0},	/* PR */
	{0, 0, 0, 0, 0, 0, 1, 0},	/* PW */
	{0, 0, 0, 0, 0, 0, 0, 0},	/* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}	/* PD */
};
159 void dlm_print_lkb(struct dlm_lkb *lkb)
161 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
162 " status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
163 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
164 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
165 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
168 static void dlm_print_rsb(struct dlm_rsb *r)
170 printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
171 r->res_nodeid, r->res_flags, r->res_first_lkid,
172 r->res_recover_locks_count, r->res_name);
175 void dlm_dump_rsb(struct dlm_rsb *r)
181 printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
182 list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
183 printk(KERN_ERR "rsb lookup list\n");
184 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
186 printk(KERN_ERR "rsb grant queue:\n");
187 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
189 printk(KERN_ERR "rsb convert queue:\n");
190 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
192 printk(KERN_ERR "rsb wait queue:\n");
193 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
197 /* Threads cannot use the lockspace while it's being recovered */
199 static inline void dlm_lock_recovery(struct dlm_ls *ls)
201 down_read(&ls->ls_in_recovery);
204 void dlm_unlock_recovery(struct dlm_ls *ls)
206 up_read(&ls->ls_in_recovery);
209 int dlm_lock_recovery_try(struct dlm_ls *ls)
211 return down_read_trylock(&ls->ls_in_recovery);
214 static inline int can_be_queued(struct dlm_lkb *lkb)
216 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
219 static inline int force_blocking_asts(struct dlm_lkb *lkb)
221 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
224 static inline int is_demoted(struct dlm_lkb *lkb)
226 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
229 static inline int is_altmode(struct dlm_lkb *lkb)
231 return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
234 static inline int is_granted(struct dlm_lkb *lkb)
236 return (lkb->lkb_status == DLM_LKSTS_GRANTED);
239 static inline int is_remote(struct dlm_rsb *r)
241 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
242 return !!r->res_nodeid;
245 static inline int is_process_copy(struct dlm_lkb *lkb)
247 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
250 static inline int is_master_copy(struct dlm_lkb *lkb)
252 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
253 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
254 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
257 static inline int middle_conversion(struct dlm_lkb *lkb)
259 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
260 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265 static inline int down_conversion(struct dlm_lkb *lkb)
267 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
270 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
272 return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
275 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
277 return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
280 static inline int is_overlap(struct dlm_lkb *lkb)
282 return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
283 DLM_IFL_OVERLAP_CANCEL));
286 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
288 if (is_master_copy(lkb))
293 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
295 /* if the operation was a cancel, then return -DLM_ECANCEL, if a
296 timeout caused the cancel then return -ETIMEDOUT */
297 if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
298 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
302 if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
303 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
307 lkb->lkb_lksb->sb_status = rv;
308 lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
310 dlm_add_ast(lkb, AST_COMP, lkb->lkb_grmode);
313 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
316 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
319 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
321 lkb->lkb_time_bast = ktime_get();
323 if (is_master_copy(lkb)) {
324 lkb->lkb_bastmode = rqmode; /* printed by debugfs */
325 send_bast(r, lkb, rqmode);
327 dlm_add_ast(lkb, AST_BAST, rqmode);
332 * Basic operations on rsb's and lkb's
335 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
339 r = dlm_allocate_rsb(ls, len);
345 memcpy(r->res_name, name, len);
346 mutex_init(&r->res_mutex);
348 INIT_LIST_HEAD(&r->res_lookup);
349 INIT_LIST_HEAD(&r->res_grantqueue);
350 INIT_LIST_HEAD(&r->res_convertqueue);
351 INIT_LIST_HEAD(&r->res_waitqueue);
352 INIT_LIST_HEAD(&r->res_root_list);
353 INIT_LIST_HEAD(&r->res_recover_list);
358 static int search_rsb_list(struct list_head *head, char *name, int len,
359 unsigned int flags, struct dlm_rsb **r_ret)
364 list_for_each_entry(r, head, res_hashchain) {
365 if (len == r->res_length && !memcmp(name, r->res_name, len))
372 if (r->res_nodeid && (flags & R_MASTER))
/* Search bucket b for the named rsb; caller holds the bucket lock.
   Checks the active list first, then the toss list; a toss-list hit is
   revived onto the active list.  NOTE(review): several lines (braces,
   returns, kref_init) are elided in this numbered listing. */
378 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
379 unsigned int flags, struct dlm_rsb **r_ret)
384 error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
/* found on the active list: just take another reference */
386 kref_get(&r->res_ref);
/* not active: try the toss list, and on a hit move it back */
389 error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
393 list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
/* with no resource directory the master never becomes stale */
395 if (dlm_no_directory(ls))
/* a revived rsb's cached master may be stale: res_nodeid -1 means we
   were master (certain); >0 means remote master (now uncertain) */
398 if (r->res_nodeid == -1) {
399 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
400 r->res_first_lkid = 0;
401 } else if (r->res_nodeid > 0) {
402 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
403 r->res_first_lkid = 0;
/* remaining case must be a locally-mastered rsb */
405 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
406 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
413 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
414 unsigned int flags, struct dlm_rsb **r_ret)
417 spin_lock(&ls->ls_rsbtbl[b].lock);
418 error = _search_rsb(ls, name, len, b, flags, r_ret);
419 spin_unlock(&ls->ls_rsbtbl[b].lock);
424 * Find rsb in rsbtbl and potentially create/add one
426 * Delaying the release of rsb's has a similar benefit to applications keeping
427 * NL locks on an rsb, but without the guarantee that the cached master value
428 * will still be valid when the rsb is reused. Apps aren't always smart enough
429 * to keep NL locks on an rsb that they may lock again shortly; this can lead
430 * to excessive master lookups and removals if we don't delay the release.
432 * Searching for an rsb means looking through both the normal list and toss
433 * list. When found on the toss list the rsb is moved to the normal list with
434 * ref count of 1; when found on normal list the ref count is incremented.
/* NOTE(review): error-path gotos and several braces are elided in this
   numbered listing; documented flow inferred from the visible lines. */
437 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
438 unsigned int flags, struct dlm_rsb **r_ret)
440 struct dlm_rsb *r = NULL, *tmp;
441 uint32_t hash, bucket;
/* reject names longer than the protocol allows */
444 if (namelen > DLM_RESNAME_MAXLEN)
447 if (dlm_no_directory(ls))
/* hash the name to pick a bucket, then search both lists there */
451 hash = jhash(name, namelen, 0);
452 bucket = hash & (ls->ls_rsbtbl_size - 1);
454 error = search_rsb(ls, name, namelen, bucket, flags, &r);
/* -EBADR: not found; only fall through to creation with R_CREATE */
458 if (error == -EBADR && !(flags & R_CREATE))
461 /* the rsb was found but wasn't a master copy */
462 if (error == -ENOTBLK)
466 r = create_rsb(ls, name, namelen);
471 r->res_bucket = bucket;
473 kref_init(&r->res_ref);
475 /* With no directory, the master can be set immediately */
476 if (dlm_no_directory(ls)) {
477 int nodeid = dlm_dir_nodeid(r);
478 if (nodeid == dlm_our_nodeid())
480 r->res_nodeid = nodeid;
/* re-search under the lock: another thread may have added the same
   rsb while we were allocating; if so, discard ours and use theirs */
483 spin_lock(&ls->ls_rsbtbl[bucket].lock);
484 error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
486 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
491 list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
492 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
499 /* This is only called to add a reference when the code already holds
500 a valid reference to the rsb, so there's no need for locking. */
502 static inline void hold_rsb(struct dlm_rsb *r)
504 kref_get(&r->res_ref);
507 void dlm_hold_rsb(struct dlm_rsb *r)
512 static void toss_rsb(struct kref *kref)
514 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
515 struct dlm_ls *ls = r->res_ls;
517 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
518 kref_init(&r->res_ref);
519 list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
520 r->res_toss_time = jiffies;
522 dlm_free_lvb(r->res_lvbptr);
523 r->res_lvbptr = NULL;
527 /* When all references to the rsb are gone it's transfered to
528 the tossed list for later disposal. */
530 static void put_rsb(struct dlm_rsb *r)
532 struct dlm_ls *ls = r->res_ls;
533 uint32_t bucket = r->res_bucket;
535 spin_lock(&ls->ls_rsbtbl[bucket].lock);
536 kref_put(&r->res_ref, toss_rsb);
537 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
540 void dlm_put_rsb(struct dlm_rsb *r)
545 /* See comment for unhold_lkb */
547 static void unhold_rsb(struct dlm_rsb *r)
550 rv = kref_put(&r->res_ref, toss_rsb);
551 DLM_ASSERT(!rv, dlm_dump_rsb(r););
554 static void kill_rsb(struct kref *kref)
556 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
558 /* All work is done after the return from kref_put() so we
559 can release the write_lock before the remove and free. */
561 DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
562 DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
563 DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
564 DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
565 DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
566 DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
569 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
570 The rsb must exist as long as any lkb's for it do. */
572 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
575 lkb->lkb_resource = r;
578 static void detach_lkb(struct dlm_lkb *lkb)
580 if (lkb->lkb_resource) {
581 put_rsb(lkb->lkb_resource);
582 lkb->lkb_resource = NULL;
/* Allocate a new lkb, assign it a unique lock id (high 16 bits = random
   hash bucket, low 16 = per-bucket counter) and add it to that bucket.
   NOTE(review): the id-collision retry loop and error returns are
   elided in this numbered listing. */
586 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
588 struct dlm_lkb *lkb, *tmp;
592 lkb = dlm_allocate_lkb(ls);
/* -1 nodeid / IV grmode mark the lkb as not yet placed anywhere */
596 lkb->lkb_nodeid = -1;
597 lkb->lkb_grmode = DLM_LOCK_IV;
598 kref_init(&lkb->lkb_ref);
599 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
600 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
601 INIT_LIST_HEAD(&lkb->lkb_time_list);
/* spread lkbs across the id table with a random bucket choice */
603 get_random_bytes(&bucket, sizeof(bucket));
604 bucket &= (ls->ls_lkbtbl_size - 1);
606 write_lock(&ls->ls_lkbtbl[bucket].lock);
608 /* counter can roll over so we must verify lkid is not in use */
611 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
613 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
615 if (tmp->lkb_id != lkid)
623 list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
624 write_unlock(&ls->ls_lkbtbl[bucket].lock);
630 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
633 uint16_t bucket = (lkid >> 16);
635 list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
636 if (lkb->lkb_id == lkid)
642 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
645 uint16_t bucket = (lkid >> 16);
647 if (bucket >= ls->ls_lkbtbl_size)
650 read_lock(&ls->ls_lkbtbl[bucket].lock);
651 lkb = __find_lkb(ls, lkid);
653 kref_get(&lkb->lkb_ref);
654 read_unlock(&ls->ls_lkbtbl[bucket].lock);
657 return lkb ? 0 : -ENOENT;
660 static void kill_lkb(struct kref *kref)
662 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
664 /* All work is done after the return from kref_put() so we
665 can release the write_lock before the detach_lkb */
667 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
670 /* __put_lkb() is used when an lkb may not have an rsb attached to
671 it so we need to provide the lockspace explicitly */
673 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
675 uint16_t bucket = (lkb->lkb_id >> 16);
677 write_lock(&ls->ls_lkbtbl[bucket].lock);
678 if (kref_put(&lkb->lkb_ref, kill_lkb)) {
679 list_del(&lkb->lkb_idtbl_list);
680 write_unlock(&ls->ls_lkbtbl[bucket].lock);
684 /* for local/process lkbs, lvbptr points to caller's lksb */
685 if (lkb->lkb_lvbptr && is_master_copy(lkb))
686 dlm_free_lvb(lkb->lkb_lvbptr);
690 write_unlock(&ls->ls_lkbtbl[bucket].lock);
695 int dlm_put_lkb(struct dlm_lkb *lkb)
699 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
700 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
702 ls = lkb->lkb_resource->res_ls;
703 return __put_lkb(ls, lkb);
706 /* This is only called to add a reference when the code already holds
707 a valid reference to the lkb, so there's no need for locking. */
709 static inline void hold_lkb(struct dlm_lkb *lkb)
711 kref_get(&lkb->lkb_ref);
714 /* This is called when we need to remove a reference and are certain
715 it's not the last ref. e.g. del_lkb is always called between a
716 find_lkb/put_lkb and is always the inverse of a previous add_lkb.
717 put_lkb would work fine, but would involve unnecessary locking */
719 static inline void unhold_lkb(struct dlm_lkb *lkb)
722 rv = kref_put(&lkb->lkb_ref, kill_lkb);
723 DLM_ASSERT(!rv, dlm_print_lkb(lkb););
726 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
729 struct dlm_lkb *lkb = NULL;
731 list_for_each_entry(lkb, head, lkb_statequeue)
732 if (lkb->lkb_rqmode < mode)
736 list_add_tail(new, head);
738 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
741 /* add/remove lkb to rsb's grant/convert/wait queue */
743 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
745 kref_get(&lkb->lkb_ref);
747 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
749 lkb->lkb_timestamp = ktime_get();
751 lkb->lkb_status = status;
754 case DLM_LKSTS_WAITING:
755 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
756 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
758 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
760 case DLM_LKSTS_GRANTED:
761 /* convention says granted locks kept in order of grmode */
762 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
765 case DLM_LKSTS_CONVERT:
766 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
767 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
769 list_add_tail(&lkb->lkb_statequeue,
770 &r->res_convertqueue);
773 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
777 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
780 list_del(&lkb->lkb_statequeue);
784 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
788 add_lkb(r, lkb, sts);
792 static int msg_reply_type(int mstype)
795 case DLM_MSG_REQUEST:
796 return DLM_MSG_REQUEST_REPLY;
797 case DLM_MSG_CONVERT:
798 return DLM_MSG_CONVERT_REPLY;
800 return DLM_MSG_UNLOCK_REPLY;
802 return DLM_MSG_CANCEL_REPLY;
804 return DLM_MSG_LOOKUP_REPLY;
809 /* add/remove lkb from global waiters list of lkb's waiting for
810 a reply from a remote node */
/* NOTE(review): several lines (error assignments, gotos, braces) are
   elided in this numbered listing; comments reflect the visible flow. */
812 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
814 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
817 mutex_lock(&ls->ls_waiters_mutex);
/* an unlock, or a cancel of a cancel, while already waiting: refuse */
819 if (is_overlap_unlock(lkb) ||
820 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
/* a second op arriving while a reply is outstanding becomes an
   "overlap": record it on the lkb instead of a new waiters entry */
825 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
828 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
831 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
837 lkb->lkb_wait_count++;
840 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
841 lkb->lkb_id, lkb->lkb_wait_type, mstype,
842 lkb->lkb_wait_count, lkb->lkb_flags);
/* normal case: first outstanding op for this lkb */
846 DLM_ASSERT(!lkb->lkb_wait_count,
848 printk("wait_count %d\n", lkb->lkb_wait_count););
850 lkb->lkb_wait_count++;
851 lkb->lkb_wait_type = mstype;
853 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
856 log_error(ls, "addwait error %x %d flags %x %d %d %s",
857 lkb->lkb_id, error, lkb->lkb_flags, mstype,
858 lkb->lkb_wait_type, lkb->lkb_resource->res_name);
859 mutex_unlock(&ls->ls_waiters_mutex);
863 /* We clear the RESEND flag because we might be taking an lkb off the waiters
864 list as part of process_requestqueue (e.g. a lookup that has an optimized
865 request reply on the requestqueue) between dlm_recover_waiters_pre() which
866 set RESEND and dlm_recover_waiters_post() */
/* NOTE(review): goto labels, returns and braces are elided in this
   numbered listing; caller holds ls_waiters_mutex (or processes a stub
   reply, see remove_from_waiters_ms). */
868 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
869 struct dlm_message *ms)
871 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
872 int overlap_done = 0;
/* case 1: this reply completes a recorded overlapping unlock */
874 if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
875 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
876 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
/* case 2: this reply completes a recorded overlapping cancel */
881 if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
882 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
883 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
888 /* Cancel state was preemptively cleared by a successful convert,
889 see next comment, nothing to do. */
891 if ((mstype == DLM_MSG_CANCEL_REPLY) &&
892 (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
893 log_debug(ls, "remwait %x cancel_reply wait_type %d",
894 lkb->lkb_id, lkb->lkb_wait_type);
898 /* Remove for the convert reply, and premptively remove for the
899 cancel reply. A convert has been granted while there's still
900 an outstanding cancel on it (the cancel is moot and the result
901 in the cancel reply should be 0). We preempt the cancel reply
902 because the app gets the convert result and then can follow up
903 with another op, like convert. This subsequent op would see the
904 lingering state of the cancel and fail with -EBUSY. */
906 if ((mstype == DLM_MSG_CONVERT_REPLY) &&
907 (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
908 is_overlap_cancel(lkb) && ms && !ms->m_result) {
909 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
911 lkb->lkb_wait_type = 0;
912 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
913 lkb->lkb_wait_count--;
917 /* N.B. type of reply may not always correspond to type of original
918 msg due to lookup->request optimization, verify others? */
920 if (lkb->lkb_wait_type) {
921 lkb->lkb_wait_type = 0;
925 log_error(ls, "remwait error %x reply %d flags %x no wait_type",
926 lkb->lkb_id, mstype, lkb->lkb_flags);
930 /* the force-unlock/cancel has completed and we haven't recvd a reply
931 to the op that was in progress prior to the unlock/cancel; we
932 give up on any reply to the earlier op. FIXME: not sure when/how
935 if (overlap_done && lkb->lkb_wait_type) {
936 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
937 lkb->lkb_id, mstype, lkb->lkb_wait_type);
938 lkb->lkb_wait_count--;
939 lkb->lkb_wait_type = 0;
/* common exit: drop one wait count; unlink when it reaches zero */
942 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
944 lkb->lkb_flags &= ~DLM_IFL_RESEND;
945 lkb->lkb_wait_count--;
946 if (!lkb->lkb_wait_count)
947 list_del_init(&lkb->lkb_wait_reply);
952 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
954 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
957 mutex_lock(&ls->ls_waiters_mutex);
958 error = _remove_from_waiters(lkb, mstype, NULL);
959 mutex_unlock(&ls->ls_waiters_mutex);
963 /* Handles situations where we might be processing a "fake" or "stub" reply in
964 which we can't try to take waiters_mutex again. */
966 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
968 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
971 if (ms != &ls->ls_stub_ms)
972 mutex_lock(&ls->ls_waiters_mutex);
973 error = _remove_from_waiters(lkb, ms->m_type, ms);
974 if (ms != &ls->ls_stub_ms)
975 mutex_unlock(&ls->ls_waiters_mutex);
979 static void dir_remove(struct dlm_rsb *r)
983 if (dlm_no_directory(r->res_ls))
986 to_nodeid = dlm_dir_nodeid(r);
987 if (to_nodeid != dlm_our_nodeid())
990 dlm_dir_remove_entry(r->res_ls, to_nodeid,
991 r->res_name, r->res_length);
994 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
995 found since they are in order of newest to oldest? */
/* Free toss-list rsbs in bucket b whose toss time has aged past
   ci_toss_secs.  NOTE(review): the outer retry loop, 'found' handling
   and return are elided in this numbered listing. */
997 static int shrink_bucket(struct dlm_ls *ls, int b)
1000 int count = 0, found;
1004 spin_lock(&ls->ls_rsbtbl[b].lock);
1005 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
/* skip rsbs that have not yet aged past the configured toss time */
1007 if (!time_after_eq(jiffies, r->res_toss_time +
1008 dlm_config.ci_toss_secs * HZ))
1015 spin_unlock(&ls->ls_rsbtbl[b].lock);
/* drop the toss-list's reference; if it was the last, unhash and free */
1019 if (kref_put(&r->res_ref, kill_rsb)) {
1020 list_del(&r->res_hashchain);
1021 spin_unlock(&ls->ls_rsbtbl[b].lock);
/* someone re-referenced a tossed rsb: that should not happen */
1028 spin_unlock(&ls->ls_rsbtbl[b].lock);
1029 log_error(ls, "tossed rsb in use %s", r->res_name);
1036 void dlm_scan_rsbs(struct dlm_ls *ls)
1040 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1041 shrink_bucket(ls, i);
1042 if (dlm_locking_stopped(ls))
1048 static void add_timeout(struct dlm_lkb *lkb)
1050 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1052 if (is_master_copy(lkb))
1055 if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1056 !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1057 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1060 if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1065 DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1066 mutex_lock(&ls->ls_timeout_mutex);
1068 list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1069 mutex_unlock(&ls->ls_timeout_mutex);
1072 static void del_timeout(struct dlm_lkb *lkb)
1074 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1076 mutex_lock(&ls->ls_timeout_mutex);
1077 if (!list_empty(&lkb->lkb_time_list)) {
1078 list_del_init(&lkb->lkb_time_list);
1081 mutex_unlock(&ls->ls_timeout_mutex);
1084 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1085 lkb_lksb_timeout without lock_rsb? Note: we can't lock timeout_mutex
1086 and then lock rsb because of lock ordering in add_timeout. We may need
1087 to specify some special timeout-related bits in the lkb that are just to
1088 be accessed under the timeout_mutex. */
/* Periodic scan of the timeout list: warn on long waits and cancel
   locks whose DLM_LKF_TIMEOUT budget has expired.  NOTE(review): the
   outer restart loop, hold/put refcounting and braces are elided in
   this numbered listing. */
1090 void dlm_scan_timeout(struct dlm_ls *ls)
1093 struct dlm_lkb *lkb;
1094 int do_cancel, do_warn;
1098 if (dlm_locking_stopped(ls))
1103 mutex_lock(&ls->ls_timeout_mutex);
1104 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
/* how long this lkb has been waiting, in microseconds */
1106 wait_us = ktime_to_us(ktime_sub(ktime_get(),
1107 lkb->lkb_timestamp));
/* lkb_timeout_cs / ci_timewarn_cs are centiseconds: * 10000 -> us */
1109 if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1110 wait_us >= (lkb->lkb_timeout_cs * 10000))
1113 if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1114 wait_us >= dlm_config.ci_timewarn_cs * 10000)
1117 if (!do_cancel && !do_warn)
1122 mutex_unlock(&ls->ls_timeout_mutex);
1124 if (!do_cancel && !do_warn)
1127 r = lkb->lkb_resource;
1132 /* clear flag so we only warn once */
1133 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1134 if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1136 dlm_timeout_warn(lkb);
/* mark the cancel as timeout-driven so queue_cast reports -ETIMEDOUT */
1140 log_debug(ls, "timeout cancel %x node %d %s",
1141 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1142 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1143 lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1145 _cancel_lock(r, lkb);
1154 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1155 dlm_recoverd before checking/setting ls_recover_begin. */
1157 void dlm_adjust_timeouts(struct dlm_ls *ls)
1159 struct dlm_lkb *lkb;
1160 u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1162 ls->ls_recover_begin = 0;
1163 mutex_lock(&ls->ls_timeout_mutex);
1164 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1165 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1166 mutex_unlock(&ls->ls_timeout_mutex);
1169 /* lkb is master or local copy */
/* Transfer LVB data on grant per dlm_lvb_operations: b=1 copies
   resource->caller, b=0 copies caller->resource (or invalidates).
   NOTE(review): returns, braces and the b==1 guard lines are elided in
   this numbered listing. */
1171 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1173 int b, len = r->res_ls->ls_lvblen;
1175 /* b=1 lvb returned to caller
1176 b=0 lvb written to rsb or invalidated
1179 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
/* caller without an lvb buffer or without VALBLK gets nothing */
1182 if (!lkb->lkb_lvbptr)
1185 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
/* b == 1: hand the resource's current lvb back to the caller */
1191 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1192 lkb->lkb_lvbseq = r->res_lvbseq;
1194 } else if (b == 0) {
/* IVVALBLK invalidates instead of writing */
1195 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1196 rsb_set_flag(r, RSB_VALNOTVALID);
1200 if (!lkb->lkb_lvbptr)
1203 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
/* allocate the resource lvb lazily on first write */
1207 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1212 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1214 lkb->lkb_lvbseq = r->res_lvbseq;
1215 rsb_clear_flag(r, RSB_VALNOTVALID);
/* report an invalid lvb to the caller via the status block */
1218 if (rsb_flag(r, RSB_VALNOTVALID))
1219 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1222 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1224 if (lkb->lkb_grmode < DLM_LOCK_PW)
1227 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1228 rsb_set_flag(r, RSB_VALNOTVALID);
1232 if (!lkb->lkb_lvbptr)
1235 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1239 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1244 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1246 rsb_clear_flag(r, RSB_VALNOTVALID);
1249 /* lkb is process copy (pc) */
1251 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1252 struct dlm_message *ms)
1256 if (!lkb->lkb_lvbptr)
1259 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1262 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1264 int len = receive_extralen(ms);
1265 if (len > DLM_RESNAME_MAXLEN)
1266 len = DLM_RESNAME_MAXLEN;
1267 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1268 lkb->lkb_lvbseq = ms->m_lvbseq;
1272 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1273 remove_lock -- used for unlock, removes lkb from granted
1274 revert_lock -- used for cancel, moves lkb from convert to granted
1275 grant_lock -- used for request and convert, adds lkb to granted or
1276 moves lkb from convert or waiting to granted
1278 Each of these is used for master or local copy lkb's. There is
1279 also a _pc() variation used to make the corresponding change on
1280 a process copy (pc) lkb. */
1282 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1285 lkb->lkb_grmode = DLM_LOCK_IV;
1286 /* this unhold undoes the original ref from create_lkb()
1287 so this leads to the lkb being freed */
1291 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1293 set_lvb_unlock(r, lkb);
1294 _remove_lock(r, lkb);
1297 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1299 _remove_lock(r, lkb);
1302 /* returns: 0 did nothing
1303 1 moved lock to granted
1306 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1310 lkb->lkb_rqmode = DLM_LOCK_IV;
1312 switch (lkb->lkb_status) {
1313 case DLM_LKSTS_GRANTED:
1315 case DLM_LKSTS_CONVERT:
1316 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1319 case DLM_LKSTS_WAITING:
1321 lkb->lkb_grmode = DLM_LOCK_IV;
1322 /* this unhold undoes the original ref from create_lkb()
1323 so this leads to the lkb being freed */
1328 log_print("invalid status for revert %d", lkb->lkb_status);
1333 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1335 return revert_lock(r, lkb);
1338 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1340 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1341 lkb->lkb_grmode = lkb->lkb_rqmode;
1342 if (lkb->lkb_status)
1343 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1345 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1348 lkb->lkb_rqmode = DLM_LOCK_IV;
1351 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1353 set_lvb_lock(r, lkb);
1354 _grant_lock(r, lkb);
1355 lkb->lkb_highbast = 0;
1358 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1359 struct dlm_message *ms)
1361 set_lvb_lock_pc(r, lkb, ms);
1362 _grant_lock(r, lkb);
/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (is_master_copy(lkb))
		send_grant(r, lkb);
	else
		queue_cast(r, lkb, 0);
}
1378 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1379 change the granted/requested modes. We're munging things accordingly in
1381 CONVDEADLK: our grmode may have been forced down to NL to resolve a
1383 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1384 compatible with other granted locks */
1386 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1388 if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1389 log_print("munge_demoted %x invalid reply type %d",
1390 lkb->lkb_id, ms->m_type);
1394 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1395 log_print("munge_demoted %x invalid modes gr %d rq %d",
1396 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1400 lkb->lkb_grmode = DLM_LOCK_NL;
1403 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1405 if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1406 ms->m_type != DLM_MSG_GRANT) {
1407 log_print("munge_altmode %x invalid reply type %d",
1408 lkb->lkb_id, ms->m_type);
1412 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1413 lkb->lkb_rqmode = DLM_LOCK_PR;
1414 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1415 lkb->lkb_rqmode = DLM_LOCK_CW;
1417 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1422 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1424 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1426 if (lkb->lkb_id == first->lkb_id)
1432 /* Check if the given lkb conflicts with another lkb on the queue. */
1434 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1436 struct dlm_lkb *this;
1438 list_for_each_entry(this, head, lkb_statequeue) {
1441 if (!modes_compat(this, lkb))
1448 * "A conversion deadlock arises with a pair of lock requests in the converting
1449 * queue for one resource. The granted mode of each lock blocks the requested
1450 * mode of the other lock."
1452 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1453 * convert queue from being granted, then deadlk/demote lkb.
1456 * Granted Queue: empty
1457 * Convert Queue: NL->EX (first lock)
1458 * PR->EX (second lock)
1460 * The first lock can't be granted because of the granted mode of the second
1461 * lock and the second lock can't be granted because it's not first in the
1462 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1463 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1464 * flag set and return DEMOTED in the lksb flags.
1466 * Originally, this function detected conv-deadlk in a more limited scope:
1467 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1468 * - if lkb1 was the first entry in the queue (not just earlier), and was
1469 * blocked by the granted mode of lkb2, and there was nothing on the
1470 * granted queue preventing lkb1 from being granted immediately, i.e.
1471 * lkb2 was the only thing preventing lkb1 from being granted.
1473 * That second condition meant we'd only say there was conv-deadlk if
1474 * resolving it (by demotion) would lead to the first lock on the convert
1475 * queue being granted right away. It allowed conversion deadlocks to exist
1476 * between locks on the convert queue while they couldn't be granted anyway.
1478 * Now, we detect and take action on conversion deadlocks immediately when
1479 * they're created, even if they may not be immediately consequential. If
1480 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1481 * mode that would prevent lkb1's conversion from being granted, we do a
1482 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1483 * I think this means that the lkb_is_ahead condition below should always
1484 * be zero, i.e. there will never be conv-deadlk between two locks that are
1485 * both already on the convert queue.
1488 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1490 struct dlm_lkb *lkb1;
1491 int lkb_is_ahead = 0;
1493 list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1499 if (!lkb_is_ahead) {
1500 if (!modes_compat(lkb2, lkb1))
1503 if (!modes_compat(lkb2, lkb1) &&
1504 !modes_compat(lkb1, lkb2))
1512 * Return 1 if the lock can be granted, 0 otherwise.
1513 * Also detect and resolve conversion deadlocks.
1515 * lkb is the lock to be granted
1517 * now is 1 if the function is being called in the context of the
1518 * immediate request, it is 0 if called later, after the lock has been
1521 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1524 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1526 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1529 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1530 * a new request for a NL mode lock being blocked.
1532 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1533 * request, then it would be granted. In essence, the use of this flag
1534 * tells the Lock Manager to expedite theis request by not considering
1535 * what may be in the CONVERTING or WAITING queues... As of this
1536 * writing, the EXPEDITE flag can be used only with new requests for NL
1537 * mode locks. This flag is not valid for conversion requests.
1539 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
1540 * conversion or used with a non-NL requested mode. We also know an
1541 * EXPEDITE request is always granted immediately, so now must always
1542 * be 1. The full condition to grant an expedite request: (now &&
1543 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1544 * therefore be shortened to just checking the flag.
1547 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1551 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1552 * added to the remaining conditions.
1555 if (queue_conflict(&r->res_grantqueue, lkb))
1559 * 6-3: By default, a conversion request is immediately granted if the
1560 * requested mode is compatible with the modes of all other granted
1564 if (queue_conflict(&r->res_convertqueue, lkb))
1568 * 6-5: But the default algorithm for deciding whether to grant or
1569 * queue conversion requests does not by itself guarantee that such
1570 * requests are serviced on a "first come first serve" basis. This, in
1571 * turn, can lead to a phenomenon known as "indefinate postponement".
1573 * 6-7: This issue is dealt with by using the optional QUECVT flag with
1574 * the system service employed to request a lock conversion. This flag
1575 * forces certain conversion requests to be queued, even if they are
1576 * compatible with the granted modes of other locks on the same
1577 * resource. Thus, the use of this flag results in conversion requests
1578 * being ordered on a "first come first servce" basis.
1580 * DCT: This condition is all about new conversions being able to occur
1581 * "in place" while the lock remains on the granted queue (assuming
1582 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
1583 * doesn't _have_ to go onto the convert queue where it's processed in
1584 * order. The "now" variable is necessary to distinguish converts
1585 * being received and processed for the first time now, because once a
1586 * convert is moved to the conversion queue the condition below applies
1587 * requiring fifo granting.
1590 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1594 * The NOORDER flag is set to avoid the standard vms rules on grant
1598 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1602 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1603 * granted until all other conversion requests ahead of it are granted
1607 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1611 * 6-4: By default, a new request is immediately granted only if all
1612 * three of the following conditions are satisfied when the request is
1614 * - The queue of ungranted conversion requests for the resource is
1616 * - The queue of ungranted new requests for the resource is empty.
1617 * - The mode of the new request is compatible with the most
1618 * restrictive mode of all granted locks on the resource.
1621 if (now && !conv && list_empty(&r->res_convertqueue) &&
1622 list_empty(&r->res_waitqueue))
1626 * 6-4: Once a lock request is in the queue of ungranted new requests,
1627 * it cannot be granted until the queue of ungranted conversion
1628 * requests is empty, all ungranted new requests ahead of it are
1629 * granted and/or canceled, and it is compatible with the granted mode
1630 * of the most restrictive lock granted on the resource.
1633 if (!now && !conv && list_empty(&r->res_convertqueue) &&
1634 first_in_list(lkb, &r->res_waitqueue))
1640 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1644 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1645 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1650 rv = _can_be_granted(r, lkb, now);
1655 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1656 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1657 * cancels one of the locks.
1660 if (is_convert && can_be_queued(lkb) &&
1661 conversion_deadlock_detect(r, lkb)) {
1662 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1663 lkb->lkb_grmode = DLM_LOCK_NL;
1664 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1665 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1669 log_print("can_be_granted deadlock %x now %d",
1678 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1679 * to grant a request in a mode other than the normal rqmode. It's a
1680 * simple way to provide a big optimization to applications that can
1684 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1686 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1690 lkb->lkb_rqmode = alt;
1691 rv = _can_be_granted(r, lkb, now);
1693 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1695 lkb->lkb_rqmode = rqmode;
1701 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1702 for locks pending on the convert list. Once verified (watch for these
1703 log_prints), we should be able to just call _can_be_granted() and not
1704 bother with the demote/deadlk cases here (and there's no easy way to deal
1705 with a deadlk here, we'd have to generate something like grant_lock with
1706 the deadlk error.) */
1708 /* Returns the highest requested mode of all blocked conversions; sets
1709 cw if there's a blocked conversion to DLM_LOCK_CW. */
1711 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1713 struct dlm_lkb *lkb, *s;
1714 int hi, demoted, quit, grant_restart, demote_restart;
1723 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1724 demoted = is_demoted(lkb);
1727 if (can_be_granted(r, lkb, 0, &deadlk)) {
1728 grant_lock_pending(r, lkb);
1733 if (!demoted && is_demoted(lkb)) {
1734 log_print("WARN: pending demoted %x node %d %s",
1735 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1741 log_print("WARN: pending deadlock %x node %d %s",
1742 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1747 hi = max_t(int, lkb->lkb_rqmode, hi);
1749 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1755 if (demote_restart && !quit) {
1760 return max_t(int, high, hi);
1763 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1765 struct dlm_lkb *lkb, *s;
1767 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1768 if (can_be_granted(r, lkb, 0, NULL))
1769 grant_lock_pending(r, lkb);
1771 high = max_t(int, lkb->lkb_rqmode, high);
1772 if (lkb->lkb_rqmode == DLM_LOCK_CW)
1780 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1781 on either the convert or waiting queue.
1782 high is the largest rqmode of all locks blocked on the convert or
1785 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1787 if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1788 if (gr->lkb_highbast < DLM_LOCK_EX)
1793 if (gr->lkb_highbast < high &&
1794 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1799 static void grant_pending_locks(struct dlm_rsb *r)
1801 struct dlm_lkb *lkb, *s;
1802 int high = DLM_LOCK_IV;
1805 DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1807 high = grant_pending_convert(r, high, &cw);
1808 high = grant_pending_wait(r, high, &cw);
1810 if (high == DLM_LOCK_IV)
1814 * If there are locks left on the wait/convert queue then send blocking
1815 * ASTs to granted locks based on the largest requested mode (high)
1819 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1820 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1821 if (cw && high == DLM_LOCK_PR &&
1822 lkb->lkb_grmode == DLM_LOCK_PR)
1823 queue_bast(r, lkb, DLM_LOCK_CW);
1825 queue_bast(r, lkb, high);
1826 lkb->lkb_highbast = high;
1831 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1833 if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1834 (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1835 if (gr->lkb_highbast < DLM_LOCK_EX)
1840 if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1845 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1846 struct dlm_lkb *lkb)
1850 list_for_each_entry(gr, head, lkb_statequeue) {
1851 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1852 queue_bast(r, gr, lkb->lkb_rqmode);
1853 gr->lkb_highbast = lkb->lkb_rqmode;
1858 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1860 send_bast_queue(r, &r->res_grantqueue, lkb);
1863 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1865 send_bast_queue(r, &r->res_grantqueue, lkb);
1866 send_bast_queue(r, &r->res_convertqueue, lkb);
1869 /* set_master(r, lkb) -- set the master nodeid of a resource
1871 The purpose of this function is to set the nodeid field in the given
1872 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
1873 known, it can just be copied to the lkb and the function will return
1874 0. If the rsb's nodeid is _not_ known, it needs to be looked up
1875 before it can be copied to the lkb.
1877 When the rsb nodeid is being looked up remotely, the initial lkb
1878 causing the lookup is kept on the ls_waiters list waiting for the
1879 lookup reply. Other lkb's waiting for the same rsb lookup are kept
1880 on the rsb's res_lookup list until the master is verified.
1883 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1884 1: the rsb master is not available and the lkb has been placed on
1888 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1890 struct dlm_ls *ls = r->res_ls;
1891 int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1893 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1894 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1895 r->res_first_lkid = lkb->lkb_id;
1896 lkb->lkb_nodeid = r->res_nodeid;
1900 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1901 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1905 if (r->res_nodeid == 0) {
1906 lkb->lkb_nodeid = 0;
1910 if (r->res_nodeid > 0) {
1911 lkb->lkb_nodeid = r->res_nodeid;
1915 DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1917 dir_nodeid = dlm_dir_nodeid(r);
1919 if (dir_nodeid != our_nodeid) {
1920 r->res_first_lkid = lkb->lkb_id;
1921 send_lookup(r, lkb);
1925 for (i = 0; i < 2; i++) {
1926 /* It's possible for dlm_scand to remove an old rsb for
1927 this same resource from the toss list, us to create
1928 a new one, look up the master locally, and find it
1929 already exists just before dlm_scand does the
1930 dir_remove() on the previous rsb. */
1932 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1933 r->res_length, &ret_nodeid);
1936 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1939 if (error && error != -EEXIST)
1942 if (ret_nodeid == our_nodeid) {
1943 r->res_first_lkid = 0;
1945 lkb->lkb_nodeid = 0;
1947 r->res_first_lkid = lkb->lkb_id;
1948 r->res_nodeid = ret_nodeid;
1949 lkb->lkb_nodeid = ret_nodeid;
1954 static void process_lookup_list(struct dlm_rsb *r)
1956 struct dlm_lkb *lkb, *safe;
1958 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1959 list_del_init(&lkb->lkb_rsb_lookup);
1960 _request_lock(r, lkb);
1965 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1967 static void confirm_master(struct dlm_rsb *r, int error)
1969 struct dlm_lkb *lkb;
1971 if (!r->res_first_lkid)
1977 r->res_first_lkid = 0;
1978 process_lookup_list(r);
1984 /* the remote request failed and won't be retried (it was
1985 a NOQUEUE, or has been canceled/unlocked); make a waiting
1986 lkb the first_lkid */
1988 r->res_first_lkid = 0;
1990 if (!list_empty(&r->res_lookup)) {
1991 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1993 list_del_init(&lkb->lkb_rsb_lookup);
1994 r->res_first_lkid = lkb->lkb_id;
1995 _request_lock(r, lkb);
2000 log_error(r->res_ls, "confirm_master unknown error %d", error);
2004 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2005 int namelen, unsigned long timeout_cs,
2006 void (*ast) (void *astparam),
2008 void (*bast) (void *astparam, int mode),
2009 struct dlm_args *args)
2013 /* check for invalid arg usage */
2015 if (mode < 0 || mode > DLM_LOCK_EX)
2018 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2021 if (flags & DLM_LKF_CANCEL)
2024 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2027 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2030 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2033 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2036 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2039 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2042 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2048 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2051 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2054 /* these args will be copied to the lkb in validate_lock_args,
2055 it cannot be done now because when converting locks, fields in
2056 an active lkb cannot be modified before locking the rsb */
2058 args->flags = flags;
2060 args->astparam = astparam;
2061 args->bastfn = bast;
2062 args->timeout = timeout_cs;
2070 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2072 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2073 DLM_LKF_FORCEUNLOCK))
2076 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2079 args->flags = flags;
2080 args->astparam = astarg;
2084 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2085 struct dlm_args *args)
2089 if (args->flags & DLM_LKF_CONVERT) {
2090 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2093 if (args->flags & DLM_LKF_QUECVT &&
2094 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2098 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2101 if (lkb->lkb_wait_type)
2104 if (is_overlap(lkb))
2108 lkb->lkb_exflags = args->flags;
2109 lkb->lkb_sbflags = 0;
2110 lkb->lkb_astfn = args->astfn;
2111 lkb->lkb_astparam = args->astparam;
2112 lkb->lkb_bastfn = args->bastfn;
2113 lkb->lkb_rqmode = args->mode;
2114 lkb->lkb_lksb = args->lksb;
2115 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2116 lkb->lkb_ownpid = (int) current->pid;
2117 lkb->lkb_timeout_cs = args->timeout;
2121 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2122 rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2123 lkb->lkb_status, lkb->lkb_wait_type,
2124 lkb->lkb_resource->res_name);
2128 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2131 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2132 because there may be a lookup in progress and it's valid to do
2133 cancel/unlockf on it */
2135 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2137 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2140 if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2141 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2146 /* an lkb may still exist even though the lock is EOL'ed due to a
2147 cancel, unlock or failed noqueue request; an app can't use these
2148 locks; return same error as if the lkid had not been found at all */
2150 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2151 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2156 /* an lkb may be waiting for an rsb lookup to complete where the
2157 lookup was initiated by another lock */
2159 if (!list_empty(&lkb->lkb_rsb_lookup)) {
2160 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2161 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2162 list_del_init(&lkb->lkb_rsb_lookup);
2163 queue_cast(lkb->lkb_resource, lkb,
2164 args->flags & DLM_LKF_CANCEL ?
2165 -DLM_ECANCEL : -DLM_EUNLOCK);
2166 unhold_lkb(lkb); /* undoes create_lkb() */
2168 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2173 /* cancel not allowed with another cancel/unlock in progress */
2175 if (args->flags & DLM_LKF_CANCEL) {
2176 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2179 if (is_overlap(lkb))
2182 /* don't let scand try to do a cancel */
2185 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2186 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2191 /* there's nothing to cancel */
2192 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2193 !lkb->lkb_wait_type) {
2198 switch (lkb->lkb_wait_type) {
2199 case DLM_MSG_LOOKUP:
2200 case DLM_MSG_REQUEST:
2201 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2204 case DLM_MSG_UNLOCK:
2205 case DLM_MSG_CANCEL:
2208 /* add_to_waiters() will set OVERLAP_CANCEL */
2212 /* do we need to allow a force-unlock if there's a normal unlock
2213 already in progress? in what conditions could the normal unlock
2214 fail such that we'd want to send a force-unlock to be sure? */
2216 if (args->flags & DLM_LKF_FORCEUNLOCK) {
2217 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2220 if (is_overlap_unlock(lkb))
2223 /* don't let scand try to do a cancel */
2226 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2227 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2232 switch (lkb->lkb_wait_type) {
2233 case DLM_MSG_LOOKUP:
2234 case DLM_MSG_REQUEST:
2235 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2238 case DLM_MSG_UNLOCK:
2241 /* add_to_waiters() will set OVERLAP_UNLOCK */
2245 /* normal unlock not allowed if there's any op in progress */
2247 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2251 /* an overlapping op shouldn't blow away exflags from other op */
2252 lkb->lkb_exflags |= args->flags;
2253 lkb->lkb_sbflags = 0;
2254 lkb->lkb_astparam = args->astparam;
2258 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2259 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2260 args->flags, lkb->lkb_wait_type,
2261 lkb->lkb_resource->res_name);
2266 * Four stage 4 varieties:
2267 * do_request(), do_convert(), do_unlock(), do_cancel()
2268 * These are called on the master node for the given lock and
2269 * from the central locking logic.
2272 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2276 if (can_be_granted(r, lkb, 1, NULL)) {
2278 queue_cast(r, lkb, 0);
2282 if (can_be_queued(lkb)) {
2283 error = -EINPROGRESS;
2284 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2290 queue_cast(r, lkb, -EAGAIN);
2295 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2300 if (force_blocking_asts(lkb))
2301 send_blocking_asts_all(r, lkb);
2304 send_blocking_asts(r, lkb);
2309 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2314 /* changing an existing lock may allow others to be granted */
2316 if (can_be_granted(r, lkb, 1, &deadlk)) {
2318 queue_cast(r, lkb, 0);
2322 /* can_be_granted() detected that this lock would block in a conversion
2323 deadlock, so we leave it on the granted queue and return EDEADLK in
2324 the ast for the convert. */
2327 /* it's left on the granted queue */
2328 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2329 lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2330 lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2331 revert_lock(r, lkb);
2332 queue_cast(r, lkb, -EDEADLK);
2337 /* is_demoted() means the can_be_granted() above set the grmode
2338 to NL, and left us on the granted queue. This auto-demotion
2339 (due to CONVDEADLK) might mean other locks, and/or this lock, are
2340 now grantable. We have to try to grant other converting locks
2341 before we try again to grant this one. */
2343 if (is_demoted(lkb)) {
2344 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2345 if (_can_be_granted(r, lkb, 1)) {
2347 queue_cast(r, lkb, 0);
2350 /* else fall through and move to convert queue */
2353 if (can_be_queued(lkb)) {
2354 error = -EINPROGRESS;
2356 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2362 queue_cast(r, lkb, -EAGAIN);
2367 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2372 grant_pending_locks(r);
2373 /* grant_pending_locks also sends basts */
2376 if (force_blocking_asts(lkb))
2377 send_blocking_asts_all(r, lkb);
2380 send_blocking_asts(r, lkb);
2385 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2387 remove_lock(r, lkb);
2388 queue_cast(r, lkb, -DLM_EUNLOCK);
2389 return -DLM_EUNLOCK;
/* Post-unlock side effects: an unlock may make other locks grantable. */

static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      int error)
{
	grant_pending_locks(r);
}
2398 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2400 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2404 error = revert_lock(r, lkb);
2406 queue_cast(r, lkb, -DLM_ECANCEL);
2407 return -DLM_ECANCEL;
/* Post-cancel side effects: only if the cancel actually removed/reverted
   something can other locks have become grantable. */

static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      int error)
{
	if (error)
		grant_pending_locks(r);
}
/*
 * Four stage 3 varieties:
 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
 */

/* add a new lkb to a possibly new rsb, called by requesting process */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* set_master: sets lkb nodeid from r */

	error = set_master(r, lkb);
	if (error < 0)
		goto out;
	if (error) {
		/* the lkb was parked waiting on a master lookup */
		error = 0;
		goto out;
	}

	if (is_remote(r)) {
		/* receive_request() calls do_request() on remote node */
		error = send_request(r, lkb);
	} else {
		error = do_request(r, lkb);
		/* for remote locks the request_reply is sent
		   between do_request and do_request_effects */
		do_request_effects(r, lkb, error);
	}
 out:
	return error;
}
/* change some property of an existing lkb, e.g. mode */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r)) {
		/* receive_convert() calls do_convert() on remote node */
		error = send_convert(r, lkb);
	} else {
		error = do_convert(r, lkb);
		/* for remote locks the convert_reply is sent
		   between do_convert and do_convert_effects */
		do_convert_effects(r, lkb, error);
	}

	return error;
}
/* remove an existing lkb from the granted queue */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r)) {
		/* receive_unlock() calls do_unlock() on remote node */
		error = send_unlock(r, lkb);
	} else {
		error = do_unlock(r, lkb);
		/* for remote locks the unlock_reply is sent
		   between do_unlock and do_unlock_effects */
		do_unlock_effects(r, lkb, error);
	}

	return error;
}
/* remove an existing lkb from the convert or wait queue */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r)) {
		/* receive_cancel() calls do_cancel() on remote node */
		error = send_cancel(r, lkb);
	} else {
		error = do_cancel(r, lkb);
		/* for remote locks the cancel_reply is sent
		   between do_cancel and do_cancel_effects */
		do_cancel_effects(r, lkb, error);
	}

	return error;
}
2511 * Four stage 2 varieties:
2512 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2515 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2516 int len, struct dlm_args *args)
2521 error = validate_lock_args(ls, lkb, args);
2525 error = find_rsb(ls, name, len, R_CREATE, &r);
2532 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2534 error = _request_lock(r, lkb);
2543 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2544 struct dlm_args *args)
2549 r = lkb->lkb_resource;
2554 error = validate_lock_args(ls, lkb, args);
2558 error = _convert_lock(r, lkb);
2565 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2566 struct dlm_args *args)
2571 r = lkb->lkb_resource;
2576 error = validate_unlock_args(lkb, args);
2580 error = _unlock_lock(r, lkb);
2587 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2588 struct dlm_args *args)
2593 r = lkb->lkb_resource;
2598 error = validate_unlock_args(lkb, args);
2602 error = _cancel_lock(r, lkb);
2610 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
2613 int dlm_lock(dlm_lockspace_t *lockspace,
2615 struct dlm_lksb *lksb,
2618 unsigned int namelen,
2619 uint32_t parent_lkid,
2620 void (*ast) (void *astarg),
2622 void (*bast) (void *astarg, int mode))
2625 struct dlm_lkb *lkb;
2626 struct dlm_args args;
2627 int error, convert = flags & DLM_LKF_CONVERT;
2629 ls = dlm_find_lockspace_local(lockspace);
2633 dlm_lock_recovery(ls);
2636 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2638 error = create_lkb(ls, &lkb);
2643 error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2644 astarg, bast, &args);
2649 error = convert_lock(ls, lkb, &args);
2651 error = request_lock(ls, lkb, name, namelen, &args);
2653 if (error == -EINPROGRESS)
2656 if (convert || error)
2658 if (error == -EAGAIN || error == -EDEADLK)
2661 dlm_unlock_recovery(ls);
2662 dlm_put_lockspace(ls);
2666 int dlm_unlock(dlm_lockspace_t *lockspace,
2669 struct dlm_lksb *lksb,
2673 struct dlm_lkb *lkb;
2674 struct dlm_args args;
2677 ls = dlm_find_lockspace_local(lockspace);
2681 dlm_lock_recovery(ls);
2683 error = find_lkb(ls, lkid, &lkb);
2687 error = set_unlock_args(flags, astarg, &args);
2691 if (flags & DLM_LKF_CANCEL)
2692 error = cancel_lock(ls, lkb, &args);
2694 error = unlock_lock(ls, lkb, &args);
2696 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2698 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2703 dlm_unlock_recovery(ls);
2704 dlm_put_lockspace(ls);
2709 * send/receive routines for remote operations and replies
2713 * send_request receive_request
2714 * send_convert receive_convert
2715 * send_unlock receive_unlock
2716 * send_cancel receive_cancel
2717 * send_grant receive_grant
2718 * send_bast receive_bast
2719 * send_lookup receive_lookup
2720 * send_remove receive_remove
2723 * receive_request_reply send_request_reply
2724 * receive_convert_reply send_convert_reply
2725 * receive_unlock_reply send_unlock_reply
2726 * receive_cancel_reply send_cancel_reply
2727 * receive_lookup_reply send_lookup_reply
2730 static int _create_message(struct dlm_ls *ls, int mb_len,
2731 int to_nodeid, int mstype,
2732 struct dlm_message **ms_ret,
2733 struct dlm_mhandle **mh_ret)
2735 struct dlm_message *ms;
2736 struct dlm_mhandle *mh;
2739 /* get_buffer gives us a message handle (mh) that we need to
2740 pass into lowcomms_commit and a message buffer (mb) that we
2741 write our data into */
2743 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2747 memset(mb, 0, mb_len);
2749 ms = (struct dlm_message *) mb;
2751 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2752 ms->m_header.h_lockspace = ls->ls_global_id;
2753 ms->m_header.h_nodeid = dlm_our_nodeid();
2754 ms->m_header.h_length = mb_len;
2755 ms->m_header.h_cmd = DLM_MSG;
2757 ms->m_type = mstype;
2764 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2765 int to_nodeid, int mstype,
2766 struct dlm_message **ms_ret,
2767 struct dlm_mhandle **mh_ret)
2769 int mb_len = sizeof(struct dlm_message);
2772 case DLM_MSG_REQUEST:
2773 case DLM_MSG_LOOKUP:
2774 case DLM_MSG_REMOVE:
2775 mb_len += r->res_length;
2777 case DLM_MSG_CONVERT:
2778 case DLM_MSG_UNLOCK:
2779 case DLM_MSG_REQUEST_REPLY:
2780 case DLM_MSG_CONVERT_REPLY:
2782 if (lkb && lkb->lkb_lvbptr)
2783 mb_len += r->res_ls->ls_lvblen;
2787 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
/* further lowcomms enhancements or alternate implementations may make
   the return value from this function useful at some point */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	/* byte-swap to wire order, then queue the buffer for transmission */
	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);
	return 0;
}
2801 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2802 struct dlm_message *ms)
2804 ms->m_nodeid = lkb->lkb_nodeid;
2805 ms->m_pid = lkb->lkb_ownpid;
2806 ms->m_lkid = lkb->lkb_id;
2807 ms->m_remid = lkb->lkb_remid;
2808 ms->m_exflags = lkb->lkb_exflags;
2809 ms->m_sbflags = lkb->lkb_sbflags;
2810 ms->m_flags = lkb->lkb_flags;
2811 ms->m_lvbseq = lkb->lkb_lvbseq;
2812 ms->m_status = lkb->lkb_status;
2813 ms->m_grmode = lkb->lkb_grmode;
2814 ms->m_rqmode = lkb->lkb_rqmode;
2815 ms->m_hash = r->res_hash;
2817 /* m_result and m_bastmode are set from function args,
2818 not from lkb fields */
2820 if (lkb->lkb_bastfn)
2821 ms->m_asts |= AST_BAST;
2823 ms->m_asts |= AST_COMP;
2825 /* compare with switch in create_message; send_remove() doesn't
2828 switch (ms->m_type) {
2829 case DLM_MSG_REQUEST:
2830 case DLM_MSG_LOOKUP:
2831 memcpy(ms->m_extra, r->res_name, r->res_length);
2833 case DLM_MSG_CONVERT:
2834 case DLM_MSG_UNLOCK:
2835 case DLM_MSG_REQUEST_REPLY:
2836 case DLM_MSG_CONVERT_REPLY:
2838 if (!lkb->lkb_lvbptr)
2840 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2845 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2847 struct dlm_message *ms;
2848 struct dlm_mhandle *mh;
2849 int to_nodeid, error;
2851 error = add_to_waiters(lkb, mstype);
2855 to_nodeid = r->res_nodeid;
2857 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2861 send_args(r, lkb, ms);
2863 error = send_message(mh, ms);
2869 remove_from_waiters(lkb, msg_reply_type(mstype));
2873 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2875 return send_common(r, lkb, DLM_MSG_REQUEST);
2878 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2882 error = send_common(r, lkb, DLM_MSG_CONVERT);
2884 /* down conversions go without a reply from the master */
2885 if (!error && down_conversion(lkb)) {
2886 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2887 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2888 r->res_ls->ls_stub_ms.m_result = 0;
2889 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2890 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2896 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2897 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2898 that the master is still correct. */
2900 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2902 return send_common(r, lkb, DLM_MSG_UNLOCK);
2905 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2907 return send_common(r, lkb, DLM_MSG_CANCEL);
2910 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2912 struct dlm_message *ms;
2913 struct dlm_mhandle *mh;
2914 int to_nodeid, error;
2916 to_nodeid = lkb->lkb_nodeid;
2918 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2922 send_args(r, lkb, ms);
2926 error = send_message(mh, ms);
2931 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2933 struct dlm_message *ms;
2934 struct dlm_mhandle *mh;
2935 int to_nodeid, error;
2937 to_nodeid = lkb->lkb_nodeid;
2939 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2943 send_args(r, lkb, ms);
2945 ms->m_bastmode = mode;
2947 error = send_message(mh, ms);
2952 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2954 struct dlm_message *ms;
2955 struct dlm_mhandle *mh;
2956 int to_nodeid, error;
2958 error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2962 to_nodeid = dlm_dir_nodeid(r);
2964 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2968 send_args(r, lkb, ms);
2970 error = send_message(mh, ms);
2976 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2980 static int send_remove(struct dlm_rsb *r)
2982 struct dlm_message *ms;
2983 struct dlm_mhandle *mh;
2984 int to_nodeid, error;
2986 to_nodeid = dlm_dir_nodeid(r);
2988 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2992 memcpy(ms->m_extra, r->res_name, r->res_length);
2993 ms->m_hash = r->res_hash;
2995 error = send_message(mh, ms);
3000 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3003 struct dlm_message *ms;
3004 struct dlm_mhandle *mh;
3005 int to_nodeid, error;
3007 to_nodeid = lkb->lkb_nodeid;
3009 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3013 send_args(r, lkb, ms);
3017 error = send_message(mh, ms);
3022 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3024 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3027 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3029 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3032 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3034 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3037 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3039 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3042 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3043 int ret_nodeid, int rv)
3045 struct dlm_rsb *r = &ls->ls_stub_rsb;
3046 struct dlm_message *ms;
3047 struct dlm_mhandle *mh;
3048 int error, nodeid = ms_in->m_header.h_nodeid;
3050 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3054 ms->m_lkid = ms_in->m_lkid;
3056 ms->m_nodeid = ret_nodeid;
3058 error = send_message(mh, ms);
3063 /* which args we save from a received message depends heavily on the type
3064 of message, unlike the send side where we can safely send everything about
3065 the lkb for any type of message */
3067 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3069 lkb->lkb_exflags = ms->m_exflags;
3070 lkb->lkb_sbflags = ms->m_sbflags;
3071 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3072 (ms->m_flags & 0x0000FFFF);
3075 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3077 lkb->lkb_sbflags = ms->m_sbflags;
3078 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3079 (ms->m_flags & 0x0000FFFF);
3082 static int receive_extralen(struct dlm_message *ms)
3084 return (ms->m_header.h_length - sizeof(struct dlm_message));
3087 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3088 struct dlm_message *ms)
3092 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3093 if (!lkb->lkb_lvbptr)
3094 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3095 if (!lkb->lkb_lvbptr)
3097 len = receive_extralen(ms);
3098 if (len > DLM_RESNAME_MAXLEN)
3099 len = DLM_RESNAME_MAXLEN;
3100 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
/* Placeholder bast callback stored in master-copy lkbs so that "has a
   bast" tests are non-NULL; it must never actually be invoked. */
static void fake_bastfn(void *astparam, int mode)
{
	log_print("fake_bastfn should not be called");
}
/* Placeholder completion-ast callback for master-copy lkbs; never meant
   to run — see fake_bastfn(). */
static void fake_astfn(void *astparam)
{
	log_print("fake_astfn should not be called");
}
3115 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3116 struct dlm_message *ms)
3118 lkb->lkb_nodeid = ms->m_header.h_nodeid;
3119 lkb->lkb_ownpid = ms->m_pid;
3120 lkb->lkb_remid = ms->m_lkid;
3121 lkb->lkb_grmode = DLM_LOCK_IV;
3122 lkb->lkb_rqmode = ms->m_rqmode;
3124 lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
3125 lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
3127 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3128 /* lkb was just created so there won't be an lvb yet */
3129 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3130 if (!lkb->lkb_lvbptr)
3137 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3138 struct dlm_message *ms)
3140 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3143 if (receive_lvb(ls, lkb, ms))
3146 lkb->lkb_rqmode = ms->m_rqmode;
3147 lkb->lkb_lvbseq = ms->m_lvbseq;
3152 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3153 struct dlm_message *ms)
3155 if (receive_lvb(ls, lkb, ms))
3160 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3161 uses to send a reply and that the remote end uses to process the reply. */
3163 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3165 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3166 lkb->lkb_nodeid = ms->m_header.h_nodeid;
3167 lkb->lkb_remid = ms->m_lkid;
3170 /* This is called after the rsb is locked so that we can safely inspect
3171 fields in the lkb. */
3173 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3175 int from = ms->m_header.h_nodeid;
3178 switch (ms->m_type) {
3179 case DLM_MSG_CONVERT:
3180 case DLM_MSG_UNLOCK:
3181 case DLM_MSG_CANCEL:
3182 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3186 case DLM_MSG_CONVERT_REPLY:
3187 case DLM_MSG_UNLOCK_REPLY:
3188 case DLM_MSG_CANCEL_REPLY:
3191 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3195 case DLM_MSG_REQUEST_REPLY:
3196 if (!is_process_copy(lkb))
3198 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3207 log_error(lkb->lkb_resource->res_ls,
3208 "ignore invalid message %d from %d %x %x %x %d",
3209 ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3210 lkb->lkb_flags, lkb->lkb_nodeid);
3214 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3216 struct dlm_lkb *lkb;
3220 error = create_lkb(ls, &lkb);
3224 receive_flags(lkb, ms);
3225 lkb->lkb_flags |= DLM_IFL_MSTCPY;
3226 error = receive_request_args(ls, lkb, ms);
3232 namelen = receive_extralen(ms);
3234 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3243 error = do_request(r, lkb);
3244 send_request_reply(r, lkb, error);
3245 do_request_effects(r, lkb, error);
3250 if (error == -EINPROGRESS)
3257 setup_stub_lkb(ls, ms);
3258 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3261 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3263 struct dlm_lkb *lkb;
3265 int error, reply = 1;
3267 error = find_lkb(ls, ms->m_remid, &lkb);
3271 r = lkb->lkb_resource;
3276 error = validate_message(lkb, ms);
3280 receive_flags(lkb, ms);
3282 error = receive_convert_args(ls, lkb, ms);
3284 send_convert_reply(r, lkb, error);
3288 reply = !down_conversion(lkb);
3290 error = do_convert(r, lkb);
3292 send_convert_reply(r, lkb, error);
3293 do_convert_effects(r, lkb, error);
3301 setup_stub_lkb(ls, ms);
3302 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3305 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3307 struct dlm_lkb *lkb;
3311 error = find_lkb(ls, ms->m_remid, &lkb);
3315 r = lkb->lkb_resource;
3320 error = validate_message(lkb, ms);
3324 receive_flags(lkb, ms);
3326 error = receive_unlock_args(ls, lkb, ms);
3328 send_unlock_reply(r, lkb, error);
3332 error = do_unlock(r, lkb);
3333 send_unlock_reply(r, lkb, error);
3334 do_unlock_effects(r, lkb, error);
3342 setup_stub_lkb(ls, ms);
3343 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3346 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3348 struct dlm_lkb *lkb;
3352 error = find_lkb(ls, ms->m_remid, &lkb);
3356 receive_flags(lkb, ms);
3358 r = lkb->lkb_resource;
3363 error = validate_message(lkb, ms);
3367 error = do_cancel(r, lkb);
3368 send_cancel_reply(r, lkb, error);
3369 do_cancel_effects(r, lkb, error);
3377 setup_stub_lkb(ls, ms);
3378 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3381 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3383 struct dlm_lkb *lkb;
3387 error = find_lkb(ls, ms->m_remid, &lkb);
3389 log_debug(ls, "receive_grant from %d no lkb %x",
3390 ms->m_header.h_nodeid, ms->m_remid);
3394 r = lkb->lkb_resource;
3399 error = validate_message(lkb, ms);
3403 receive_flags_reply(lkb, ms);
3404 if (is_altmode(lkb))
3405 munge_altmode(lkb, ms);
3406 grant_lock_pc(r, lkb, ms);
3407 queue_cast(r, lkb, 0);
3414 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3416 struct dlm_lkb *lkb;
3420 error = find_lkb(ls, ms->m_remid, &lkb);
3422 log_debug(ls, "receive_bast from %d no lkb %x",
3423 ms->m_header.h_nodeid, ms->m_remid);
3427 r = lkb->lkb_resource;
3432 error = validate_message(lkb, ms);
3436 queue_bast(r, lkb, ms->m_bastmode);
3443 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3445 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3447 from_nodeid = ms->m_header.h_nodeid;
3448 our_nodeid = dlm_our_nodeid();
3450 len = receive_extralen(ms);
3452 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3453 if (dir_nodeid != our_nodeid) {
3454 log_error(ls, "lookup dir_nodeid %d from %d",
3455 dir_nodeid, from_nodeid);
3461 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3463 /* Optimization: we're master so treat lookup as a request */
3464 if (!error && ret_nodeid == our_nodeid) {
3465 receive_request(ls, ms);
3469 send_lookup_reply(ls, ms, ret_nodeid, error);
3472 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3474 int len, dir_nodeid, from_nodeid;
3476 from_nodeid = ms->m_header.h_nodeid;
3478 len = receive_extralen(ms);
3480 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3481 if (dir_nodeid != dlm_our_nodeid()) {
3482 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3483 dir_nodeid, from_nodeid);
3487 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3490 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3492 do_purge(ls, ms->m_nodeid, ms->m_pid);
3495 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3497 struct dlm_lkb *lkb;
3499 int error, mstype, result;
3501 error = find_lkb(ls, ms->m_remid, &lkb);
3503 log_debug(ls, "receive_request_reply from %d no lkb %x",
3504 ms->m_header.h_nodeid, ms->m_remid);
3508 r = lkb->lkb_resource;
3512 error = validate_message(lkb, ms);
3516 mstype = lkb->lkb_wait_type;
3517 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3521 /* Optimization: the dir node was also the master, so it took our
3522 lookup as a request and sent request reply instead of lookup reply */
3523 if (mstype == DLM_MSG_LOOKUP) {
3524 r->res_nodeid = ms->m_header.h_nodeid;
3525 lkb->lkb_nodeid = r->res_nodeid;
3528 /* this is the value returned from do_request() on the master */
3529 result = ms->m_result;
3533 /* request would block (be queued) on remote master */
3534 queue_cast(r, lkb, -EAGAIN);
3535 confirm_master(r, -EAGAIN);
3536 unhold_lkb(lkb); /* undoes create_lkb() */
3541 /* request was queued or granted on remote master */
3542 receive_flags_reply(lkb, ms);
3543 lkb->lkb_remid = ms->m_lkid;
3544 if (is_altmode(lkb))
3545 munge_altmode(lkb, ms);
3547 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3550 grant_lock_pc(r, lkb, ms);
3551 queue_cast(r, lkb, 0);
3553 confirm_master(r, result);
3558 /* find_rsb failed to find rsb or rsb wasn't master */
3559 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3560 lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3562 lkb->lkb_nodeid = -1;
3564 if (is_overlap(lkb)) {
3565 /* we'll ignore error in cancel/unlock reply */
3566 queue_cast_overlap(r, lkb);
3567 confirm_master(r, result);
3568 unhold_lkb(lkb); /* undoes create_lkb() */
3570 _request_lock(r, lkb);
3574 log_error(ls, "receive_request_reply %x error %d",
3575 lkb->lkb_id, result);
3578 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3579 log_debug(ls, "receive_request_reply %x result %d unlock",
3580 lkb->lkb_id, result);
3581 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3582 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3583 send_unlock(r, lkb);
3584 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3585 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3586 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3587 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3588 send_cancel(r, lkb);
3590 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3591 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3599 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3600 struct dlm_message *ms)
3602 /* this is the value returned from do_convert() on the master */
3603 switch (ms->m_result) {
3605 /* convert would block (be queued) on remote master */
3606 queue_cast(r, lkb, -EAGAIN);
3610 receive_flags_reply(lkb, ms);
3611 revert_lock_pc(r, lkb);
3612 queue_cast(r, lkb, -EDEADLK);
3616 /* convert was queued on remote master */
3617 receive_flags_reply(lkb, ms);
3618 if (is_demoted(lkb))
3619 munge_demoted(lkb, ms);
3621 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3626 /* convert was granted on remote master */
3627 receive_flags_reply(lkb, ms);
3628 if (is_demoted(lkb))
3629 munge_demoted(lkb, ms);
3630 grant_lock_pc(r, lkb, ms);
3631 queue_cast(r, lkb, 0);
3635 log_error(r->res_ls, "receive_convert_reply %x error %d",
3636 lkb->lkb_id, ms->m_result);
3640 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3642 struct dlm_rsb *r = lkb->lkb_resource;
3648 error = validate_message(lkb, ms);
3652 /* stub reply can happen with waiters_mutex held */
3653 error = remove_from_waiters_ms(lkb, ms);
3657 __receive_convert_reply(r, lkb, ms);
3663 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3665 struct dlm_lkb *lkb;
3668 error = find_lkb(ls, ms->m_remid, &lkb);
3670 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3671 ms->m_header.h_nodeid, ms->m_remid);
3675 _receive_convert_reply(lkb, ms);
3679 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3681 struct dlm_rsb *r = lkb->lkb_resource;
3687 error = validate_message(lkb, ms);
3691 /* stub reply can happen with waiters_mutex held */
3692 error = remove_from_waiters_ms(lkb, ms);
3696 /* this is the value returned from do_unlock() on the master */
3698 switch (ms->m_result) {
3700 receive_flags_reply(lkb, ms);
3701 remove_lock_pc(r, lkb);
3702 queue_cast(r, lkb, -DLM_EUNLOCK);
3707 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3708 lkb->lkb_id, ms->m_result);
3715 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3717 struct dlm_lkb *lkb;
3720 error = find_lkb(ls, ms->m_remid, &lkb);
3722 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3723 ms->m_header.h_nodeid, ms->m_remid);
3727 _receive_unlock_reply(lkb, ms);
3731 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3733 struct dlm_rsb *r = lkb->lkb_resource;
3739 error = validate_message(lkb, ms);
3743 /* stub reply can happen with waiters_mutex held */
3744 error = remove_from_waiters_ms(lkb, ms);
3748 /* this is the value returned from do_cancel() on the master */
3750 switch (ms->m_result) {
3752 receive_flags_reply(lkb, ms);
3753 revert_lock_pc(r, lkb);
3754 queue_cast(r, lkb, -DLM_ECANCEL);
3759 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3760 lkb->lkb_id, ms->m_result);
3767 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3769 struct dlm_lkb *lkb;
3772 error = find_lkb(ls, ms->m_remid, &lkb);
3774 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3775 ms->m_header.h_nodeid, ms->m_remid);
3779 _receive_cancel_reply(lkb, ms);
3783 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3785 struct dlm_lkb *lkb;
3787 int error, ret_nodeid;
3789 error = find_lkb(ls, ms->m_lkid, &lkb);
3791 log_error(ls, "receive_lookup_reply no lkb");
3795 /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3796 FIXME: will a non-zero error ever be returned? */
3798 r = lkb->lkb_resource;
3802 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3806 ret_nodeid = ms->m_nodeid;
3807 if (ret_nodeid == dlm_our_nodeid()) {
3810 r->res_first_lkid = 0;
3812 /* set_master() will copy res_nodeid to lkb_nodeid */
3813 r->res_nodeid = ret_nodeid;
3816 if (is_overlap(lkb)) {
3817 log_debug(ls, "receive_lookup_reply %x unlock %x",
3818 lkb->lkb_id, lkb->lkb_flags);
3819 queue_cast_overlap(r, lkb);
3820 unhold_lkb(lkb); /* undoes create_lkb() */
3824 _request_lock(r, lkb);
3828 process_lookup_list(r);
3835 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3837 if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3838 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3839 ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3840 ms->m_remid, ms->m_result);
3844 switch (ms->m_type) {
3846 /* messages sent to a master node */
3848 case DLM_MSG_REQUEST:
3849 receive_request(ls, ms);
3852 case DLM_MSG_CONVERT:
3853 receive_convert(ls, ms);
3856 case DLM_MSG_UNLOCK:
3857 receive_unlock(ls, ms);
3860 case DLM_MSG_CANCEL:
3861 receive_cancel(ls, ms);
3864 /* messages sent from a master node (replies to above) */
3866 case DLM_MSG_REQUEST_REPLY:
3867 receive_request_reply(ls, ms);
3870 case DLM_MSG_CONVERT_REPLY:
3871 receive_convert_reply(ls, ms);
3874 case DLM_MSG_UNLOCK_REPLY:
3875 receive_unlock_reply(ls, ms);
3878 case DLM_MSG_CANCEL_REPLY:
3879 receive_cancel_reply(ls, ms);
3882 /* messages sent from a master node (only two types of async msg) */
3885 receive_grant(ls, ms);
3889 receive_bast(ls, ms);
3892 /* messages sent to a dir node */
3894 case DLM_MSG_LOOKUP:
3895 receive_lookup(ls, ms);
3898 case DLM_MSG_REMOVE:
3899 receive_remove(ls, ms);
3902 /* messages sent from a dir node (remove has no reply) */
3904 case DLM_MSG_LOOKUP_REPLY:
3905 receive_lookup_reply(ls, ms);
3908 /* other messages */
3911 receive_purge(ls, ms);
3915 log_error(ls, "unknown message type %d", ms->m_type);
3921 /* If the lockspace is in recovery mode (locking stopped), then normal
3922 messages are saved on the requestqueue for processing after recovery is
3923 done. When not in recovery mode, we wait for dlm_recoverd to drain saved
3924 messages off the requestqueue before we process new ones. This occurs right
3925 after recovery completes when we transition from saving all messages on
3926 requestqueue, to processing all the saved messages, to processing new
3927 messages as they arrive. */
/* Route a normal message: queue it on the requestqueue while locking is
   stopped for recovery, otherwise wait for the saved backlog to drain and
   process it directly. */

static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
				int nodeid)
{
	if (dlm_locking_stopped(ls)) {
		dlm_add_requestqueue(ls, nodeid, ms);
	} else {
		dlm_wait_requestqueue(ls);
		_receive_message(ls, ms);
	}
}
3940 /* This is called by dlm_recoverd to process messages that were saved on
3941 the requestqueue. */
/* Called by dlm_recoverd to process a message that was saved on the
   requestqueue during recovery. */
void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
{
	_receive_message(ls, ms);
}
3948 /* This is called by the midcomms layer when something is received for
3949 the lockspace. It could be either a MSG (normal message sent as part of
3950 standard locking activity) or an RCOM (recovery message sent as part of
3951 lockspace recovery). */
3953 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
3955 struct dlm_header *hd = &p->header;
3959 switch (hd->h_cmd) {
3961 dlm_message_in(&p->message);
3962 type = p->message.m_type;
3965 dlm_rcom_in(&p->rcom);
3966 type = p->rcom.rc_type;
3969 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3973 if (hd->h_nodeid != nodeid) {
3974 log_print("invalid h_nodeid %d from %d lockspace %x",
3975 hd->h_nodeid, nodeid, hd->h_lockspace);
3979 ls = dlm_find_lockspace_global(hd->h_lockspace);
3981 if (dlm_config.ci_log_debug)
3982 log_print("invalid lockspace %x from %d cmd %d type %d",
3983 hd->h_lockspace, nodeid, hd->h_cmd, type);
3985 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3986 dlm_send_ls_not_ready(nodeid, &p->rcom);
3990 /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3991 be inactive (in this ls) before transitioning to recovery mode */
3993 down_read(&ls->ls_recv_active);
3994 if (hd->h_cmd == DLM_MSG)
3995 dlm_receive_message(ls, &p->message, nodeid);
3997 dlm_receive_rcom(ls, &p->rcom, nodeid);
3998 up_read(&ls->ls_recv_active);
4000 dlm_put_lockspace(ls);
4003 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
4005 if (middle_conversion(lkb)) {
4007 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
4008 ls->ls_stub_ms.m_result = -EINPROGRESS;
4009 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4010 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4011 _receive_convert_reply(lkb, &ls->ls_stub_ms);
4013 /* Same special case as in receive_rcom_lock_args() */
4014 lkb->lkb_grmode = DLM_LOCK_IV;
4015 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4018 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4019 lkb->lkb_flags |= DLM_IFL_RESEND;
4022 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4023 conversions are async; there's no reply from the remote master */
4026 /* A waiting lkb needs recovery if the master node has failed, or
4027 the master node is changing (only when no directory is used) */
4029 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
4031 if (dlm_is_removed(ls, lkb->lkb_nodeid))
4034 if (!dlm_no_directory(ls))
4037 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
4043 /* Recovery for locks that are waiting for replies from nodes that are now
4044 gone. We can just complete unlocks and cancels by faking a reply from the
4045 dead node. Requests and up-conversions we flag to be resent after
4046 recovery. Down-conversions can just be completed with a fake reply like
4047 unlocks. Conversions between PR and CW need special attention. */
4049 void dlm_recover_waiters_pre(struct dlm_ls *ls)
4051 struct dlm_lkb *lkb, *safe;
4052 int wait_type, stub_unlock_result, stub_cancel_result;
4054 mutex_lock(&ls->ls_waiters_mutex);
4056 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4057 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
4058 lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
4060 /* all outstanding lookups, regardless of destination will be
4061 resent after recovery is done */
4063 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4064 lkb->lkb_flags |= DLM_IFL_RESEND;
4068 if (!waiter_needs_recovery(ls, lkb))
4071 wait_type = lkb->lkb_wait_type;
4072 stub_unlock_result = -DLM_EUNLOCK;
4073 stub_cancel_result = -DLM_ECANCEL;
4075 /* Main reply may have been received leaving a zero wait_type,
4076 but a reply for the overlapping op may not have been
4077 received. In that case we need to fake the appropriate
4078 reply for the overlap op. */
4081 if (is_overlap_cancel(lkb)) {
4082 wait_type = DLM_MSG_CANCEL;
4083 if (lkb->lkb_grmode == DLM_LOCK_IV)
4084 stub_cancel_result = 0;
4086 if (is_overlap_unlock(lkb)) {
4087 wait_type = DLM_MSG_UNLOCK;
4088 if (lkb->lkb_grmode == DLM_LOCK_IV)
4089 stub_unlock_result = -ENOENT;
4092 log_debug(ls, "rwpre overlap %x %x %d %d %d",
4093 lkb->lkb_id, lkb->lkb_flags, wait_type,
4094 stub_cancel_result, stub_unlock_result);
4097 switch (wait_type) {
4099 case DLM_MSG_REQUEST:
4100 lkb->lkb_flags |= DLM_IFL_RESEND;
4103 case DLM_MSG_CONVERT:
4104 recover_convert_waiter(ls, lkb);
4107 case DLM_MSG_UNLOCK:
4109 ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
4110 ls->ls_stub_ms.m_result = stub_unlock_result;
4111 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4112 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4113 _receive_unlock_reply(lkb, &ls->ls_stub_ms);
4117 case DLM_MSG_CANCEL:
4119 ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
4120 ls->ls_stub_ms.m_result = stub_cancel_result;
4121 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4122 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4123 _receive_cancel_reply(lkb, &ls->ls_stub_ms);
4128 log_error(ls, "invalid lkb wait_type %d %d",
4129 lkb->lkb_wait_type, wait_type);
4133 mutex_unlock(&ls->ls_waiters_mutex);
4136 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4138 struct dlm_lkb *lkb;
4141 mutex_lock(&ls->ls_waiters_mutex);
4142 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4143 if (lkb->lkb_flags & DLM_IFL_RESEND) {
4149 mutex_unlock(&ls->ls_waiters_mutex);
4156 /* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
4157 master or dir-node for r. Processing the lkb may result in it being placed
4160 /* We do this after normal locking has been enabled and any saved messages
4161 (in requestqueue) have been processed. We should be confident that at
4162 this point we won't get or process a reply to any of these waiting
4163 operations. But, new ops may be coming in on the rsbs/locks here from
4164 userspace or remotely. */
4166 /* there may have been an overlap unlock/cancel prior to recovery or after
4167 recovery. if before, the lkb may still have a pos wait_count; if after, the
4168 overlap flag would just have been set and nothing new sent. we can be
4169 confident here than any replies to either the initial op or overlap ops
4170 prior to recovery have been received. */
4172 int dlm_recover_waiters_post(struct dlm_ls *ls)
4174 struct dlm_lkb *lkb;
4176 int error = 0, mstype, err, oc, ou;
4179 if (dlm_locking_stopped(ls)) {
4180 log_debug(ls, "recover_waiters_post aborted");
4185 lkb = find_resend_waiter(ls);
4189 r = lkb->lkb_resource;
4193 mstype = lkb->lkb_wait_type;
4194 oc = is_overlap_cancel(lkb);
4195 ou = is_overlap_unlock(lkb);
4198 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
4199 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
4201 /* At this point we assume that we won't get a reply to any
4202 previous op or overlap op on this lock. First, do a big
4203 remove_from_waiters() for all previous ops. */
4205 lkb->lkb_flags &= ~DLM_IFL_RESEND;
4206 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4207 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4208 lkb->lkb_wait_type = 0;
4209 lkb->lkb_wait_count = 0;
4210 mutex_lock(&ls->ls_waiters_mutex);
4211 list_del_init(&lkb->lkb_wait_reply);
4212 mutex_unlock(&ls->ls_waiters_mutex);
4213 unhold_lkb(lkb); /* for waiters list */
4216 /* do an unlock or cancel instead of resending */
4218 case DLM_MSG_LOOKUP:
4219 case DLM_MSG_REQUEST:
4220 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4222 unhold_lkb(lkb); /* undoes create_lkb() */
4224 case DLM_MSG_CONVERT:
4226 queue_cast(r, lkb, -DLM_ECANCEL);
4228 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4229 _unlock_lock(r, lkb);
4237 case DLM_MSG_LOOKUP:
4238 case DLM_MSG_REQUEST:
4239 _request_lock(r, lkb);
4241 confirm_master(r, 0);
4243 case DLM_MSG_CONVERT:
4244 _convert_lock(r, lkb);
4252 log_error(ls, "recover_waiters_post %x %d %x %d %d",
4253 lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4262 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4263 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4265 struct dlm_ls *ls = r->res_ls;
4266 struct dlm_lkb *lkb, *safe;
4268 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4269 if (test(ls, lkb)) {
4270 rsb_set_flag(r, RSB_LOCKS_PURGED);
4272 /* this put should free the lkb */
4273 if (!dlm_put_lkb(lkb))
4274 log_error(ls, "purged lkb not released");
4279 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4281 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
/* predicate: any master copy, regardless of owner */
static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	return is_master_copy(lkb);
}
4289 static void purge_dead_locks(struct dlm_rsb *r)
4291 purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4292 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4293 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4296 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4298 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4299 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4300 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4303 /* Get rid of locks held by nodes that are gone. */
4305 int dlm_purge_locks(struct dlm_ls *ls)
4309 log_debug(ls, "dlm_purge_locks");
4311 down_write(&ls->ls_root_sem);
4312 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4316 purge_dead_locks(r);
4322 up_write(&ls->ls_root_sem);
/* Scan one hash-table bucket for an rsb marked RSB_LOCKS_PURGED by an
   earlier purge_queue() call.  Clears the flag on the rsb it finds and
   returns it (NULL if none in this bucket).  The bucket spinlock is held
   across the scan; the elided line near 4337 presumably takes an rsb
   reference before returning — confirm against the full source. */
4327 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4329 	struct dlm_rsb *r, *r_ret = NULL;
4331 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
4332 	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4333 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
4336 		rsb_clear_flag(r, RSB_LOCKS_PURGED);
4340 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
/* After locks have been purged, revisit every rsb that was flagged
   RSB_LOCKS_PURGED: grant whatever locks became grantable and re-confirm
   the master.  Iterates bucket by bucket until the last bucket
   (ls_rsbtbl_size - 1) yields no more purged rsbs; the surrounding loop
   structure is elided in this listing. */
4344 void dlm_grant_after_purge(struct dlm_ls *ls)
4350 		r = find_purged_rsb(ls, bucket);
4352 			if (bucket == ls->ls_rsbtbl_size - 1)
4359 		grant_pending_locks(r);
4360 		confirm_master(r, 0);
/* Find on @head the lkb matching a remote (nodeid, remid) pair; returns
   NULL when no entry matches (elided return paths). */
4368 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4371 	struct dlm_lkb *lkb;
4373 	list_for_each_entry(lkb, head, lkb_statequeue) {
4374 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
/* Look up an lkb by remote (nodeid, remid) across all three queues of @r,
   in grant, convert, wait order; earlier hits short-circuit via elided
   return statements. */
4380 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4383 	struct dlm_lkb *lkb;
4385 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4388 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4391 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4397 /* needs at least dlm_rcom + rcom_lock */
/* Populate a newly created master-copy lkb from the rcom_lock payload sent
   by the lock holder during recovery.  All wire fields are little-endian
   and converted here.  Only the low 16 bits of rl_flags are copied into
   lkb_flags (the upper bits are internal-only), then DLM_IFL_MSTCPY marks
   this as the master's copy of a remote lock.  Returns 0 on success;
   elided error paths return -EINVAL/-ENOMEM for oversized or unallocatable
   LVBs (inferred from the visible checks — confirm against full source). */
4398 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4399 				  struct dlm_rsb *r, struct dlm_rcom *rc)
4401 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4403 	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4404 	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4405 	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4406 	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4407 	lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4408 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
4409 	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4410 	lkb->lkb_rqmode = rl->rl_rqmode;
4411 	lkb->lkb_grmode = rl->rl_grmode;
4412 	/* don't set lkb_status because add_lkb wants to itself */
/* Userspace holders get stub ast/bast callbacks; the flags only record
   whether the holder registered each kind. */
4414 	lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
4415 	lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;
4417 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
/* LVB length is whatever remains of the message after the fixed headers;
   reject anything larger than this lockspace's configured lvblen. */
4418 		int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4419 			 sizeof(struct rcom_lock);
4420 		if (lvblen > ls->ls_lvblen)
4422 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4423 		if (!lkb->lkb_lvbptr)
4425 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4428 	/* Conversions between PR and CW (middle modes) need special handling.
4429 	   The real granted mode of these converting locks cannot be determined
4430 	   until all locks have been rebuilt on the rsb (recover_conversion) */
4432 	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4433 	    middle_conversion(lkb)) {
4434 		rl->rl_status = DLM_LKSTS_CONVERT;
4435 		lkb->lkb_grmode = DLM_LOCK_IV;
4436 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
4442 /* This lkb may have been recovered in a previous aborted recovery so we need
4443 to check if the rsb already has an lkb with the given remote nodeid/lkid.
4444 If so we just send back a standard reply. If not, we create a new lkb with
4445 the given values and send back our lkid. We send back our lkid by sending
4446 back the rcom_lock struct we got but with the remid field filled in. */
4448 /* needs at least dlm_rcom + rcom_lock */
/* New master's side of lock recovery: rebuild (or re-find) the master copy
   of a lock described by the incoming rcom_lock.  Parent (child) locks are
   not supported (-EOPNOTSUPP).  If search_remid() finds an lkb already
   created by a previous aborted recovery pass, a fresh one is not created
   (elided branch).  On success rl_remid is filled with our lkid so the
   holder can store it in its process copy; rl_result always carries the
   final status back.  Error paths between the visible calls (put_rsb,
   unlock_rsb, etc.) are elided — confirm against the full source. */
4449 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4451 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4453 	struct dlm_lkb *lkb;
4456 	if (rl->rl_parent_lkid) {
4457 		error = -EOPNOTSUPP;
4461 	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4468 	lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4474 	error = create_lkb(ls, &lkb);
4478 	error = receive_rcom_lock_args(ls, lkb, r, rc);
4485 	add_lkb(r, lkb, rl->rl_status);
4489 	/* this is the new value returned to the lock holder for
4490 	   saving in its process-copy lkb */
4491 	rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4498 		log_debug(ls, "recover_master_copy %d %x", error,
4499 			  le32_to_cpu(rl->rl_lkid));
4500 	rl->rl_result = cpu_to_le32(error);
4504 /* needs at least dlm_rcom + rcom_lock */
/* Lock holder's side of lock recovery: handle the new master's reply to
   the rcom_lock we sent.  Dispatches on rl_result (the switch/case lines
   are elided in this listing):
     - not-ready: master hadn't finished its own recovery; resend the lock
       (no barrier exists between recover_masters and recover_locks);
     - already-exists: a prior aborted recovery registered it; fall through
       to storing the remid;
     - 0: store the master's lkid in lkb_remid;
     - anything else is logged as an unknown error.
   Always acks via dlm_recovered_lock() so dlm_recover_locks() can stop
   waiting for this reply. */
4505 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4507 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4509 	struct dlm_lkb *lkb;
4512 	error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4514 		log_error(ls, "recover_process_copy no lkid %x",
4515 				le32_to_cpu(rl->rl_lkid));
4519 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4521 	error = le32_to_cpu(rl->rl_result);
4523 	r = lkb->lkb_resource;
4529 		/* There's a chance the new master received our lock before
4530 		   dlm_recover_master_reply(), this wouldn't happen if we did
4531 		   a barrier between recover_masters and recover_locks. */
4532 		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4533 			  (unsigned long)r, r->res_name);
4534 		dlm_send_rcom_lock(r, lkb);
4537 		log_debug(ls, "master copy exists %x", lkb->lkb_id);
4540 		lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4543 		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4544 			  error, lkb->lkb_id);
4547 	/* an ack for dlm_recover_locks() which waits for replies from
4548 	   all the locks it sends to new masters */
4549 	dlm_recovered_lock(r);
/* Userspace entry point for a new lock request (dlm_lock() from a user
   process via the misc device).  Creates an lkb, optionally allocates a
   kernel-side LVB buffer for the user lksb, tags the lkb DLM_IFL_USER,
   and hands off to request_lock().  On success the lkb is added to the
   per-process locks list so it can be cleaned up when the process exits.
   Error/cleanup paths between the visible lines are elided — confirm
   against the full source. */
4558 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4559 		     int mode, uint32_t flags, void *name, unsigned int namelen,
4560 		     unsigned long timeout_cs)
4562 	struct dlm_lkb *lkb;
4563 	struct dlm_args args;
4566 	dlm_lock_recovery(ls);
4568 	error = create_lkb(ls, &lkb);
4574 	if (flags & DLM_LKF_VALBLK) {
4575 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4576 		if (!ua->lksb.sb_lvbptr) {
4584 	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
4585 	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
4586 	   lock and that lkb_astparam is the dlm_user_args structure. */
4588 	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4589 			      fake_astfn, ua, fake_bastfn, &args);
4590 	lkb->lkb_flags |= DLM_IFL_USER;
4591 	ua->old_mode = DLM_LOCK_IV;
4598 	error = request_lock(ls, lkb, name, namelen, &args);
4614 	/* add this new lkb to the per-process list of locks */
4615 	spin_lock(&ua->proc->locks_spin);
4617 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4618 	spin_unlock(&ua->proc->locks_spin);
4620 	dlm_unlock_recovery(ls);
/* Userspace entry point for converting an existing lock.  Looks the lkb up
   by lkid, refreshes the persistent dlm_user_args attached to it from the
   per-call ua_tmp (the user may change callbacks or add an LVB on
   convert), then calls convert_lock().  -EINPROGRESS/-EAGAIN/-EDEADLK are
   treated as non-errors here (the result arrives via ast).  Elided lines
   presumably free ua_tmp and drop the lkb reference — confirm against the
   full source. */
4624 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4625 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4626 		     unsigned long timeout_cs)
4628 	struct dlm_lkb *lkb;
4629 	struct dlm_args args;
4630 	struct dlm_user_args *ua;
4633 	dlm_lock_recovery(ls);
4635 	error = find_lkb(ls, lkid, &lkb);
4639 	/* user can change the params on its lock when it converts it, or
4640 	   add an lvb that didn't exist before */
4644 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4645 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4646 		if (!ua->lksb.sb_lvbptr) {
4651 	if (lvb_in && ua->lksb.sb_lvbptr)
4652 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
/* Refresh per-call user parameters on the persistent ua. */
4654 	ua->xid = ua_tmp->xid;
4655 	ua->castparam = ua_tmp->castparam;
4656 	ua->castaddr = ua_tmp->castaddr;
4657 	ua->bastparam = ua_tmp->bastparam;
4658 	ua->bastaddr = ua_tmp->bastaddr;
4659 	ua->user_lksb = ua_tmp->user_lksb;
/* Remember the pre-convert granted mode for reporting back to the user. */
4660 	ua->old_mode = lkb->lkb_grmode;
4662 	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4663 			      fake_astfn, ua, fake_bastfn, &args);
4667 	error = convert_lock(ls, lkb, &args);
4669 	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4674 	dlm_unlock_recovery(ls);
/* Userspace entry point for unlocking.  Copies a final LVB value in if
   supplied, refreshes the completion-ast parameters, and calls
   unlock_lock().  -DLM_EUNLOCK is the normal success status; -EBUSY with
   FORCEUNLOCK is also tolerated (validate_unlock_args() case).  The lkb is
   moved from the per-process locks list to the unlocking list unless
   dlm_user_add_ast() already removed it. */
4679 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4680 		    uint32_t flags, uint32_t lkid, char *lvb_in)
4682 	struct dlm_lkb *lkb;
4683 	struct dlm_args args;
4684 	struct dlm_user_args *ua;
4687 	dlm_lock_recovery(ls);
4689 	error = find_lkb(ls, lkid, &lkb);
4695 	if (lvb_in && ua->lksb.sb_lvbptr)
4696 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4697 	if (ua_tmp->castparam)
4698 		ua->castparam = ua_tmp->castparam;
4699 	ua->user_lksb = ua_tmp->user_lksb;
4701 	error = set_unlock_args(flags, ua, &args);
4705 	error = unlock_lock(ls, lkb, &args);
4707 	if (error == -DLM_EUNLOCK)
4709 	/* from validate_unlock_args() */
4710 	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4715 	spin_lock(&ua->proc->locks_spin);
4716 	/* dlm_user_add_ast() may have already taken lkb off the proc list */
4717 	if (!list_empty(&lkb->lkb_ownqueue))
4718 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4719 	spin_unlock(&ua->proc->locks_spin);
4723 	dlm_unlock_recovery(ls);
/* Userspace entry point for cancelling an in-progress request/convert.
   Mirrors dlm_user_unlock() but calls cancel_lock(); -DLM_ECANCEL is the
   normal success status and -EBUSY (from validate_unlock_args()) is also
   tolerated.  Elided lines drop the lkb reference, free ua_tmp, etc. —
   confirm against the full source. */
4728 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4729 		    uint32_t flags, uint32_t lkid)
4731 	struct dlm_lkb *lkb;
4732 	struct dlm_args args;
4733 	struct dlm_user_args *ua;
4736 	dlm_lock_recovery(ls);
4738 	error = find_lkb(ls, lkid, &lkb);
4743 	if (ua_tmp->castparam)
4744 		ua->castparam = ua_tmp->castparam;
4745 	ua->user_lksb = ua_tmp->user_lksb;
4747 	error = set_unlock_args(flags, ua, &args);
4751 	error = cancel_lock(ls, lkb, &args);
4753 	if (error == -DLM_ECANCEL)
4755 	/* from validate_unlock_args() */
4756 	if (error == -EBUSY)
4761 	dlm_unlock_recovery(ls);
/* Cancel a lock on behalf of the deadlock detector.  Open-codes
   cancel_lock() so that DLM_IFL_DEADLOCK_CANCEL can be set on the lkb
   while the rsb is locked — the flag distinguishes a deadlock-driven
   cancel from a user-requested one when the cancel completes.  The
   lock_rsb()/unlock_rsb() pair around the visible lines is elided in
   this listing. */
4766 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4768 	struct dlm_lkb *lkb;
4769 	struct dlm_args args;
4770 	struct dlm_user_args *ua;
4774 	dlm_lock_recovery(ls);
4776 	error = find_lkb(ls, lkid, &lkb);
4782 	error = set_unlock_args(flags, ua, &args);
4786 	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4788 	r = lkb->lkb_resource;
4792 	error = validate_unlock_args(lkb, &args);
4795 	lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4797 	error = _cancel_lock(r, lkb);
4802 	if (error == -DLM_ECANCEL)
4804 	/* from validate_unlock_args() */
4805 	if (error == -EBUSY)
4810 	dlm_unlock_recovery(ls);
4814 /* lkb's that are removed from the waiters list by revert are just left on the
4815 orphans list with the granted orphan locks, to be freed by purge */
/* Move a PERSISTENT lock belonging to a dying process onto the
   lockspace-wide orphans list (under ls_orphans_mutex), then cancel any
   in-progress operation on it so it settles into a plain granted orphan.
   -DLM_ECANCEL is the expected cancel status (handling elided). */
4817 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4819 	struct dlm_args args;
4823 	mutex_lock(&ls->ls_orphans_mutex);
4824 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4825 	mutex_unlock(&ls->ls_orphans_mutex);
4827 	set_unlock_args(0, lkb->lkb_ua, &args);
4829 	error = cancel_lock(ls, lkb, &args);
4830 	if (error == -DLM_ECANCEL)
4835 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4836 Regardless of what rsb queue the lock is on, it's removed and freed. */
/* Force-unlock a lock belonging to a dying process.  FORCEUNLOCK lets the
   unlock proceed even if the lkb is not granted; whatever rsb queue it was
   on, it is removed and freed.  -DLM_EUNLOCK is the expected success
   status (handling elided). */
4838 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4840 	struct dlm_args args;
4843 	set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4845 	error = unlock_lock(ls, lkb, &args);
4846 	if (error == -DLM_EUNLOCK)
4851 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4852 (which does lock_rsb) due to deadlock with receiving a message that does
4853 lock_rsb followed by dlm_user_add_ast() */
/* Detach the first lock from a dying process's locks list, under
   ls_clear_proc_locks (which must be dropped before the caller does any
   lock_rsb work — see the deadlock note above this function in the full
   source).  PERSISTENT locks are marked ORPHAN; everything else is marked
   DEAD.  Returns NULL once the list is empty. */
4855 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4856 				     struct dlm_user_proc *proc)
4858 	struct dlm_lkb *lkb = NULL;
4860 	mutex_lock(&ls->ls_clear_proc_locks);
4861 	if (list_empty(&proc->locks))
4864 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4865 	list_del_init(&lkb->lkb_ownqueue);
4867 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4868 		lkb->lkb_flags |= DLM_IFL_ORPHAN;
4870 		lkb->lkb_flags |= DLM_IFL_DEAD;
4872 	mutex_unlock(&ls->ls_clear_proc_locks);
4876 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4877 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4878 which we clear here. */
4880 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4881 list, and no more device_writes should add lkb's to proc->locks list; so we
4882 shouldn't need to take asts_spin or locks_spin here. this assumes that
4883 device reads/writes/closes are serialized -- FIXME: we may need to serialize
/* Clean up all DLM state for a closing userspace process: drain its locks
   list one lkb at a time via del_proc_lock() (orphaning PERSISTENT locks,
   force-unlocking the rest), then — under ls_clear_proc_locks — mark
   in-progress unlocks DEAD and clear its pending-ast queue.  The per-lkb
   dlm_put_lkb() calls that balance the proc-list references are elided in
   this listing — confirm against the full source. */
4886 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4888 	struct dlm_lkb *lkb, *safe;
4890 	dlm_lock_recovery(ls);
4893 		lkb = del_proc_lock(ls, proc);
4897 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4898 			orphan_proc_lock(ls, lkb);
4900 			unlock_proc_lock(ls, lkb);
4902 		/* this removes the reference for the proc->locks list
4903 		   added by dlm_user_request, it may result in the lkb
4909 	mutex_lock(&ls->ls_clear_proc_locks);
4911 	/* in-progress unlocks */
4912 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4913 		list_del_init(&lkb->lkb_ownqueue);
4914 		lkb->lkb_flags |= DLM_IFL_DEAD;
4918 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4919 		lkb->lkb_ast_type = 0;
4920 		list_del(&lkb->lkb_astqueue);
4924 	mutex_unlock(&ls->ls_clear_proc_locks);
4925 	dlm_unlock_recovery(ls);
/* Like dlm_clear_proc_locks() but driven by an explicit purge request for
   the current process rather than device close.  Pops locks off
   proc->locks under locks_spin (dropping the spinlock before the
   lock_rsb-taking unlock_proc_lock()), marks them DEAD, force-unlocks
   them, and drops the proc-list reference; then clears the unlocking and
   pending-ast lists under their respective spinlocks.  The surrounding
   loop and the elided put for ast-queue references are not visible in
   this listing — confirm against the full source. */
4928 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4930 	struct dlm_lkb *lkb, *safe;
4934 		spin_lock(&proc->locks_spin);
4935 		if (!list_empty(&proc->locks)) {
4936 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
4938 			list_del_init(&lkb->lkb_ownqueue);
4940 		spin_unlock(&proc->locks_spin);
4945 		lkb->lkb_flags |= DLM_IFL_DEAD;
4946 		unlock_proc_lock(ls, lkb);
4947 		dlm_put_lkb(lkb); /* ref from proc->locks list */
4950 	spin_lock(&proc->locks_spin);
4951 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4952 		list_del_init(&lkb->lkb_ownqueue);
4953 		lkb->lkb_flags |= DLM_IFL_DEAD;
4956 	spin_unlock(&proc->locks_spin);
4958 	spin_lock(&proc->asts_spin);
4959 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4960 		list_del(&lkb->lkb_astqueue);
4963 	spin_unlock(&proc->asts_spin);
4966 /* pid of 0 means purge all orphans */
/* Purge orphaned locks for a given (nodeid, pid).  A pid of 0 matches all
   orphans (see the comment above this function).  Each matching orphan is
   force-unlocked and removed from ls_orphans under ls_orphans_mutex; the
   per-orphan nodeid check and reference drop are elided in this listing. */
4968 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4970 	struct dlm_lkb *lkb, *safe;
4972 	mutex_lock(&ls->ls_orphans_mutex);
4973 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4974 		if (pid && lkb->lkb_ownpid != pid)
4976 		unlock_proc_lock(ls, lkb);
4977 		list_del_init(&lkb->lkb_ownqueue);
4980 	mutex_unlock(&ls->ls_orphans_mutex);
/* Ask a remote node to purge orphans for (nodeid, pid) by sending it a
   DLM_MSG_PURGE message; the remote side ends up in do_purge().  The
   m_pid assignment between the visible lines is elided — confirm against
   the full source. */
4983 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4985 	struct dlm_message *ms;
4986 	struct dlm_mhandle *mh;
4989 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4990 				DLM_MSG_PURGE, &ms, &mh);
4993 	ms->m_nodeid = nodeid;
4996 	return send_message(mh, ms);
4999 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
5000 int nodeid, int pid)
5004 if (nodeid != dlm_our_nodeid()) {
5005 error = send_purge(ls, nodeid, pid);
5007 dlm_lock_recovery(ls);
5008 if (pid == current->pid)
5009 purge_proc_locks(ls, proc);
5011 do_purge(ls, nodeid, pid);
5012 dlm_unlock_recovery(ls);