dlm: fix race between remove and lookup
[firefly-linux-kernel-4.4.55.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "lowcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89                                     struct dlm_message *ms);
90 static int receive_extralen(struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void del_timeout(struct dlm_lkb *lkb);
93 static void toss_rsb(struct kref *kref);
94
95 /*
96  * Lock compatibilty matrix - thanks Steve
97  * UN = Unlocked state. Not really a state, used as a flag
98  * PD = Padding. Used to make the matrix a nice power of two in size
99  * Other states are the same as the VMS DLM.
100  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
101  */
102
103 static const int __dlm_compat_matrix[8][8] = {
104       /* UN NL CR CW PR PW EX PD */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
106         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
107         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
108         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
109         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
110         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
111         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
112         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
113 };
114
115 /*
116  * This defines the direction of transfer of LVB data.
117  * Granted mode is the row; requested mode is the column.
118  * Usage: matrix[grmode+1][rqmode+1]
119  * 1 = LVB is returned to the caller
120  * 0 = LVB is written to the resource
121  * -1 = nothing happens to the LVB
122  */
123
124 const int dlm_lvb_operations[8][8] = {
125         /* UN   NL  CR  CW  PR  PW  EX  PD*/
126         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
127         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
128         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
129         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
130         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
131         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
133         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
134 };
135
136 #define modes_compat(gr, rq) \
137         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
138
139 int dlm_modes_compat(int mode1, int mode2)
140 {
141         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
142 }
143
144 /*
145  * Compatibility matrix for conversions with QUECVT set.
146  * Granted mode is the row; requested mode is the column.
147  * Usage: matrix[grmode+1][rqmode+1]
148  */
149
150 static const int __quecvt_compat_matrix[8][8] = {
151       /* UN NL CR CW PR PW EX PD */
152         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
153         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
154         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
155         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
156         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
157         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
158         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
159         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
160 };
161
162 void dlm_print_lkb(struct dlm_lkb *lkb)
163 {
164         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
165                "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
166                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
167                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
168                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
169                (unsigned long long)lkb->lkb_recover_seq);
170 }
171
172 static void dlm_print_rsb(struct dlm_rsb *r)
173 {
174         printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
175                "rlc %d name %s\n",
176                r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
177                r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
178                r->res_name);
179 }
180
181 void dlm_dump_rsb(struct dlm_rsb *r)
182 {
183         struct dlm_lkb *lkb;
184
185         dlm_print_rsb(r);
186
187         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
188                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
189         printk(KERN_ERR "rsb lookup list\n");
190         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
191                 dlm_print_lkb(lkb);
192         printk(KERN_ERR "rsb grant queue:\n");
193         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
194                 dlm_print_lkb(lkb);
195         printk(KERN_ERR "rsb convert queue:\n");
196         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
197                 dlm_print_lkb(lkb);
198         printk(KERN_ERR "rsb wait queue:\n");
199         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
200                 dlm_print_lkb(lkb);
201 }
202
203 /* Threads cannot use the lockspace while it's being recovered */
204
205 static inline void dlm_lock_recovery(struct dlm_ls *ls)
206 {
207         down_read(&ls->ls_in_recovery);
208 }
209
210 void dlm_unlock_recovery(struct dlm_ls *ls)
211 {
212         up_read(&ls->ls_in_recovery);
213 }
214
215 int dlm_lock_recovery_try(struct dlm_ls *ls)
216 {
217         return down_read_trylock(&ls->ls_in_recovery);
218 }
219
220 static inline int can_be_queued(struct dlm_lkb *lkb)
221 {
222         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
223 }
224
225 static inline int force_blocking_asts(struct dlm_lkb *lkb)
226 {
227         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
228 }
229
230 static inline int is_demoted(struct dlm_lkb *lkb)
231 {
232         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
233 }
234
235 static inline int is_altmode(struct dlm_lkb *lkb)
236 {
237         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
238 }
239
240 static inline int is_granted(struct dlm_lkb *lkb)
241 {
242         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
243 }
244
245 static inline int is_remote(struct dlm_rsb *r)
246 {
247         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
248         return !!r->res_nodeid;
249 }
250
251 static inline int is_process_copy(struct dlm_lkb *lkb)
252 {
253         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
254 }
255
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
259 }
260
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265                 return 1;
266         return 0;
267 }
268
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
277 }
278
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
282 }
283
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
287                                   DLM_IFL_OVERLAP_CANCEL));
288 }
289
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292         if (is_master_copy(lkb))
293                 return;
294
295         del_timeout(lkb);
296
297         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
298
299         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
300            timeout caused the cancel then return -ETIMEDOUT */
301         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
302                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
303                 rv = -ETIMEDOUT;
304         }
305
306         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
307                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
308                 rv = -EDEADLK;
309         }
310
311         dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
312 }
313
314 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
315 {
316         queue_cast(r, lkb,
317                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
318 }
319
320 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
321 {
322         if (is_master_copy(lkb)) {
323                 send_bast(r, lkb, rqmode);
324         } else {
325                 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
326         }
327 }
328
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332
333 /* This is only called to add a reference when the code already holds
334    a valid reference to the rsb, so there's no need for locking. */
335
336 static inline void hold_rsb(struct dlm_rsb *r)
337 {
338         kref_get(&r->res_ref);
339 }
340
341 void dlm_hold_rsb(struct dlm_rsb *r)
342 {
343         hold_rsb(r);
344 }
345
346 /* When all references to the rsb are gone it's transferred to
347    the tossed list for later disposal. */
348
349 static void put_rsb(struct dlm_rsb *r)
350 {
351         struct dlm_ls *ls = r->res_ls;
352         uint32_t bucket = r->res_bucket;
353
354         spin_lock(&ls->ls_rsbtbl[bucket].lock);
355         kref_put(&r->res_ref, toss_rsb);
356         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
357 }
358
359 void dlm_put_rsb(struct dlm_rsb *r)
360 {
361         put_rsb(r);
362 }
363
364 static int pre_rsb_struct(struct dlm_ls *ls)
365 {
366         struct dlm_rsb *r1, *r2;
367         int count = 0;
368
369         spin_lock(&ls->ls_new_rsb_spin);
370         if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
371                 spin_unlock(&ls->ls_new_rsb_spin);
372                 return 0;
373         }
374         spin_unlock(&ls->ls_new_rsb_spin);
375
376         r1 = dlm_allocate_rsb(ls);
377         r2 = dlm_allocate_rsb(ls);
378
379         spin_lock(&ls->ls_new_rsb_spin);
380         if (r1) {
381                 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
382                 ls->ls_new_rsb_count++;
383         }
384         if (r2) {
385                 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
386                 ls->ls_new_rsb_count++;
387         }
388         count = ls->ls_new_rsb_count;
389         spin_unlock(&ls->ls_new_rsb_spin);
390
391         if (!count)
392                 return -ENOMEM;
393         return 0;
394 }
395
396 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
397    unlock any spinlocks, go back and call pre_rsb_struct again.
398    Otherwise, take an rsb off the list and return it. */
399
400 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
401                           struct dlm_rsb **r_ret)
402 {
403         struct dlm_rsb *r;
404         int count;
405
406         spin_lock(&ls->ls_new_rsb_spin);
407         if (list_empty(&ls->ls_new_rsb)) {
408                 count = ls->ls_new_rsb_count;
409                 spin_unlock(&ls->ls_new_rsb_spin);
410                 log_debug(ls, "find_rsb retry %d %d %s",
411                           count, dlm_config.ci_new_rsb_count, name);
412                 return -EAGAIN;
413         }
414
415         r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
416         list_del(&r->res_hashchain);
417         /* Convert the empty list_head to a NULL rb_node for tree usage: */
418         memset(&r->res_hashnode, 0, sizeof(struct rb_node));
419         ls->ls_new_rsb_count--;
420         spin_unlock(&ls->ls_new_rsb_spin);
421
422         r->res_ls = ls;
423         r->res_length = len;
424         memcpy(r->res_name, name, len);
425         mutex_init(&r->res_mutex);
426
427         INIT_LIST_HEAD(&r->res_lookup);
428         INIT_LIST_HEAD(&r->res_grantqueue);
429         INIT_LIST_HEAD(&r->res_convertqueue);
430         INIT_LIST_HEAD(&r->res_waitqueue);
431         INIT_LIST_HEAD(&r->res_root_list);
432         INIT_LIST_HEAD(&r->res_recover_list);
433
434         *r_ret = r;
435         return 0;
436 }
437
438 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
439 {
440         char maxname[DLM_RESNAME_MAXLEN];
441
442         memset(maxname, 0, DLM_RESNAME_MAXLEN);
443         memcpy(maxname, name, nlen);
444         return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
445 }
446
447 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
448                         struct dlm_rsb **r_ret)
449 {
450         struct rb_node *node = tree->rb_node;
451         struct dlm_rsb *r;
452         int rc;
453
454         while (node) {
455                 r = rb_entry(node, struct dlm_rsb, res_hashnode);
456                 rc = rsb_cmp(r, name, len);
457                 if (rc < 0)
458                         node = node->rb_left;
459                 else if (rc > 0)
460                         node = node->rb_right;
461                 else
462                         goto found;
463         }
464         *r_ret = NULL;
465         return -EBADR;
466
467  found:
468         *r_ret = r;
469         return 0;
470 }
471
472 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
473 {
474         struct rb_node **newn = &tree->rb_node;
475         struct rb_node *parent = NULL;
476         int rc;
477
478         while (*newn) {
479                 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
480                                                res_hashnode);
481
482                 parent = *newn;
483                 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
484                 if (rc < 0)
485                         newn = &parent->rb_left;
486                 else if (rc > 0)
487                         newn = &parent->rb_right;
488                 else {
489                         log_print("rsb_insert match");
490                         dlm_dump_rsb(rsb);
491                         dlm_dump_rsb(cur);
492                         return -EEXIST;
493                 }
494         }
495
496         rb_link_node(&rsb->res_hashnode, parent, newn);
497         rb_insert_color(&rsb->res_hashnode, tree);
498         return 0;
499 }
500
501 /*
502  * Find rsb in rsbtbl and potentially create/add one
503  *
504  * Delaying the release of rsb's has a similar benefit to applications keeping
505  * NL locks on an rsb, but without the guarantee that the cached master value
506  * will still be valid when the rsb is reused.  Apps aren't always smart enough
507  * to keep NL locks on an rsb that they may lock again shortly; this can lead
508  * to excessive master lookups and removals if we don't delay the release.
509  *
510  * Searching for an rsb means looking through both the normal list and toss
511  * list.  When found on the toss list the rsb is moved to the normal list with
512  * ref count of 1; when found on normal list the ref count is incremented.
513  *
514  * rsb's on the keep list are being used locally and refcounted.
515  * rsb's on the toss list are not being used locally, and are not refcounted.
516  *
517  * The toss list rsb's were either
518  * - previously used locally but not any more (were on keep list, then
519  *   moved to toss list when last refcount dropped)
520  * - created and put on toss list as a directory record for a lookup
521  *   (we are the dir node for the res, but are not using the res right now,
522  *   but some other node is)
523  *
524  * The purpose of find_rsb() is to return a refcounted rsb for local use.
525  * So, if the given rsb is on the toss list, it is moved to the keep list
526  * before being returned.
527  *
528  * toss_rsb() happens when all local usage of the rsb is done, i.e. no
529  * more refcounts exist, so the rsb is moved from the keep list to the
530  * toss list.
531  *
532  * rsb's on both keep and toss lists are used for doing a name to master
533  * lookups.  rsb's that are in use locally (and being refcounted) are on
534  * the keep list, rsb's that are not in use locally (not refcounted) and
535  * only exist for name/master lookups are on the toss list.
536  *
537  * rsb's on the toss list who's dir_nodeid is not local can have stale
538  * name/master mappings.  So, remote requests on such rsb's can potentially
539  * return with an error, which means the mapping is stale and needs to
540  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
541  * first_lkid is to keep only a single outstanding request on an rsb
542  * while that rsb has a potentially stale master.)
543  */
544
545 static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
546                         uint32_t hash, uint32_t b,
547                         int dir_nodeid, int from_nodeid,
548                         unsigned int flags, struct dlm_rsb **r_ret)
549 {
550         struct dlm_rsb *r = NULL;
551         int our_nodeid = dlm_our_nodeid();
552         int from_local = 0;
553         int from_other = 0;
554         int from_dir = 0;
555         int create = 0;
556         int error;
557
558         if (flags & R_RECEIVE_REQUEST) {
559                 if (from_nodeid == dir_nodeid)
560                         from_dir = 1;
561                 else
562                         from_other = 1;
563         } else if (flags & R_REQUEST) {
564                 from_local = 1;
565         }
566
567         /*
568          * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
569          * from_nodeid has sent us a lock in dlm_recover_locks, believing
570          * we're the new master.  Our local recovery may not have set
571          * res_master_nodeid to our_nodeid yet, so allow either.  Don't
572          * create the rsb; dlm_recover_process_copy() will handle EBADR
573          * by resending.
574          *
575          * If someone sends us a request, we are the dir node, and we do
576          * not find the rsb anywhere, then recreate it.  This happens if
577          * someone sends us a request after we have removed/freed an rsb
578          * from our toss list.  (They sent a request instead of lookup
579          * because they are using an rsb from their toss list.)
580          */
581
582         if (from_local || from_dir ||
583             (from_other && (dir_nodeid == our_nodeid))) {
584                 create = 1;
585         }
586
587  retry:
588         if (create) {
589                 error = pre_rsb_struct(ls);
590                 if (error < 0)
591                         goto out;
592         }
593
594         spin_lock(&ls->ls_rsbtbl[b].lock);
595
596         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
597         if (error)
598                 goto do_toss;
599         
600         /*
601          * rsb is active, so we can't check master_nodeid without lock_rsb.
602          */
603
604         kref_get(&r->res_ref);
605         error = 0;
606         goto out_unlock;
607
608
609  do_toss:
610         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
611         if (error)
612                 goto do_new;
613
614         /*
615          * rsb found inactive (master_nodeid may be out of date unless
616          * we are the dir_nodeid or were the master)  No other thread
617          * is using this rsb because it's on the toss list, so we can
618          * look at or update res_master_nodeid without lock_rsb.
619          */
620
621         if ((r->res_master_nodeid != our_nodeid) && from_other) {
622                 /* our rsb was not master, and another node (not the dir node)
623                    has sent us a request */
624                 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
625                           from_nodeid, r->res_master_nodeid, dir_nodeid,
626                           r->res_name);
627                 error = -ENOTBLK;
628                 goto out_unlock;
629         }
630
631         if ((r->res_master_nodeid != our_nodeid) && from_dir) {
632                 /* don't think this should ever happen */
633                 log_error(ls, "find_rsb toss from_dir %d master %d",
634                           from_nodeid, r->res_master_nodeid);
635                 dlm_print_rsb(r);
636                 /* fix it and go on */
637                 r->res_master_nodeid = our_nodeid;
638                 r->res_nodeid = 0;
639                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
640                 r->res_first_lkid = 0;
641         }
642
643         if (from_local && (r->res_master_nodeid != our_nodeid)) {
644                 /* Because we have held no locks on this rsb,
645                    res_master_nodeid could have become stale. */
646                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
647                 r->res_first_lkid = 0;
648         }
649
650         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
651         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
652         goto out_unlock;
653
654
655  do_new:
656         /*
657          * rsb not found
658          */
659
660         if (error == -EBADR && !create)
661                 goto out_unlock;
662
663         error = get_rsb_struct(ls, name, len, &r);
664         if (error == -EAGAIN) {
665                 spin_unlock(&ls->ls_rsbtbl[b].lock);
666                 goto retry;
667         }
668         if (error)
669                 goto out_unlock;
670
671         r->res_hash = hash;
672         r->res_bucket = b;
673         r->res_dir_nodeid = dir_nodeid;
674         kref_init(&r->res_ref);
675
676         if (from_dir) {
677                 /* want to see how often this happens */
678                 log_debug(ls, "find_rsb new from_dir %d recreate %s",
679                           from_nodeid, r->res_name);
680                 r->res_master_nodeid = our_nodeid;
681                 r->res_nodeid = 0;
682                 goto out_add;
683         }
684
685         if (from_other && (dir_nodeid != our_nodeid)) {
686                 /* should never happen */
687                 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
688                           from_nodeid, dir_nodeid, our_nodeid, r->res_name);
689                 dlm_free_rsb(r);
690                 error = -ENOTBLK;
691                 goto out_unlock;
692         }
693
694         if (from_other) {
695                 log_debug(ls, "find_rsb new from_other %d dir %d %s",
696                           from_nodeid, dir_nodeid, r->res_name);
697         }
698
699         if (dir_nodeid == our_nodeid) {
700                 /* When we are the dir nodeid, we can set the master
701                    node immediately */
702                 r->res_master_nodeid = our_nodeid;
703                 r->res_nodeid = 0;
704         } else {
705                 /* set_master will send_lookup to dir_nodeid */
706                 r->res_master_nodeid = 0;
707                 r->res_nodeid = -1;
708         }
709
710  out_add:
711         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
712  out_unlock:
713         spin_unlock(&ls->ls_rsbtbl[b].lock);
714  out:
715         *r_ret = r;
716         return error;
717 }
718
719 /* During recovery, other nodes can send us new MSTCPY locks (from
720    dlm_recover_locks) before we've made ourself master (in
721    dlm_recover_masters). */
722
723 static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
724                           uint32_t hash, uint32_t b,
725                           int dir_nodeid, int from_nodeid,
726                           unsigned int flags, struct dlm_rsb **r_ret)
727 {
728         struct dlm_rsb *r = NULL;
729         int our_nodeid = dlm_our_nodeid();
730         int recover = (flags & R_RECEIVE_RECOVER);
731         int error;
732
733  retry:
734         error = pre_rsb_struct(ls);
735         if (error < 0)
736                 goto out;
737
738         spin_lock(&ls->ls_rsbtbl[b].lock);
739
740         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
741         if (error)
742                 goto do_toss;
743
744         /*
745          * rsb is active, so we can't check master_nodeid without lock_rsb.
746          */
747
748         kref_get(&r->res_ref);
749         goto out_unlock;
750
751
752  do_toss:
753         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
754         if (error)
755                 goto do_new;
756
757         /*
758          * rsb found inactive. No other thread is using this rsb because
759          * it's on the toss list, so we can look at or update
760          * res_master_nodeid without lock_rsb.
761          */
762
763         if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
764                 /* our rsb is not master, and another node has sent us a
765                    request; this should never happen */
766                 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
767                           from_nodeid, r->res_master_nodeid, dir_nodeid);
768                 dlm_print_rsb(r);
769                 error = -ENOTBLK;
770                 goto out_unlock;
771         }
772
773         if (!recover && (r->res_master_nodeid != our_nodeid) &&
774             (dir_nodeid == our_nodeid)) {
775                 /* our rsb is not master, and we are dir; may as well fix it;
776                    this should never happen */
777                 log_error(ls, "find_rsb toss our %d master %d dir %d",
778                           our_nodeid, r->res_master_nodeid, dir_nodeid);
779                 dlm_print_rsb(r);
780                 r->res_master_nodeid = our_nodeid;
781                 r->res_nodeid = 0;
782         }
783
784         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
785         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
786         goto out_unlock;
787
788
789  do_new:
790         /*
791          * rsb not found
792          */
793
794         error = get_rsb_struct(ls, name, len, &r);
795         if (error == -EAGAIN) {
796                 spin_unlock(&ls->ls_rsbtbl[b].lock);
797                 goto retry;
798         }
799         if (error)
800                 goto out_unlock;
801
802         r->res_hash = hash;
803         r->res_bucket = b;
804         r->res_dir_nodeid = dir_nodeid;
805         r->res_master_nodeid = dir_nodeid;
806         r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
807         kref_init(&r->res_ref);
808
809         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
810  out_unlock:
811         spin_unlock(&ls->ls_rsbtbl[b].lock);
812  out:
813         *r_ret = r;
814         return error;
815 }
816
817 static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
818                     unsigned int flags, struct dlm_rsb **r_ret)
819 {
820         uint32_t hash, b;
821         int dir_nodeid;
822
823         if (len > DLM_RESNAME_MAXLEN)
824                 return -EINVAL;
825
826         hash = jhash(name, len, 0);
827         b = hash & (ls->ls_rsbtbl_size - 1);
828
829         dir_nodeid = dlm_hash2nodeid(ls, hash);
830
831         if (dlm_no_directory(ls))
832                 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
833                                       from_nodeid, flags, r_ret);
834         else
835                 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
836                                       from_nodeid, flags, r_ret);
837 }
838
839 /* we have received a request and found that res_master_nodeid != our_nodeid,
840    so we need to return an error or make ourself the master */
841
842 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
843                                   int from_nodeid)
844 {
845         if (dlm_no_directory(ls)) {
846                 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
847                           from_nodeid, r->res_master_nodeid,
848                           r->res_dir_nodeid);
849                 dlm_print_rsb(r);
850                 return -ENOTBLK;
851         }
852
853         if (from_nodeid != r->res_dir_nodeid) {
854                 /* our rsb is not master, and another node (not the dir node)
855                    has sent us a request.  this is much more common when our
856                    master_nodeid is zero, so limit debug to non-zero.  */
857
858                 if (r->res_master_nodeid) {
859                         log_debug(ls, "validate master from_other %d master %d "
860                                   "dir %d first %x %s", from_nodeid,
861                                   r->res_master_nodeid, r->res_dir_nodeid,
862                                   r->res_first_lkid, r->res_name);
863                 }
864                 return -ENOTBLK;
865         } else {
866                 /* our rsb is not master, but the dir nodeid has sent us a
867                    request; this could happen with master 0 / res_nodeid -1 */
868
869                 if (r->res_master_nodeid) {
870                         log_error(ls, "validate master from_dir %d master %d "
871                                   "first %x %s",
872                                   from_nodeid, r->res_master_nodeid,
873                                   r->res_first_lkid, r->res_name);
874                 }
875
876                 r->res_master_nodeid = dlm_our_nodeid();
877                 r->res_nodeid = 0;
878                 return 0;
879         }
880 }
881
882 /*
883  * We're the dir node for this res and another node wants to know the
884  * master nodeid.  During normal operation (non recovery) this is only
885  * called from receive_lookup(); master lookups when the local node is
886  * the dir node are done by find_rsb().
887  *
888  * normal operation, we are the dir node for a resource
889  * . _request_lock
890  * . set_master
891  * . send_lookup
892  * . receive_lookup
893  * . dlm_master_lookup flags 0
894  *
895  * recover directory, we are rebuilding dir for all resources
896  * . dlm_recover_directory
897  * . dlm_rcom_names
898  *   remote node sends back the rsb names it is master of and we are dir of
899  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
900  *   we either create new rsb setting remote node as master, or find existing
901  *   rsb and set master to be the remote node.
902  *
903  * recover masters, we are finding the new master for resources
904  * . dlm_recover_masters
905  * . recover_master
906  * . dlm_send_rcom_lookup
907  * . receive_rcom_lookup
908  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
909  */
910
911 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
912                       unsigned int flags, int *r_nodeid, int *result)
913 {
914         struct dlm_rsb *r = NULL;
915         uint32_t hash, b;
916         int from_master = (flags & DLM_LU_RECOVER_DIR);
917         int fix_master = (flags & DLM_LU_RECOVER_MASTER);
918         int our_nodeid = dlm_our_nodeid();
919         int dir_nodeid, error, toss_list = 0;
920
921         if (len > DLM_RESNAME_MAXLEN)
922                 return -EINVAL;
923
924         if (from_nodeid == our_nodeid) {
925                 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
926                           our_nodeid, flags);
927                 return -EINVAL;
928         }
929
930         hash = jhash(name, len, 0);
931         b = hash & (ls->ls_rsbtbl_size - 1);
932
933         dir_nodeid = dlm_hash2nodeid(ls, hash);
934         if (dir_nodeid != our_nodeid) {
935                 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
936                           from_nodeid, dir_nodeid, our_nodeid, hash,
937                           ls->ls_num_nodes);
938                 *r_nodeid = -1;
939                 return -EINVAL;
940         }
941
942  retry:
943         error = pre_rsb_struct(ls);
944         if (error < 0)
945                 return error;
946
947         spin_lock(&ls->ls_rsbtbl[b].lock);
948         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
949         if (!error) {
950                 /* because the rsb is active, we need to lock_rsb before
951                    checking/changing re_master_nodeid */
952
953                 hold_rsb(r);
954                 spin_unlock(&ls->ls_rsbtbl[b].lock);
955                 lock_rsb(r);
956                 goto found;
957         }
958
959         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
960         if (error)
961                 goto not_found;
962
963         /* because the rsb is inactive (on toss list), it's not refcounted
964            and lock_rsb is not used, but is protected by the rsbtbl lock */
965
966         toss_list = 1;
967  found:
968         if (r->res_dir_nodeid != our_nodeid) {
969                 /* should not happen, but may as well fix it and carry on */
970                 log_error(ls, "dlm_master_lookup res_dir %d our %d %s",
971                           r->res_dir_nodeid, our_nodeid, r->res_name);
972                 r->res_dir_nodeid = our_nodeid;
973         }
974
975         if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
976                 /* Recovery uses this function to set a new master when
977                    the previous master failed.  Setting NEW_MASTER will
978                    force dlm_recover_masters to call recover_master on this
979                    rsb even though the res_nodeid is no longer removed. */
980
981                 r->res_master_nodeid = from_nodeid;
982                 r->res_nodeid = from_nodeid;
983                 rsb_set_flag(r, RSB_NEW_MASTER);
984
985                 if (toss_list) {
986                         /* I don't think we should ever find it on toss list. */
987                         log_error(ls, "dlm_master_lookup fix_master on toss");
988                         dlm_dump_rsb(r);
989                 }
990         }
991
992         if (from_master && (r->res_master_nodeid != from_nodeid)) {
993                 /* this will happen if from_nodeid became master during
994                    a previous recovery cycle, and we aborted the previous
995                    cycle before recovering this master value */
996
997                 log_limit(ls, "dlm_master_lookup from_master %d "
998                           "master_nodeid %d res_nodeid %d first %x %s",
999                           from_nodeid, r->res_master_nodeid, r->res_nodeid,
1000                           r->res_first_lkid, r->res_name);
1001
1002                 if (r->res_master_nodeid == our_nodeid) {
1003                         log_error(ls, "from_master %d our_master", from_nodeid);
1004                         dlm_dump_rsb(r);
1005                         dlm_send_rcom_lookup_dump(r, from_nodeid);
1006                         goto out_found;
1007                 }
1008
1009                 r->res_master_nodeid = from_nodeid;
1010                 r->res_nodeid = from_nodeid;
1011                 rsb_set_flag(r, RSB_NEW_MASTER);
1012         }
1013
1014         if (!r->res_master_nodeid) {
1015                 /* this will happen if recovery happens while we're looking
1016                    up the master for this rsb */
1017
1018                 log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s",
1019                           from_nodeid, r->res_first_lkid, r->res_name);
1020                 r->res_master_nodeid = from_nodeid;
1021                 r->res_nodeid = from_nodeid;
1022         }
1023
1024         if (!from_master && !fix_master &&
1025             (r->res_master_nodeid == from_nodeid)) {
1026                 /* this can happen when the master sends remove, the dir node
1027                    finds the rsb on the keep list and ignores the remove,
1028                    and the former master sends a lookup */
1029
1030                 log_limit(ls, "dlm_master_lookup from master %d flags %x "
1031                           "first %x %s", from_nodeid, flags,
1032                           r->res_first_lkid, r->res_name);
1033         }
1034
1035  out_found:
1036         *r_nodeid = r->res_master_nodeid;
1037         if (result)
1038                 *result = DLM_LU_MATCH;
1039
1040         if (toss_list) {
1041                 r->res_toss_time = jiffies;
1042                 /* the rsb was inactive (on toss list) */
1043                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1044         } else {
1045                 /* the rsb was active */
1046                 unlock_rsb(r);
1047                 put_rsb(r);
1048         }
1049         return 0;
1050
1051  not_found:
1052         error = get_rsb_struct(ls, name, len, &r);
1053         if (error == -EAGAIN) {
1054                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1055                 goto retry;
1056         }
1057         if (error)
1058                 goto out_unlock;
1059
1060         r->res_hash = hash;
1061         r->res_bucket = b;
1062         r->res_dir_nodeid = our_nodeid;
1063         r->res_master_nodeid = from_nodeid;
1064         r->res_nodeid = from_nodeid;
1065         kref_init(&r->res_ref);
1066         r->res_toss_time = jiffies;
1067
1068         error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
1069         if (error) {
1070                 /* should never happen */
1071                 dlm_free_rsb(r);
1072                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1073                 goto retry;
1074         }
1075
1076         if (result)
1077                 *result = DLM_LU_ADD;
1078         *r_nodeid = from_nodeid;
1079         error = 0;
1080  out_unlock:
1081         spin_unlock(&ls->ls_rsbtbl[b].lock);
1082         return error;
1083 }
1084
1085 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1086 {
1087         struct rb_node *n;
1088         struct dlm_rsb *r;
1089         int i;
1090
1091         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1092                 spin_lock(&ls->ls_rsbtbl[i].lock);
1093                 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1094                         r = rb_entry(n, struct dlm_rsb, res_hashnode);
1095                         if (r->res_hash == hash)
1096                                 dlm_dump_rsb(r);
1097                 }
1098                 spin_unlock(&ls->ls_rsbtbl[i].lock);
1099         }
1100 }
1101
1102 void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
1103 {
1104         struct dlm_rsb *r = NULL;
1105         uint32_t hash, b;
1106         int error;
1107
1108         hash = jhash(name, len, 0);
1109         b = hash & (ls->ls_rsbtbl_size - 1);
1110
1111         spin_lock(&ls->ls_rsbtbl[b].lock);
1112         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1113         if (!error)
1114                 goto out_dump;
1115
1116         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1117         if (error)
1118                 goto out;
1119  out_dump:
1120         dlm_dump_rsb(r);
1121  out:
1122         spin_unlock(&ls->ls_rsbtbl[b].lock);
1123 }
1124
1125 static void toss_rsb(struct kref *kref)
1126 {
1127         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1128         struct dlm_ls *ls = r->res_ls;
1129
1130         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1131         kref_init(&r->res_ref);
1132         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1133         rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1134         r->res_toss_time = jiffies;
1135         if (r->res_lvbptr) {
1136                 dlm_free_lvb(r->res_lvbptr);
1137                 r->res_lvbptr = NULL;
1138         }
1139 }
1140
1141 /* See comment for unhold_lkb */
1142
1143 static void unhold_rsb(struct dlm_rsb *r)
1144 {
1145         int rv;
1146         rv = kref_put(&r->res_ref, toss_rsb);
1147         DLM_ASSERT(!rv, dlm_dump_rsb(r););
1148 }
1149
1150 static void kill_rsb(struct kref *kref)
1151 {
1152         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1153
1154         /* All work is done after the return from kref_put() so we
1155            can release the write_lock before the remove and free. */
1156
1157         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1158         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1159         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1160         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1161         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1162         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1163 }
1164
1165 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1166    The rsb must exist as long as any lkb's for it do. */
1167
1168 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1169 {
1170         hold_rsb(r);
1171         lkb->lkb_resource = r;
1172 }
1173
1174 static void detach_lkb(struct dlm_lkb *lkb)
1175 {
1176         if (lkb->lkb_resource) {
1177                 put_rsb(lkb->lkb_resource);
1178                 lkb->lkb_resource = NULL;
1179         }
1180 }
1181
1182 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1183 {
1184         struct dlm_lkb *lkb;
1185         int rv, id;
1186
1187         lkb = dlm_allocate_lkb(ls);
1188         if (!lkb)
1189                 return -ENOMEM;
1190
1191         lkb->lkb_nodeid = -1;
1192         lkb->lkb_grmode = DLM_LOCK_IV;
1193         kref_init(&lkb->lkb_ref);
1194         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1195         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1196         INIT_LIST_HEAD(&lkb->lkb_time_list);
1197         INIT_LIST_HEAD(&lkb->lkb_cb_list);
1198         mutex_init(&lkb->lkb_cb_mutex);
1199         INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1200
1201  retry:
1202         rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
1203         if (!rv)
1204                 return -ENOMEM;
1205
1206         spin_lock(&ls->ls_lkbidr_spin);
1207         rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
1208         if (!rv)
1209                 lkb->lkb_id = id;
1210         spin_unlock(&ls->ls_lkbidr_spin);
1211
1212         if (rv == -EAGAIN)
1213                 goto retry;
1214
1215         if (rv < 0) {
1216                 log_error(ls, "create_lkb idr error %d", rv);
1217                 return rv;
1218         }
1219
1220         *lkb_ret = lkb;
1221         return 0;
1222 }
1223
1224 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1225 {
1226         struct dlm_lkb *lkb;
1227
1228         spin_lock(&ls->ls_lkbidr_spin);
1229         lkb = idr_find(&ls->ls_lkbidr, lkid);
1230         if (lkb)
1231                 kref_get(&lkb->lkb_ref);
1232         spin_unlock(&ls->ls_lkbidr_spin);
1233
1234         *lkb_ret = lkb;
1235         return lkb ? 0 : -ENOENT;
1236 }
1237
1238 static void kill_lkb(struct kref *kref)
1239 {
1240         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1241
1242         /* All work is done after the return from kref_put() so we
1243            can release the write_lock before the detach_lkb */
1244
1245         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1246 }
1247
1248 /* __put_lkb() is used when an lkb may not have an rsb attached to
1249    it so we need to provide the lockspace explicitly */
1250
1251 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1252 {
1253         uint32_t lkid = lkb->lkb_id;
1254
1255         spin_lock(&ls->ls_lkbidr_spin);
1256         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
1257                 idr_remove(&ls->ls_lkbidr, lkid);
1258                 spin_unlock(&ls->ls_lkbidr_spin);
1259
1260                 detach_lkb(lkb);
1261
1262                 /* for local/process lkbs, lvbptr points to caller's lksb */
1263                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1264                         dlm_free_lvb(lkb->lkb_lvbptr);
1265                 dlm_free_lkb(lkb);
1266                 return 1;
1267         } else {
1268                 spin_unlock(&ls->ls_lkbidr_spin);
1269                 return 0;
1270         }
1271 }
1272
1273 int dlm_put_lkb(struct dlm_lkb *lkb)
1274 {
1275         struct dlm_ls *ls;
1276
1277         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1278         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1279
1280         ls = lkb->lkb_resource->res_ls;
1281         return __put_lkb(ls, lkb);
1282 }
1283
1284 /* This is only called to add a reference when the code already holds
1285    a valid reference to the lkb, so there's no need for locking. */
1286
1287 static inline void hold_lkb(struct dlm_lkb *lkb)
1288 {
1289         kref_get(&lkb->lkb_ref);
1290 }
1291
1292 /* This is called when we need to remove a reference and are certain
1293    it's not the last ref.  e.g. del_lkb is always called between a
1294    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1295    put_lkb would work fine, but would involve unnecessary locking */
1296
1297 static inline void unhold_lkb(struct dlm_lkb *lkb)
1298 {
1299         int rv;
1300         rv = kref_put(&lkb->lkb_ref, kill_lkb);
1301         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
1302 }
1303
1304 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1305                             int mode)
1306 {
1307         struct dlm_lkb *lkb = NULL;
1308
1309         list_for_each_entry(lkb, head, lkb_statequeue)
1310                 if (lkb->lkb_rqmode < mode)
1311                         break;
1312
1313         __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
1314 }
1315
1316 /* add/remove lkb to rsb's grant/convert/wait queue */
1317
1318 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1319 {
1320         kref_get(&lkb->lkb_ref);
1321
1322         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1323
1324         lkb->lkb_timestamp = ktime_get();
1325
1326         lkb->lkb_status = status;
1327
1328         switch (status) {
1329         case DLM_LKSTS_WAITING:
1330                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1331                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1332                 else
1333                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1334                 break;
1335         case DLM_LKSTS_GRANTED:
1336                 /* convention says granted locks kept in order of grmode */
1337                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1338                                 lkb->lkb_grmode);
1339                 break;
1340         case DLM_LKSTS_CONVERT:
1341                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1342                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1343                 else
1344                         list_add_tail(&lkb->lkb_statequeue,
1345                                       &r->res_convertqueue);
1346                 break;
1347         default:
1348                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1349         }
1350 }
1351
1352 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1353 {
1354         lkb->lkb_status = 0;
1355         list_del(&lkb->lkb_statequeue);
1356         unhold_lkb(lkb);
1357 }
1358
1359 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1360 {
1361         hold_lkb(lkb);
1362         del_lkb(r, lkb);
1363         add_lkb(r, lkb, sts);
1364         unhold_lkb(lkb);
1365 }
1366
1367 static int msg_reply_type(int mstype)
1368 {
1369         switch (mstype) {
1370         case DLM_MSG_REQUEST:
1371                 return DLM_MSG_REQUEST_REPLY;
1372         case DLM_MSG_CONVERT:
1373                 return DLM_MSG_CONVERT_REPLY;
1374         case DLM_MSG_UNLOCK:
1375                 return DLM_MSG_UNLOCK_REPLY;
1376         case DLM_MSG_CANCEL:
1377                 return DLM_MSG_CANCEL_REPLY;
1378         case DLM_MSG_LOOKUP:
1379                 return DLM_MSG_LOOKUP_REPLY;
1380         }
1381         return -1;
1382 }
1383
1384 static int nodeid_warned(int nodeid, int num_nodes, int *warned)
1385 {
1386         int i;
1387
1388         for (i = 0; i < num_nodes; i++) {
1389                 if (!warned[i]) {
1390                         warned[i] = nodeid;
1391                         return 0;
1392                 }
1393                 if (warned[i] == nodeid)
1394                         return 1;
1395         }
1396         return 0;
1397 }
1398
1399 void dlm_scan_waiters(struct dlm_ls *ls)
1400 {
1401         struct dlm_lkb *lkb;
1402         ktime_t zero = ktime_set(0, 0);
1403         s64 us;
1404         s64 debug_maxus = 0;
1405         u32 debug_scanned = 0;
1406         u32 debug_expired = 0;
1407         int num_nodes = 0;
1408         int *warned = NULL;
1409
1410         if (!dlm_config.ci_waitwarn_us)
1411                 return;
1412
1413         mutex_lock(&ls->ls_waiters_mutex);
1414
1415         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1416                 if (ktime_equal(lkb->lkb_wait_time, zero))
1417                         continue;
1418
1419                 debug_scanned++;
1420
1421                 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
1422
1423                 if (us < dlm_config.ci_waitwarn_us)
1424                         continue;
1425
1426                 lkb->lkb_wait_time = zero;
1427
1428                 debug_expired++;
1429                 if (us > debug_maxus)
1430                         debug_maxus = us;
1431
1432                 if (!num_nodes) {
1433                         num_nodes = ls->ls_num_nodes;
1434                         warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
1435                 }
1436                 if (!warned)
1437                         continue;
1438                 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
1439                         continue;
1440
1441                 log_error(ls, "waitwarn %x %lld %d us check connection to "
1442                           "node %d", lkb->lkb_id, (long long)us,
1443                           dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
1444         }
1445         mutex_unlock(&ls->ls_waiters_mutex);
1446         kfree(warned);
1447
1448         if (debug_expired)
1449                 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
1450                           debug_scanned, debug_expired,
1451                           dlm_config.ci_waitwarn_us, (long long)debug_maxus);
1452 }
1453
1454 /* add/remove lkb from global waiters list of lkb's waiting for
1455    a reply from a remote node */
1456
1457 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1458 {
1459         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1460         int error = 0;
1461
1462         mutex_lock(&ls->ls_waiters_mutex);
1463
1464         if (is_overlap_unlock(lkb) ||
1465             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1466                 error = -EINVAL;
1467                 goto out;
1468         }
1469
1470         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1471                 switch (mstype) {
1472                 case DLM_MSG_UNLOCK:
1473                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1474                         break;
1475                 case DLM_MSG_CANCEL:
1476                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1477                         break;
1478                 default:
1479                         error = -EBUSY;
1480                         goto out;
1481                 }
1482                 lkb->lkb_wait_count++;
1483                 hold_lkb(lkb);
1484
1485                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1486                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
1487                           lkb->lkb_wait_count, lkb->lkb_flags);
1488                 goto out;
1489         }
1490
1491         DLM_ASSERT(!lkb->lkb_wait_count,
1492                    dlm_print_lkb(lkb);
1493                    printk("wait_count %d\n", lkb->lkb_wait_count););
1494
1495         lkb->lkb_wait_count++;
1496         lkb->lkb_wait_type = mstype;
1497         lkb->lkb_wait_time = ktime_get();
1498         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1499         hold_lkb(lkb);
1500         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1501  out:
1502         if (error)
1503                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
1504                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
1505                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1506         mutex_unlock(&ls->ls_waiters_mutex);
1507         return error;
1508 }
1509
1510 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1511    list as part of process_requestqueue (e.g. a lookup that has an optimized
1512    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1513    set RESEND and dlm_recover_waiters_post() */
1514
1515 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1516                                 struct dlm_message *ms)
1517 {
1518         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1519         int overlap_done = 0;
1520
1521         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
1522                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1523                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
1524                 overlap_done = 1;
1525                 goto out_del;
1526         }
1527
1528         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
1529                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1530                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1531                 overlap_done = 1;
1532                 goto out_del;
1533         }
1534
1535         /* Cancel state was preemptively cleared by a successful convert,
1536            see next comment, nothing to do. */
1537
1538         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1539             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1540                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1541                           lkb->lkb_id, lkb->lkb_wait_type);
1542                 return -1;
1543         }
1544
1545         /* Remove for the convert reply, and premptively remove for the
1546            cancel reply.  A convert has been granted while there's still
1547            an outstanding cancel on it (the cancel is moot and the result
1548            in the cancel reply should be 0).  We preempt the cancel reply
1549            because the app gets the convert result and then can follow up
1550            with another op, like convert.  This subsequent op would see the
1551            lingering state of the cancel and fail with -EBUSY. */
1552
1553         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1554             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
1555             is_overlap_cancel(lkb) && ms && !ms->m_result) {
1556                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1557                           lkb->lkb_id);
1558                 lkb->lkb_wait_type = 0;
1559                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1560                 lkb->lkb_wait_count--;
1561                 goto out_del;
1562         }
1563
1564         /* N.B. type of reply may not always correspond to type of original
1565            msg due to lookup->request optimization, verify others? */
1566
1567         if (lkb->lkb_wait_type) {
1568                 lkb->lkb_wait_type = 0;
1569                 goto out_del;
1570         }
1571
1572         log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1573                   lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
1574                   mstype, lkb->lkb_flags);
1575         return -1;
1576
1577  out_del:
1578         /* the force-unlock/cancel has completed and we haven't recvd a reply
1579            to the op that was in progress prior to the unlock/cancel; we
1580            give up on any reply to the earlier op.  FIXME: not sure when/how
1581            this would happen */
1582
1583         if (overlap_done && lkb->lkb_wait_type) {
1584                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1585                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1586                 lkb->lkb_wait_count--;
1587                 lkb->lkb_wait_type = 0;
1588         }
1589
1590         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1591
1592         lkb->lkb_flags &= ~DLM_IFL_RESEND;
1593         lkb->lkb_wait_count--;
1594         if (!lkb->lkb_wait_count)
1595                 list_del_init(&lkb->lkb_wait_reply);
1596         unhold_lkb(lkb);
1597         return 0;
1598 }
1599
1600 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1601 {
1602         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1603         int error;
1604
1605         mutex_lock(&ls->ls_waiters_mutex);
1606         error = _remove_from_waiters(lkb, mstype, NULL);
1607         mutex_unlock(&ls->ls_waiters_mutex);
1608         return error;
1609 }
1610
1611 /* Handles situations where we might be processing a "fake" or "stub" reply in
1612    which we can't try to take waiters_mutex again. */
1613
1614 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1615 {
1616         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1617         int error;
1618
1619         if (ms->m_flags != DLM_IFL_STUB_MS)
1620                 mutex_lock(&ls->ls_waiters_mutex);
1621         error = _remove_from_waiters(lkb, ms->m_type, ms);
1622         if (ms->m_flags != DLM_IFL_STUB_MS)
1623                 mutex_unlock(&ls->ls_waiters_mutex);
1624         return error;
1625 }
1626
1627 /* If there's an rsb for the same resource being removed, ensure
1628    that the remove message is sent before the new lookup message.
1629    It should be rare to need a delay here, but if not, then it may
1630    be worthwhile to add a proper wait mechanism rather than a delay. */
1631
1632 static void wait_pending_remove(struct dlm_rsb *r)
1633 {
1634         struct dlm_ls *ls = r->res_ls;
1635  restart:
1636         spin_lock(&ls->ls_remove_spin);
1637         if (ls->ls_remove_len &&
1638             !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
1639                 log_debug(ls, "delay lookup for remove dir %d %s",
1640                           r->res_dir_nodeid, r->res_name);
1641                 spin_unlock(&ls->ls_remove_spin);
1642                 msleep(1);
1643                 goto restart;
1644         }
1645         spin_unlock(&ls->ls_remove_spin);
1646 }
1647
1648 /*
1649  * ls_remove_spin protects ls_remove_name and ls_remove_len which are
1650  * read by other threads in wait_pending_remove.  ls_remove_names
1651  * and ls_remove_lens are only used by the scan thread, so they do
1652  * not need protection.
1653  */
1654
1655 static void shrink_bucket(struct dlm_ls *ls, int b)
1656 {
1657         struct rb_node *n, *next;
1658         struct dlm_rsb *r;
1659         char *name;
1660         int our_nodeid = dlm_our_nodeid();
1661         int remote_count = 0;
1662         int i, len, rv;
1663
1664         memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1665
1666         spin_lock(&ls->ls_rsbtbl[b].lock);
1667         for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1668                 next = rb_next(n);
1669                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1670
1671                 /* If we're the directory record for this rsb, and
1672                    we're not the master of it, then we need to wait
1673                    for the master node to send us a dir remove for
1674                    before removing the dir record. */
1675
1676                 if (!dlm_no_directory(ls) &&
1677                     (r->res_master_nodeid != our_nodeid) &&
1678                     (dlm_dir_nodeid(r) == our_nodeid)) {
1679                         continue;
1680                 }
1681
1682                 if (!time_after_eq(jiffies, r->res_toss_time +
1683                                    dlm_config.ci_toss_secs * HZ)) {
1684                         continue;
1685                 }
1686
1687                 if (!dlm_no_directory(ls) &&
1688                     (r->res_master_nodeid == our_nodeid) &&
1689                     (dlm_dir_nodeid(r) != our_nodeid)) {
1690
1691                         /* We're the master of this rsb but we're not
1692                            the directory record, so we need to tell the
1693                            dir node to remove the dir record. */
1694
1695                         ls->ls_remove_lens[remote_count] = r->res_length;
1696                         memcpy(ls->ls_remove_names[remote_count], r->res_name,
1697                                DLM_RESNAME_MAXLEN);
1698                         remote_count++;
1699
1700                         if (remote_count >= DLM_REMOVE_NAMES_MAX)
1701                                 break;
1702                         continue;
1703                 }
1704
1705                 if (!kref_put(&r->res_ref, kill_rsb)) {
1706                         log_error(ls, "tossed rsb in use %s", r->res_name);
1707                         continue;
1708                 }
1709
1710                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1711                 dlm_free_rsb(r);
1712         }
1713         spin_unlock(&ls->ls_rsbtbl[b].lock);
1714
1715         /*
1716          * While searching for rsb's to free, we found some that require
1717          * remote removal.  We leave them in place and find them again here
1718          * so there is a very small gap between removing them from the toss
1719          * list and sending the removal.  Keeping this gap small is
1720          * important to keep us (the master node) from being out of sync
1721          * with the remote dir node for very long.
1722          *
1723          * From the time the rsb is removed from toss until just after
1724          * send_remove, the rsb name is saved in ls_remove_name.  A new
1725          * lookup checks this to ensure that a new lookup message for the
1726          * same resource name is not sent just before the remove message.
1727          */
1728
1729         for (i = 0; i < remote_count; i++) {
1730                 name = ls->ls_remove_names[i];
1731                 len = ls->ls_remove_lens[i];
1732
1733                 spin_lock(&ls->ls_rsbtbl[b].lock);
1734                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1735                 if (rv) {
1736                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1737                         log_debug(ls, "remove_name not toss %s", name);
1738                         continue;
1739                 }
1740
1741                 if (r->res_master_nodeid != our_nodeid) {
1742                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1743                         log_debug(ls, "remove_name master %d dir %d our %d %s",
1744                                   r->res_master_nodeid, r->res_dir_nodeid,
1745                                   our_nodeid, name);
1746                         continue;
1747                 }
1748
1749                 if (r->res_dir_nodeid == our_nodeid) {
1750                         /* should never happen */
1751                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1752                         log_error(ls, "remove_name dir %d master %d our %d %s",
1753                                   r->res_dir_nodeid, r->res_master_nodeid,
1754                                   our_nodeid, name);
1755                         continue;
1756                 }
1757
1758                 if (!time_after_eq(jiffies, r->res_toss_time +
1759                                    dlm_config.ci_toss_secs * HZ)) {
1760                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1761                         log_debug(ls, "remove_name toss_time %lu now %lu %s",
1762                                   r->res_toss_time, jiffies, name);
1763                         continue;
1764                 }
1765
1766                 if (!kref_put(&r->res_ref, kill_rsb)) {
1767                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1768                         log_error(ls, "remove_name in use %s", name);
1769                         continue;
1770                 }
1771
1772                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1773
1774                 /* block lookup of same name until we've sent remove */
1775                 spin_lock(&ls->ls_remove_spin);
1776                 ls->ls_remove_len = len;
1777                 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
1778                 spin_unlock(&ls->ls_remove_spin);
1779                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1780
1781                 send_remove(r);
1782
1783                 /* allow lookup of name again */
1784                 spin_lock(&ls->ls_remove_spin);
1785                 ls->ls_remove_len = 0;
1786                 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
1787                 spin_unlock(&ls->ls_remove_spin);
1788
1789                 dlm_free_rsb(r);
1790         }
1791 }
1792
1793 void dlm_scan_rsbs(struct dlm_ls *ls)
1794 {
1795         int i;
1796
1797         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1798                 shrink_bucket(ls, i);
1799                 if (dlm_locking_stopped(ls))
1800                         break;
1801                 cond_resched();
1802         }
1803 }
1804
1805 static void add_timeout(struct dlm_lkb *lkb)
1806 {
1807         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1808
1809         if (is_master_copy(lkb))
1810                 return;
1811
1812         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1813             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1814                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1815                 goto add_it;
1816         }
1817         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1818                 goto add_it;
1819         return;
1820
1821  add_it:
1822         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1823         mutex_lock(&ls->ls_timeout_mutex);
1824         hold_lkb(lkb);
1825         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1826         mutex_unlock(&ls->ls_timeout_mutex);
1827 }
1828
1829 static void del_timeout(struct dlm_lkb *lkb)
1830 {
1831         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1832
1833         mutex_lock(&ls->ls_timeout_mutex);
1834         if (!list_empty(&lkb->lkb_time_list)) {
1835                 list_del_init(&lkb->lkb_time_list);
1836                 unhold_lkb(lkb);
1837         }
1838         mutex_unlock(&ls->ls_timeout_mutex);
1839 }
1840
1841 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1842    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1843    and then lock rsb because of lock ordering in add_timeout.  We may need
1844    to specify some special timeout-related bits in the lkb that are just to
1845    be accessed under the timeout_mutex. */
1846
1847 void dlm_scan_timeout(struct dlm_ls *ls)
1848 {
1849         struct dlm_rsb *r;
1850         struct dlm_lkb *lkb;
1851         int do_cancel, do_warn;
1852         s64 wait_us;
1853
1854         for (;;) {
1855                 if (dlm_locking_stopped(ls))
1856                         break;
1857
1858                 do_cancel = 0;
1859                 do_warn = 0;
1860                 mutex_lock(&ls->ls_timeout_mutex);
1861                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1862
1863                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1864                                                         lkb->lkb_timestamp));
1865
1866                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1867                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1868                                 do_cancel = 1;
1869
1870                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1871                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1872                                 do_warn = 1;
1873
1874                         if (!do_cancel && !do_warn)
1875                                 continue;
1876                         hold_lkb(lkb);
1877                         break;
1878                 }
1879                 mutex_unlock(&ls->ls_timeout_mutex);
1880
1881                 if (!do_cancel && !do_warn)
1882                         break;
1883
1884                 r = lkb->lkb_resource;
1885                 hold_rsb(r);
1886                 lock_rsb(r);
1887
1888                 if (do_warn) {
1889                         /* clear flag so we only warn once */
1890                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1891                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1892                                 del_timeout(lkb);
1893                         dlm_timeout_warn(lkb);
1894                 }
1895
1896                 if (do_cancel) {
1897                         log_debug(ls, "timeout cancel %x node %d %s",
1898                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1899                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1900                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1901                         del_timeout(lkb);
1902                         _cancel_lock(r, lkb);
1903                 }
1904
1905                 unlock_rsb(r);
1906                 unhold_rsb(r);
1907                 dlm_put_lkb(lkb);
1908         }
1909 }
1910
1911 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1912    dlm_recoverd before checking/setting ls_recover_begin. */
1913
1914 void dlm_adjust_timeouts(struct dlm_ls *ls)
1915 {
1916         struct dlm_lkb *lkb;
1917         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1918
1919         ls->ls_recover_begin = 0;
1920         mutex_lock(&ls->ls_timeout_mutex);
1921         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1922                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1923         mutex_unlock(&ls->ls_timeout_mutex);
1924
1925         if (!dlm_config.ci_waitwarn_us)
1926                 return;
1927
1928         mutex_lock(&ls->ls_waiters_mutex);
1929         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1930                 if (ktime_to_us(lkb->lkb_wait_time))
1931                         lkb->lkb_wait_time = ktime_get();
1932         }
1933         mutex_unlock(&ls->ls_waiters_mutex);
1934 }
1935
1936 /* lkb is master or local copy */
1937
1938 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1939 {
1940         int b, len = r->res_ls->ls_lvblen;
1941
1942         /* b=1 lvb returned to caller
1943            b=0 lvb written to rsb or invalidated
1944            b=-1 do nothing */
1945
1946         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1947
1948         if (b == 1) {
1949                 if (!lkb->lkb_lvbptr)
1950                         return;
1951
1952                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1953                         return;
1954
1955                 if (!r->res_lvbptr)
1956                         return;
1957
1958                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1959                 lkb->lkb_lvbseq = r->res_lvbseq;
1960
1961         } else if (b == 0) {
1962                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1963                         rsb_set_flag(r, RSB_VALNOTVALID);
1964                         return;
1965                 }
1966
1967                 if (!lkb->lkb_lvbptr)
1968                         return;
1969
1970                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1971                         return;
1972
1973                 if (!r->res_lvbptr)
1974                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1975
1976                 if (!r->res_lvbptr)
1977                         return;
1978
1979                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1980                 r->res_lvbseq++;
1981                 lkb->lkb_lvbseq = r->res_lvbseq;
1982                 rsb_clear_flag(r, RSB_VALNOTVALID);
1983         }
1984
1985         if (rsb_flag(r, RSB_VALNOTVALID))
1986                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1987 }
1988
1989 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1990 {
1991         if (lkb->lkb_grmode < DLM_LOCK_PW)
1992                 return;
1993
1994         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1995                 rsb_set_flag(r, RSB_VALNOTVALID);
1996                 return;
1997         }
1998
1999         if (!lkb->lkb_lvbptr)
2000                 return;
2001
2002         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2003                 return;
2004
2005         if (!r->res_lvbptr)
2006                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
2007
2008         if (!r->res_lvbptr)
2009                 return;
2010
2011         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2012         r->res_lvbseq++;
2013         rsb_clear_flag(r, RSB_VALNOTVALID);
2014 }
2015
2016 /* lkb is process copy (pc) */
2017
2018 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2019                             struct dlm_message *ms)
2020 {
2021         int b;
2022
2023         if (!lkb->lkb_lvbptr)
2024                 return;
2025
2026         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2027                 return;
2028
2029         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
2030         if (b == 1) {
2031                 int len = receive_extralen(ms);
2032                 if (len > DLM_RESNAME_MAXLEN)
2033                         len = DLM_RESNAME_MAXLEN;
2034                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2035                 lkb->lkb_lvbseq = ms->m_lvbseq;
2036         }
2037 }
2038
2039 /* Manipulate lkb's on rsb's convert/granted/waiting queues
2040    remove_lock -- used for unlock, removes lkb from granted
2041    revert_lock -- used for cancel, moves lkb from convert to granted
2042    grant_lock  -- used for request and convert, adds lkb to granted or
2043                   moves lkb from convert or waiting to granted
2044
2045    Each of these is used for master or local copy lkb's.  There is
2046    also a _pc() variation used to make the corresponding change on
2047    a process copy (pc) lkb. */
2048
2049 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2050 {
2051         del_lkb(r, lkb);
2052         lkb->lkb_grmode = DLM_LOCK_IV;
2053         /* this unhold undoes the original ref from create_lkb()
2054            so this leads to the lkb being freed */
2055         unhold_lkb(lkb);
2056 }
2057
2058 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2059 {
2060         set_lvb_unlock(r, lkb);
2061         _remove_lock(r, lkb);
2062 }
2063
2064 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2065 {
2066         _remove_lock(r, lkb);
2067 }
2068
2069 /* returns: 0 did nothing
2070             1 moved lock to granted
2071            -1 removed lock */
2072
2073 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2074 {
2075         int rv = 0;
2076
2077         lkb->lkb_rqmode = DLM_LOCK_IV;
2078
2079         switch (lkb->lkb_status) {
2080         case DLM_LKSTS_GRANTED:
2081                 break;
2082         case DLM_LKSTS_CONVERT:
2083                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2084                 rv = 1;
2085                 break;
2086         case DLM_LKSTS_WAITING:
2087                 del_lkb(r, lkb);
2088                 lkb->lkb_grmode = DLM_LOCK_IV;
2089                 /* this unhold undoes the original ref from create_lkb()
2090                    so this leads to the lkb being freed */
2091                 unhold_lkb(lkb);
2092                 rv = -1;
2093                 break;
2094         default:
2095                 log_print("invalid status for revert %d", lkb->lkb_status);
2096         }
2097         return rv;
2098 }
2099
2100 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2101 {
2102         return revert_lock(r, lkb);
2103 }
2104
2105 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2106 {
2107         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2108                 lkb->lkb_grmode = lkb->lkb_rqmode;
2109                 if (lkb->lkb_status)
2110                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2111                 else
2112                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2113         }
2114
2115         lkb->lkb_rqmode = DLM_LOCK_IV;
2116         lkb->lkb_highbast = 0;
2117 }
2118
2119 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2120 {
2121         set_lvb_lock(r, lkb);
2122         _grant_lock(r, lkb);
2123 }
2124
2125 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2126                           struct dlm_message *ms)
2127 {
2128         set_lvb_lock_pc(r, lkb, ms);
2129         _grant_lock(r, lkb);
2130 }
2131
2132 /* called by grant_pending_locks() which means an async grant message must
2133    be sent to the requesting node in addition to granting the lock if the
2134    lkb belongs to a remote node. */
2135
2136 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2137 {
2138         grant_lock(r, lkb);
2139         if (is_master_copy(lkb))
2140                 send_grant(r, lkb);
2141         else
2142                 queue_cast(r, lkb, 0);
2143 }
2144
2145 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2146    change the granted/requested modes.  We're munging things accordingly in
2147    the process copy.
2148    CONVDEADLK: our grmode may have been forced down to NL to resolve a
2149    conversion deadlock
2150    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2151    compatible with other granted locks */
2152
2153 static void munge_demoted(struct dlm_lkb *lkb)
2154 {
2155         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2156                 log_print("munge_demoted %x invalid modes gr %d rq %d",
2157                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2158                 return;
2159         }
2160
2161         lkb->lkb_grmode = DLM_LOCK_NL;
2162 }
2163
2164 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
2165 {
2166         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
2167             ms->m_type != DLM_MSG_GRANT) {
2168                 log_print("munge_altmode %x invalid reply type %d",
2169                           lkb->lkb_id, ms->m_type);
2170                 return;
2171         }
2172
2173         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2174                 lkb->lkb_rqmode = DLM_LOCK_PR;
2175         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2176                 lkb->lkb_rqmode = DLM_LOCK_CW;
2177         else {
2178                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2179                 dlm_print_lkb(lkb);
2180         }
2181 }
2182
2183 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2184 {
2185         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2186                                            lkb_statequeue);
2187         if (lkb->lkb_id == first->lkb_id)
2188                 return 1;
2189
2190         return 0;
2191 }
2192
2193 /* Check if the given lkb conflicts with another lkb on the queue. */
2194
2195 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2196 {
2197         struct dlm_lkb *this;
2198
2199         list_for_each_entry(this, head, lkb_statequeue) {
2200                 if (this == lkb)
2201                         continue;
2202                 if (!modes_compat(this, lkb))
2203                         return 1;
2204         }
2205         return 0;
2206 }
2207
2208 /*
2209  * "A conversion deadlock arises with a pair of lock requests in the converting
2210  * queue for one resource.  The granted mode of each lock blocks the requested
2211  * mode of the other lock."
2212  *
2213  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2214  * convert queue from being granted, then deadlk/demote lkb.
2215  *
2216  * Example:
2217  * Granted Queue: empty
2218  * Convert Queue: NL->EX (first lock)
2219  *                PR->EX (second lock)
2220  *
2221  * The first lock can't be granted because of the granted mode of the second
2222  * lock and the second lock can't be granted because it's not first in the
2223  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2224  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2225  * flag set and return DEMOTED in the lksb flags.
2226  *
2227  * Originally, this function detected conv-deadlk in a more limited scope:
2228  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2229  * - if lkb1 was the first entry in the queue (not just earlier), and was
2230  *   blocked by the granted mode of lkb2, and there was nothing on the
2231  *   granted queue preventing lkb1 from being granted immediately, i.e.
2232  *   lkb2 was the only thing preventing lkb1 from being granted.
2233  *
2234  * That second condition meant we'd only say there was conv-deadlk if
2235  * resolving it (by demotion) would lead to the first lock on the convert
2236  * queue being granted right away.  It allowed conversion deadlocks to exist
2237  * between locks on the convert queue while they couldn't be granted anyway.
2238  *
2239  * Now, we detect and take action on conversion deadlocks immediately when
2240  * they're created, even if they may not be immediately consequential.  If
2241  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2242  * mode that would prevent lkb1's conversion from being granted, we do a
2243  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2244  * I think this means that the lkb_is_ahead condition below should always
2245  * be zero, i.e. there will never be conv-deadlk between two locks that are
2246  * both already on the convert queue.
2247  */
2248
2249 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2250 {
2251         struct dlm_lkb *lkb1;
2252         int lkb_is_ahead = 0;
2253
2254         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2255                 if (lkb1 == lkb2) {
2256                         lkb_is_ahead = 1;
2257                         continue;
2258                 }
2259
2260                 if (!lkb_is_ahead) {
2261                         if (!modes_compat(lkb2, lkb1))
2262                                 return 1;
2263                 } else {
2264                         if (!modes_compat(lkb2, lkb1) &&
2265                             !modes_compat(lkb1, lkb2))
2266                                 return 1;
2267                 }
2268         }
2269         return 0;
2270 }
2271
2272 /*
2273  * Return 1 if the lock can be granted, 0 otherwise.
2274  * Also detect and resolve conversion deadlocks.
2275  *
2276  * lkb is the lock to be granted
2277  *
2278  * now is 1 if the function is being called in the context of the
2279  * immediate request, it is 0 if called later, after the lock has been
2280  * queued.
2281  *
2282  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2283  */
2284
2285 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
2286 {
2287         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2288
2289         /*
2290          * 6-10: Version 5.4 introduced an option to address the phenomenon of
2291          * a new request for a NL mode lock being blocked.
2292          *
2293          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2294          * request, then it would be granted.  In essence, the use of this flag
2295          * tells the Lock Manager to expedite theis request by not considering
2296          * what may be in the CONVERTING or WAITING queues...  As of this
2297          * writing, the EXPEDITE flag can be used only with new requests for NL
2298          * mode locks.  This flag is not valid for conversion requests.
2299          *
2300          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2301          * conversion or used with a non-NL requested mode.  We also know an
2302          * EXPEDITE request is always granted immediately, so now must always
2303          * be 1.  The full condition to grant an expedite request: (now &&
2304          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2305          * therefore be shortened to just checking the flag.
2306          */
2307
2308         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2309                 return 1;
2310
2311         /*
2312          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2313          * added to the remaining conditions.
2314          */
2315
2316         if (queue_conflict(&r->res_grantqueue, lkb))
2317                 goto out;
2318
2319         /*
2320          * 6-3: By default, a conversion request is immediately granted if the
2321          * requested mode is compatible with the modes of all other granted
2322          * locks
2323          */
2324
2325         if (queue_conflict(&r->res_convertqueue, lkb))
2326                 goto out;
2327
2328         /*
2329          * 6-5: But the default algorithm for deciding whether to grant or
2330          * queue conversion requests does not by itself guarantee that such
2331          * requests are serviced on a "first come first serve" basis.  This, in
2332          * turn, can lead to a phenomenon known as "indefinate postponement".
2333          *
2334          * 6-7: This issue is dealt with by using the optional QUECVT flag with
2335          * the system service employed to request a lock conversion.  This flag
2336          * forces certain conversion requests to be queued, even if they are
2337          * compatible with the granted modes of other locks on the same
2338          * resource.  Thus, the use of this flag results in conversion requests
2339          * being ordered on a "first come first servce" basis.
2340          *
2341          * DCT: This condition is all about new conversions being able to occur
2342          * "in place" while the lock remains on the granted queue (assuming
2343          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2344          * doesn't _have_ to go onto the convert queue where it's processed in
2345          * order.  The "now" variable is necessary to distinguish converts
2346          * being received and processed for the first time now, because once a
2347          * convert is moved to the conversion queue the condition below applies
2348          * requiring fifo granting.
2349          */
2350
2351         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2352                 return 1;
2353
2354         /*
2355          * Even if the convert is compat with all granted locks,
2356          * QUECVT forces it behind other locks on the convert queue.
2357          */
2358
2359         if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2360                 if (list_empty(&r->res_convertqueue))
2361                         return 1;
2362                 else
2363                         goto out;
2364         }
2365
2366         /*
2367          * The NOORDER flag is set to avoid the standard vms rules on grant
2368          * order.
2369          */
2370
2371         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2372                 return 1;
2373
2374         /*
2375          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2376          * granted until all other conversion requests ahead of it are granted
2377          * and/or canceled.
2378          */
2379
2380         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2381                 return 1;
2382
2383         /*
2384          * 6-4: By default, a new request is immediately granted only if all
2385          * three of the following conditions are satisfied when the request is
2386          * issued:
2387          * - The queue of ungranted conversion requests for the resource is
2388          *   empty.
2389          * - The queue of ungranted new requests for the resource is empty.
2390          * - The mode of the new request is compatible with the most
2391          *   restrictive mode of all granted locks on the resource.
2392          */
2393
2394         if (now && !conv && list_empty(&r->res_convertqueue) &&
2395             list_empty(&r->res_waitqueue))
2396                 return 1;
2397
2398         /*
2399          * 6-4: Once a lock request is in the queue of ungranted new requests,
2400          * it cannot be granted until the queue of ungranted conversion
2401          * requests is empty, all ungranted new requests ahead of it are
2402          * granted and/or canceled, and it is compatible with the granted mode
2403          * of the most restrictive lock granted on the resource.
2404          */
2405
2406         if (!now && !conv && list_empty(&r->res_convertqueue) &&
2407             first_in_list(lkb, &r->res_waitqueue))
2408                 return 1;
2409  out:
2410         return 0;
2411 }
2412
2413 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2414                           int *err)
2415 {
2416         int rv;
2417         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2418         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2419
2420         if (err)
2421                 *err = 0;
2422
2423         rv = _can_be_granted(r, lkb, now);
2424         if (rv)
2425                 goto out;
2426
2427         /*
2428          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2429          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2430          * cancels one of the locks.
2431          */
2432
2433         if (is_convert && can_be_queued(lkb) &&
2434             conversion_deadlock_detect(r, lkb)) {
2435                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2436                         lkb->lkb_grmode = DLM_LOCK_NL;
2437                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
2438                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
2439                         if (err)
2440                                 *err = -EDEADLK;
2441                         else {
2442                                 log_print("can_be_granted deadlock %x now %d",
2443                                           lkb->lkb_id, now);
2444                                 dlm_dump_rsb(r);
2445                         }
2446                 }
2447                 goto out;
2448         }
2449
2450         /*
2451          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2452          * to grant a request in a mode other than the normal rqmode.  It's a
2453          * simple way to provide a big optimization to applications that can
2454          * use them.
2455          */
2456
2457         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2458                 alt = DLM_LOCK_PR;
2459         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2460                 alt = DLM_LOCK_CW;
2461
2462         if (alt) {
2463                 lkb->lkb_rqmode = alt;
2464                 rv = _can_be_granted(r, lkb, now);
2465                 if (rv)
2466                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
2467                 else
2468                         lkb->lkb_rqmode = rqmode;
2469         }
2470  out:
2471         return rv;
2472 }
2473
2474 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
2475    for locks pending on the convert list.  Once verified (watch for these
2476    log_prints), we should be able to just call _can_be_granted() and not
2477    bother with the demote/deadlk cases here (and there's no easy way to deal
2478    with a deadlk here, we'd have to generate something like grant_lock with
2479    the deadlk error.) */
2480
2481 /* Returns the highest requested mode of all blocked conversions; sets
2482    cw if there's a blocked conversion to DLM_LOCK_CW. */
2483
2484 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2485                                  unsigned int *count)
2486 {
2487         struct dlm_lkb *lkb, *s;
2488         int hi, demoted, quit, grant_restart, demote_restart;
2489         int deadlk;
2490
2491         quit = 0;
2492  restart:
2493         grant_restart = 0;
2494         demote_restart = 0;
2495         hi = DLM_LOCK_IV;
2496
2497         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2498                 demoted = is_demoted(lkb);
2499                 deadlk = 0;
2500
2501                 if (can_be_granted(r, lkb, 0, &deadlk)) {
2502                         grant_lock_pending(r, lkb);
2503                         grant_restart = 1;
2504                         if (count)
2505                                 (*count)++;
2506                         continue;
2507                 }
2508
2509                 if (!demoted && is_demoted(lkb)) {
2510                         log_print("WARN: pending demoted %x node %d %s",
2511                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2512                         demote_restart = 1;
2513                         continue;
2514                 }
2515
2516                 if (deadlk) {
2517                         log_print("WARN: pending deadlock %x node %d %s",
2518                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2519                         dlm_dump_rsb(r);
2520                         continue;
2521                 }
2522
2523                 hi = max_t(int, lkb->lkb_rqmode, hi);
2524
2525                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2526                         *cw = 1;
2527         }
2528
2529         if (grant_restart)
2530                 goto restart;
2531         if (demote_restart && !quit) {
2532                 quit = 1;
2533                 goto restart;
2534         }
2535
2536         return max_t(int, high, hi);
2537 }
2538
2539 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2540                               unsigned int *count)
2541 {
2542         struct dlm_lkb *lkb, *s;
2543
2544         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2545                 if (can_be_granted(r, lkb, 0, NULL)) {
2546                         grant_lock_pending(r, lkb);
2547                         if (count)
2548                                 (*count)++;
2549                 } else {
2550                         high = max_t(int, lkb->lkb_rqmode, high);
2551                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
2552                                 *cw = 1;
2553                 }
2554         }
2555
2556         return high;
2557 }
2558
2559 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2560    on either the convert or waiting queue.
2561    high is the largest rqmode of all locks blocked on the convert or
2562    waiting queue. */
2563
2564 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2565 {
2566         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2567                 if (gr->lkb_highbast < DLM_LOCK_EX)
2568                         return 1;
2569                 return 0;
2570         }
2571
2572         if (gr->lkb_highbast < high &&
2573             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2574                 return 1;
2575         return 0;
2576 }
2577
2578 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2579 {
2580         struct dlm_lkb *lkb, *s;
2581         int high = DLM_LOCK_IV;
2582         int cw = 0;
2583
2584         if (!is_master(r)) {
2585                 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2586                 dlm_dump_rsb(r);
2587                 return;
2588         }
2589
2590         high = grant_pending_convert(r, high, &cw, count);
2591         high = grant_pending_wait(r, high, &cw, count);
2592
2593         if (high == DLM_LOCK_IV)
2594                 return;
2595
2596         /*
2597          * If there are locks left on the wait/convert queue then send blocking
2598          * ASTs to granted locks based on the largest requested mode (high)
2599          * found above.
2600          */
2601
2602         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2603                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2604                         if (cw && high == DLM_LOCK_PR &&
2605                             lkb->lkb_grmode == DLM_LOCK_PR)
2606                                 queue_bast(r, lkb, DLM_LOCK_CW);
2607                         else
2608                                 queue_bast(r, lkb, high);
2609                         lkb->lkb_highbast = high;
2610                 }
2611         }
2612 }
2613
2614 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2615 {
2616         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2617             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2618                 if (gr->lkb_highbast < DLM_LOCK_EX)
2619                         return 1;
2620                 return 0;
2621         }
2622
2623         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2624                 return 1;
2625         return 0;
2626 }
2627
2628 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2629                             struct dlm_lkb *lkb)
2630 {
2631         struct dlm_lkb *gr;
2632
2633         list_for_each_entry(gr, head, lkb_statequeue) {
2634                 /* skip self when sending basts to convertqueue */
2635                 if (gr == lkb)
2636                         continue;
2637                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2638                         queue_bast(r, gr, lkb->lkb_rqmode);
2639                         gr->lkb_highbast = lkb->lkb_rqmode;
2640                 }
2641         }
2642 }
2643
2644 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2645 {
2646         send_bast_queue(r, &r->res_grantqueue, lkb);
2647 }
2648
2649 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2650 {
2651         send_bast_queue(r, &r->res_grantqueue, lkb);
2652         send_bast_queue(r, &r->res_convertqueue, lkb);
2653 }
2654
2655 /* set_master(r, lkb) -- set the master nodeid of a resource
2656
2657    The purpose of this function is to set the nodeid field in the given
2658    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2659    known, it can just be copied to the lkb and the function will return
2660    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2661    before it can be copied to the lkb.
2662
2663    When the rsb nodeid is being looked up remotely, the initial lkb
2664    causing the lookup is kept on the ls_waiters list waiting for the
2665    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2666    on the rsb's res_lookup list until the master is verified.
2667
2668    Return values:
2669    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2670    1: the rsb master is not available and the lkb has been placed on
2671       a wait queue
2672 */
2673
2674 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2675 {
2676         int our_nodeid = dlm_our_nodeid();
2677
2678         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2679                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2680                 r->res_first_lkid = lkb->lkb_id;
2681                 lkb->lkb_nodeid = r->res_nodeid;
2682                 return 0;
2683         }
2684
2685         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2686                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2687                 return 1;
2688         }
2689
2690         if (r->res_master_nodeid == our_nodeid) {
2691                 lkb->lkb_nodeid = 0;
2692                 return 0;
2693         }
2694
2695         if (r->res_master_nodeid) {
2696                 lkb->lkb_nodeid = r->res_master_nodeid;
2697                 return 0;
2698         }
2699
2700         if (dlm_dir_nodeid(r) == our_nodeid) {
2701                 /* This is a somewhat unusual case; find_rsb will usually
2702                    have set res_master_nodeid when dir nodeid is local, but
2703                    there are cases where we become the dir node after we've
2704                    past find_rsb and go through _request_lock again.
2705                    confirm_master() or process_lookup_list() needs to be
2706                    called after this. */
2707                 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2708                           lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2709                           r->res_name);
2710                 r->res_master_nodeid = our_nodeid;
2711                 r->res_nodeid = 0;
2712                 lkb->lkb_nodeid = 0;
2713                 return 0;
2714         }
2715
2716         wait_pending_remove(r);
2717
2718         r->res_first_lkid = lkb->lkb_id;
2719         send_lookup(r, lkb);
2720         return 1;
2721 }
2722
2723 static void process_lookup_list(struct dlm_rsb *r)
2724 {
2725         struct dlm_lkb *lkb, *safe;
2726
2727         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2728                 list_del_init(&lkb->lkb_rsb_lookup);
2729                 _request_lock(r, lkb);
2730                 schedule();
2731         }
2732 }
2733
2734 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2735
2736 static void confirm_master(struct dlm_rsb *r, int error)
2737 {
2738         struct dlm_lkb *lkb;
2739
2740         if (!r->res_first_lkid)
2741                 return;
2742
2743         switch (error) {
2744         case 0:
2745         case -EINPROGRESS:
2746                 r->res_first_lkid = 0;
2747                 process_lookup_list(r);
2748                 break;
2749
2750         case -EAGAIN:
2751         case -EBADR:
2752         case -ENOTBLK:
2753                 /* the remote request failed and won't be retried (it was
2754                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2755                    lkb the first_lkid */
2756
2757                 r->res_first_lkid = 0;
2758
2759                 if (!list_empty(&r->res_lookup)) {
2760                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2761                                          lkb_rsb_lookup);
2762                         list_del_init(&lkb->lkb_rsb_lookup);
2763                         r->res_first_lkid = lkb->lkb_id;
2764                         _request_lock(r, lkb);
2765                 }
2766                 break;
2767
2768         default:
2769                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2770         }
2771 }
2772
2773 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2774                          int namelen, unsigned long timeout_cs,
2775                          void (*ast) (void *astparam),
2776                          void *astparam,
2777                          void (*bast) (void *astparam, int mode),
2778                          struct dlm_args *args)
2779 {
2780         int rv = -EINVAL;
2781
2782         /* check for invalid arg usage */
2783
2784         if (mode < 0 || mode > DLM_LOCK_EX)
2785                 goto out;
2786
2787         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2788                 goto out;
2789
2790         if (flags & DLM_LKF_CANCEL)
2791                 goto out;
2792
2793         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2794                 goto out;
2795
2796         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2797                 goto out;
2798
2799         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2800                 goto out;
2801
2802         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2803                 goto out;
2804
2805         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2806                 goto out;
2807
2808         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2809                 goto out;
2810
2811         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2812                 goto out;
2813
2814         if (!ast || !lksb)
2815                 goto out;
2816
2817         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2818                 goto out;
2819
2820         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2821                 goto out;
2822
2823         /* these args will be copied to the lkb in validate_lock_args,
2824            it cannot be done now because when converting locks, fields in
2825            an active lkb cannot be modified before locking the rsb */
2826
2827         args->flags = flags;
2828         args->astfn = ast;
2829         args->astparam = astparam;
2830         args->bastfn = bast;
2831         args->timeout = timeout_cs;
2832         args->mode = mode;
2833         args->lksb = lksb;
2834         rv = 0;
2835  out:
2836         return rv;
2837 }
2838
2839 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2840 {
2841         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2842                       DLM_LKF_FORCEUNLOCK))
2843                 return -EINVAL;
2844
2845         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2846                 return -EINVAL;
2847
2848         args->flags = flags;
2849         args->astparam = astarg;
2850         return 0;
2851 }
2852
2853 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2854                               struct dlm_args *args)
2855 {
2856         int rv = -EINVAL;
2857
2858         if (args->flags & DLM_LKF_CONVERT) {
2859                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2860                         goto out;
2861
2862                 if (args->flags & DLM_LKF_QUECVT &&
2863                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2864                         goto out;
2865
2866                 rv = -EBUSY;
2867                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2868                         goto out;
2869
2870                 if (lkb->lkb_wait_type)
2871                         goto out;
2872
2873                 if (is_overlap(lkb))
2874                         goto out;
2875         }
2876
2877         lkb->lkb_exflags = args->flags;
2878         lkb->lkb_sbflags = 0;
2879         lkb->lkb_astfn = args->astfn;
2880         lkb->lkb_astparam = args->astparam;
2881         lkb->lkb_bastfn = args->bastfn;
2882         lkb->lkb_rqmode = args->mode;
2883         lkb->lkb_lksb = args->lksb;
2884         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2885         lkb->lkb_ownpid = (int) current->pid;
2886         lkb->lkb_timeout_cs = args->timeout;
2887         rv = 0;
2888  out:
2889         if (rv)
2890                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2891                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2892                           lkb->lkb_status, lkb->lkb_wait_type,
2893                           lkb->lkb_resource->res_name);
2894         return rv;
2895 }
2896
2897 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2898    for success */
2899
2900 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2901    because there may be a lookup in progress and it's valid to do
2902    cancel/unlockf on it */
2903
2904 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2905 {
2906         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2907         int rv = -EINVAL;
2908
2909         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2910                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2911                 dlm_print_lkb(lkb);
2912                 goto out;
2913         }
2914
2915         /* an lkb may still exist even though the lock is EOL'ed due to a
2916            cancel, unlock or failed noqueue request; an app can't use these
2917            locks; return same error as if the lkid had not been found at all */
2918
2919         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2920                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2921                 rv = -ENOENT;
2922                 goto out;
2923         }
2924
2925         /* an lkb may be waiting for an rsb lookup to complete where the
2926            lookup was initiated by another lock */
2927
2928         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2929                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2930                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2931                         list_del_init(&lkb->lkb_rsb_lookup);
2932                         queue_cast(lkb->lkb_resource, lkb,
2933                                    args->flags & DLM_LKF_CANCEL ?
2934                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2935                         unhold_lkb(lkb); /* undoes create_lkb() */
2936                 }
2937                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2938                 rv = -EBUSY;
2939                 goto out;
2940         }
2941
2942         /* cancel not allowed with another cancel/unlock in progress */
2943
2944         if (args->flags & DLM_LKF_CANCEL) {
2945                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2946                         goto out;
2947
2948                 if (is_overlap(lkb))
2949                         goto out;
2950
2951                 /* don't let scand try to do a cancel */
2952                 del_timeout(lkb);
2953
2954                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2955                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2956                         rv = -EBUSY;
2957                         goto out;
2958                 }
2959
2960                 /* there's nothing to cancel */
2961                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2962                     !lkb->lkb_wait_type) {
2963                         rv = -EBUSY;
2964                         goto out;
2965                 }
2966
2967                 switch (lkb->lkb_wait_type) {
2968                 case DLM_MSG_LOOKUP:
2969                 case DLM_MSG_REQUEST:
2970                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2971                         rv = -EBUSY;
2972                         goto out;
2973                 case DLM_MSG_UNLOCK:
2974                 case DLM_MSG_CANCEL:
2975                         goto out;
2976                 }
2977                 /* add_to_waiters() will set OVERLAP_CANCEL */
2978                 goto out_ok;
2979         }
2980
2981         /* do we need to allow a force-unlock if there's a normal unlock
2982            already in progress?  in what conditions could the normal unlock
2983            fail such that we'd want to send a force-unlock to be sure? */
2984
2985         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2986                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2987                         goto out;
2988
2989                 if (is_overlap_unlock(lkb))
2990                         goto out;
2991
2992                 /* don't let scand try to do a cancel */
2993                 del_timeout(lkb);
2994
2995                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2996                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2997                         rv = -EBUSY;
2998                         goto out;
2999                 }
3000
3001                 switch (lkb->lkb_wait_type) {
3002                 case DLM_MSG_LOOKUP:
3003                 case DLM_MSG_REQUEST:
3004                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3005                         rv = -EBUSY;
3006                         goto out;
3007                 case DLM_MSG_UNLOCK:
3008                         goto out;
3009                 }
3010                 /* add_to_waiters() will set OVERLAP_UNLOCK */
3011                 goto out_ok;
3012         }
3013
3014         /* normal unlock not allowed if there's any op in progress */
3015         rv = -EBUSY;
3016         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
3017                 goto out;
3018
3019  out_ok:
3020         /* an overlapping op shouldn't blow away exflags from other op */
3021         lkb->lkb_exflags |= args->flags;
3022         lkb->lkb_sbflags = 0;
3023         lkb->lkb_astparam = args->astparam;
3024         rv = 0;
3025  out:
3026         if (rv)
3027                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
3028                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
3029                           args->flags, lkb->lkb_wait_type,
3030                           lkb->lkb_resource->res_name);
3031         return rv;
3032 }
3033
3034 /*
3035  * Four stage 4 varieties:
3036  * do_request(), do_convert(), do_unlock(), do_cancel()
3037  * These are called on the master node for the given lock and
3038  * from the central locking logic.
3039  */
3040
3041 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3042 {
3043         int error = 0;
3044
3045         if (can_be_granted(r, lkb, 1, NULL)) {
3046                 grant_lock(r, lkb);
3047                 queue_cast(r, lkb, 0);
3048                 goto out;
3049         }
3050
3051         if (can_be_queued(lkb)) {
3052                 error = -EINPROGRESS;
3053                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3054                 add_timeout(lkb);
3055                 goto out;
3056         }
3057
3058         error = -EAGAIN;
3059         queue_cast(r, lkb, -EAGAIN);
3060  out:
3061         return error;
3062 }
3063
3064 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3065                                int error)
3066 {
3067         switch (error) {
3068         case -EAGAIN:
3069                 if (force_blocking_asts(lkb))
3070                         send_blocking_asts_all(r, lkb);
3071                 break;
3072         case -EINPROGRESS:
3073                 send_blocking_asts(r, lkb);
3074                 break;
3075         }
3076 }
3077
3078 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3079 {
3080         int error = 0;
3081         int deadlk = 0;
3082
3083         /* changing an existing lock may allow others to be granted */
3084
3085         if (can_be_granted(r, lkb, 1, &deadlk)) {
3086                 grant_lock(r, lkb);
3087                 queue_cast(r, lkb, 0);
3088                 goto out;
3089         }
3090
3091         /* can_be_granted() detected that this lock would block in a conversion
3092            deadlock, so we leave it on the granted queue and return EDEADLK in
3093            the ast for the convert. */
3094
3095         if (deadlk) {
3096                 /* it's left on the granted queue */
3097                 revert_lock(r, lkb);
3098                 queue_cast(r, lkb, -EDEADLK);
3099                 error = -EDEADLK;
3100                 goto out;
3101         }
3102
3103         /* is_demoted() means the can_be_granted() above set the grmode
3104            to NL, and left us on the granted queue.  This auto-demotion
3105            (due to CONVDEADLK) might mean other locks, and/or this lock, are
3106            now grantable.  We have to try to grant other converting locks
3107            before we try again to grant this one. */
3108
3109         if (is_demoted(lkb)) {
3110                 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3111                 if (_can_be_granted(r, lkb, 1)) {
3112                         grant_lock(r, lkb);
3113                         queue_cast(r, lkb, 0);
3114                         goto out;
3115                 }
3116                 /* else fall through and move to convert queue */
3117         }
3118
3119         if (can_be_queued(lkb)) {
3120                 error = -EINPROGRESS;
3121                 del_lkb(r, lkb);
3122                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3123                 add_timeout(lkb);
3124                 goto out;
3125         }
3126
3127         error = -EAGAIN;
3128         queue_cast(r, lkb, -EAGAIN);
3129  out:
3130         return error;
3131 }
3132
3133 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3134                                int error)
3135 {
3136         switch (error) {
3137         case 0:
3138                 grant_pending_locks(r, NULL);
3139                 /* grant_pending_locks also sends basts */
3140                 break;
3141         case -EAGAIN:
3142                 if (force_blocking_asts(lkb))
3143                         send_blocking_asts_all(r, lkb);
3144                 break;
3145         case -EINPROGRESS:
3146                 send_blocking_asts(r, lkb);
3147                 break;
3148         }
3149 }
3150
3151 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3152 {
3153         remove_lock(r, lkb);
3154         queue_cast(r, lkb, -DLM_EUNLOCK);
3155         return -DLM_EUNLOCK;
3156 }
3157
3158 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3159                               int error)
3160 {
3161         grant_pending_locks(r, NULL);
3162 }
3163
3164 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3165
3166 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3167 {
3168         int error;
3169
3170         error = revert_lock(r, lkb);
3171         if (error) {
3172                 queue_cast(r, lkb, -DLM_ECANCEL);
3173                 return -DLM_ECANCEL;
3174         }
3175         return 0;
3176 }
3177
3178 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3179                               int error)
3180 {
3181         if (error)
3182                 grant_pending_locks(r, NULL);
3183 }
3184
3185 /*
3186  * Four stage 3 varieties:
3187  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3188  */
3189
3190 /* add a new lkb to a possibly new rsb, called by requesting process */
3191
3192 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3193 {
3194         int error;
3195
3196         /* set_master: sets lkb nodeid from r */
3197
3198         error = set_master(r, lkb);
3199         if (error < 0)
3200                 goto out;
3201         if (error) {
3202                 error = 0;
3203                 goto out;
3204         }
3205
3206         if (is_remote(r)) {
3207                 /* receive_request() calls do_request() on remote node */
3208                 error = send_request(r, lkb);
3209         } else {
3210                 error = do_request(r, lkb);
3211                 /* for remote locks the request_reply is sent
3212                    between do_request and do_request_effects */
3213                 do_request_effects(r, lkb, error);
3214         }
3215  out:
3216         return error;
3217 }
3218
3219 /* change some property of an existing lkb, e.g. mode */
3220
3221 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3222 {
3223         int error;
3224
3225         if (is_remote(r)) {
3226                 /* receive_convert() calls do_convert() on remote node */
3227                 error = send_convert(r, lkb);
3228         } else {
3229                 error = do_convert(r, lkb);
3230                 /* for remote locks the convert_reply is sent
3231                    between do_convert and do_convert_effects */
3232                 do_convert_effects(r, lkb, error);
3233         }
3234
3235         return error;
3236 }
3237
3238 /* remove an existing lkb from the granted queue */
3239
3240 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3241 {
3242         int error;
3243
3244         if (is_remote(r)) {
3245                 /* receive_unlock() calls do_unlock() on remote node */
3246                 error = send_unlock(r, lkb);
3247         } else {
3248                 error = do_unlock(r, lkb);
3249                 /* for remote locks the unlock_reply is sent
3250                    between do_unlock and do_unlock_effects */
3251                 do_unlock_effects(r, lkb, error);
3252         }
3253
3254         return error;
3255 }
3256
3257 /* remove an existing lkb from the convert or wait queue */
3258
3259 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3260 {
3261         int error;
3262
3263         if (is_remote(r)) {
3264                 /* receive_cancel() calls do_cancel() on remote node */
3265                 error = send_cancel(r, lkb);
3266         } else {
3267                 error = do_cancel(r, lkb);
3268                 /* for remote locks the cancel_reply is sent
3269                    between do_cancel and do_cancel_effects */
3270                 do_cancel_effects(r, lkb, error);
3271         }
3272
3273         return error;
3274 }
3275
3276 /*
3277  * Four stage 2 varieties:
3278  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3279  */
3280
3281 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
3282                         int len, struct dlm_args *args)
3283 {
3284         struct dlm_rsb *r;
3285         int error;
3286
3287         error = validate_lock_args(ls, lkb, args);
3288         if (error)
3289                 return error;
3290
3291         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3292         if (error)
3293                 return error;
3294
3295         lock_rsb(r);
3296
3297         attach_lkb(r, lkb);
3298         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3299
3300         error = _request_lock(r, lkb);
3301
3302         unlock_rsb(r);
3303         put_rsb(r);
3304         return error;
3305 }
3306
3307 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3308                         struct dlm_args *args)
3309 {
3310         struct dlm_rsb *r;
3311         int error;
3312
3313         r = lkb->lkb_resource;
3314
3315         hold_rsb(r);
3316         lock_rsb(r);
3317
3318         error = validate_lock_args(ls, lkb, args);
3319         if (error)
3320                 goto out;
3321
3322         error = _convert_lock(r, lkb);
3323  out:
3324         unlock_rsb(r);
3325         put_rsb(r);
3326         return error;
3327 }
3328
3329 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3330                        struct dlm_args *args)
3331 {
3332         struct dlm_rsb *r;
3333         int error;
3334
3335         r = lkb->lkb_resource;
3336
3337         hold_rsb(r);
3338         lock_rsb(r);
3339
3340         error = validate_unlock_args(lkb, args);
3341         if (error)
3342                 goto out;
3343
3344         error = _unlock_lock(r, lkb);
3345  out:
3346         unlock_rsb(r);
3347         put_rsb(r);
3348         return error;
3349 }
3350
3351 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3352                        struct dlm_args *args)
3353 {
3354         struct dlm_rsb *r;
3355         int error;
3356
3357         r = lkb->lkb_resource;
3358
3359         hold_rsb(r);
3360         lock_rsb(r);
3361
3362         error = validate_unlock_args(lkb, args);
3363         if (error)
3364                 goto out;
3365
3366         error = _cancel_lock(r, lkb);
3367  out:
3368         unlock_rsb(r);
3369         put_rsb(r);
3370         return error;
3371 }
3372
3373 /*
3374  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3375  */
3376
3377 int dlm_lock(dlm_lockspace_t *lockspace,
3378              int mode,
3379              struct dlm_lksb *lksb,
3380              uint32_t flags,
3381              void *name,
3382              unsigned int namelen,
3383              uint32_t parent_lkid,
3384              void (*ast) (void *astarg),
3385              void *astarg,
3386              void (*bast) (void *astarg, int mode))
3387 {
3388         struct dlm_ls *ls;
3389         struct dlm_lkb *lkb;
3390         struct dlm_args args;
3391         int error, convert = flags & DLM_LKF_CONVERT;
3392
3393         ls = dlm_find_lockspace_local(lockspace);
3394         if (!ls)
3395                 return -EINVAL;
3396
3397         dlm_lock_recovery(ls);
3398
3399         if (convert)
3400                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3401         else
3402                 error = create_lkb(ls, &lkb);
3403
3404         if (error)
3405                 goto out;
3406
3407         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
3408                               astarg, bast, &args);
3409         if (error)
3410                 goto out_put;
3411
3412         if (convert)
3413                 error = convert_lock(ls, lkb, &args);
3414         else
3415                 error = request_lock(ls, lkb, name, namelen, &args);
3416
3417         if (error == -EINPROGRESS)
3418                 error = 0;
3419  out_put:
3420         if (convert || error)
3421                 __put_lkb(ls, lkb);
3422         if (error == -EAGAIN || error == -EDEADLK)
3423                 error = 0;
3424  out:
3425         dlm_unlock_recovery(ls);
3426         dlm_put_lockspace(ls);
3427         return error;
3428 }
3429
3430 int dlm_unlock(dlm_lockspace_t *lockspace,
3431                uint32_t lkid,
3432                uint32_t flags,
3433                struct dlm_lksb *lksb,
3434                void *astarg)
3435 {
3436         struct dlm_ls *ls;
3437         struct dlm_lkb *lkb;
3438         struct dlm_args args;
3439         int error;
3440
3441         ls = dlm_find_lockspace_local(lockspace);
3442         if (!ls)
3443                 return -EINVAL;
3444
3445         dlm_lock_recovery(ls);
3446
3447         error = find_lkb(ls, lkid, &lkb);
3448         if (error)
3449                 goto out;
3450
3451         error = set_unlock_args(flags, astarg, &args);
3452         if (error)
3453                 goto out_put;
3454
3455         if (flags & DLM_LKF_CANCEL)
3456                 error = cancel_lock(ls, lkb, &args);
3457         else
3458                 error = unlock_lock(ls, lkb, &args);
3459
3460         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3461                 error = 0;
3462         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3463                 error = 0;
3464  out_put:
3465         dlm_put_lkb(lkb);
3466  out:
3467         dlm_unlock_recovery(ls);
3468         dlm_put_lockspace(ls);
3469         return error;
3470 }
3471
3472 /*
3473  * send/receive routines for remote operations and replies
3474  *
3475  * send_args
3476  * send_common
3477  * send_request                 receive_request
3478  * send_convert                 receive_convert
3479  * send_unlock                  receive_unlock
3480  * send_cancel                  receive_cancel
3481  * send_grant                   receive_grant
3482  * send_bast                    receive_bast
3483  * send_lookup                  receive_lookup
3484  * send_remove                  receive_remove
3485  *
3486  *                              send_common_reply
3487  * receive_request_reply        send_request_reply
3488  * receive_convert_reply        send_convert_reply
3489  * receive_unlock_reply         send_unlock_reply
3490  * receive_cancel_reply         send_cancel_reply
3491  * receive_lookup_reply         send_lookup_reply
3492  */
3493
3494 static int _create_message(struct dlm_ls *ls, int mb_len,
3495                            int to_nodeid, int mstype,
3496                            struct dlm_message **ms_ret,
3497                            struct dlm_mhandle **mh_ret)
3498 {
3499         struct dlm_message *ms;
3500         struct dlm_mhandle *mh;
3501         char *mb;
3502
3503         /* get_buffer gives us a message handle (mh) that we need to
3504            pass into lowcomms_commit and a message buffer (mb) that we
3505            write our data into */
3506
3507         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
3508         if (!mh)
3509                 return -ENOBUFS;
3510
3511         memset(mb, 0, mb_len);
3512
3513         ms = (struct dlm_message *) mb;
3514
3515         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3516         ms->m_header.h_lockspace = ls->ls_global_id;
3517         ms->m_header.h_nodeid = dlm_our_nodeid();
3518         ms->m_header.h_length = mb_len;
3519         ms->m_header.h_cmd = DLM_MSG;
3520
3521         ms->m_type = mstype;
3522
3523         *mh_ret = mh;
3524         *ms_ret = ms;
3525         return 0;
3526 }
3527
3528 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3529                           int to_nodeid, int mstype,
3530                           struct dlm_message **ms_ret,
3531                           struct dlm_mhandle **mh_ret)
3532 {
3533         int mb_len = sizeof(struct dlm_message);
3534
3535         switch (mstype) {
3536         case DLM_MSG_REQUEST:
3537         case DLM_MSG_LOOKUP:
3538         case DLM_MSG_REMOVE:
3539                 mb_len += r->res_length;
3540                 break;
3541         case DLM_MSG_CONVERT:
3542         case DLM_MSG_UNLOCK:
3543         case DLM_MSG_REQUEST_REPLY:
3544         case DLM_MSG_CONVERT_REPLY:
3545         case DLM_MSG_GRANT:
3546                 if (lkb && lkb->lkb_lvbptr)
3547                         mb_len += r->res_ls->ls_lvblen;
3548                 break;
3549         }
3550
3551         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3552                                ms_ret, mh_ret);
3553 }
3554
3555 /* further lowcomms enhancements or alternate implementations may make
3556    the return value from this function useful at some point */
3557
3558 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
3559 {
3560         dlm_message_out(ms);
3561         dlm_lowcomms_commit_buffer(mh);
3562         return 0;
3563 }
3564
3565 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3566                       struct dlm_message *ms)
3567 {
3568         ms->m_nodeid   = lkb->lkb_nodeid;
3569         ms->m_pid      = lkb->lkb_ownpid;
3570         ms->m_lkid     = lkb->lkb_id;
3571         ms->m_remid    = lkb->lkb_remid;
3572         ms->m_exflags  = lkb->lkb_exflags;
3573         ms->m_sbflags  = lkb->lkb_sbflags;
3574         ms->m_flags    = lkb->lkb_flags;
3575         ms->m_lvbseq   = lkb->lkb_lvbseq;
3576         ms->m_status   = lkb->lkb_status;
3577         ms->m_grmode   = lkb->lkb_grmode;
3578         ms->m_rqmode   = lkb->lkb_rqmode;
3579         ms->m_hash     = r->res_hash;
3580
3581         /* m_result and m_bastmode are set from function args,
3582            not from lkb fields */
3583
3584         if (lkb->lkb_bastfn)
3585                 ms->m_asts |= DLM_CB_BAST;
3586         if (lkb->lkb_astfn)
3587                 ms->m_asts |= DLM_CB_CAST;
3588
3589         /* compare with switch in create_message; send_remove() doesn't
3590            use send_args() */
3591
3592         switch (ms->m_type) {
3593         case DLM_MSG_REQUEST:
3594         case DLM_MSG_LOOKUP:
3595                 memcpy(ms->m_extra, r->res_name, r->res_length);
3596                 break;
3597         case DLM_MSG_CONVERT:
3598         case DLM_MSG_UNLOCK:
3599         case DLM_MSG_REQUEST_REPLY:
3600         case DLM_MSG_CONVERT_REPLY:
3601         case DLM_MSG_GRANT:
3602                 if (!lkb->lkb_lvbptr)
3603                         break;
3604                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3605                 break;
3606         }
3607 }
3608
3609 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3610 {
3611         struct dlm_message *ms;
3612         struct dlm_mhandle *mh;
3613         int to_nodeid, error;
3614
3615         to_nodeid = r->res_nodeid;
3616
3617         error = add_to_waiters(lkb, mstype, to_nodeid);
3618         if (error)
3619                 return error;
3620
3621         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3622         if (error)
3623                 goto fail;
3624
3625         send_args(r, lkb, ms);
3626
3627         error = send_message(mh, ms);
3628         if (error)
3629                 goto fail;
3630         return 0;
3631
3632  fail:
3633         remove_from_waiters(lkb, msg_reply_type(mstype));
3634         return error;
3635 }
3636
3637 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3638 {
3639         return send_common(r, lkb, DLM_MSG_REQUEST);
3640 }
3641
3642 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3643 {
3644         int error;
3645
3646         error = send_common(r, lkb, DLM_MSG_CONVERT);
3647
3648         /* down conversions go without a reply from the master */
3649         if (!error && down_conversion(lkb)) {
3650                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3651                 r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
3652                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3653                 r->res_ls->ls_stub_ms.m_result = 0;
3654                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3655         }
3656
3657         return error;
3658 }
3659
3660 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3661    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3662    that the master is still correct. */
3663
3664 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3665 {
3666         return send_common(r, lkb, DLM_MSG_UNLOCK);
3667 }
3668
3669 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3670 {
3671         return send_common(r, lkb, DLM_MSG_CANCEL);
3672 }
3673
3674 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3675 {
3676         struct dlm_message *ms;
3677         struct dlm_mhandle *mh;
3678         int to_nodeid, error;
3679
3680         to_nodeid = lkb->lkb_nodeid;
3681
3682         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3683         if (error)
3684                 goto out;
3685
3686         send_args(r, lkb, ms);
3687
3688         ms->m_result = 0;
3689
3690         error = send_message(mh, ms);
3691  out:
3692         return error;
3693 }
3694
3695 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3696 {
3697         struct dlm_message *ms;
3698         struct dlm_mhandle *mh;
3699         int to_nodeid, error;
3700
3701         to_nodeid = lkb->lkb_nodeid;
3702
3703         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3704         if (error)
3705                 goto out;
3706
3707         send_args(r, lkb, ms);
3708
3709         ms->m_bastmode = mode;
3710
3711         error = send_message(mh, ms);
3712  out:
3713         return error;
3714 }
3715
3716 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3717 {
3718         struct dlm_message *ms;
3719         struct dlm_mhandle *mh;
3720         int to_nodeid, error;
3721
3722         to_nodeid = dlm_dir_nodeid(r);
3723
3724         error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3725         if (error)
3726                 return error;
3727
3728         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3729         if (error)
3730                 goto fail;
3731
3732         send_args(r, lkb, ms);
3733
3734         error = send_message(mh, ms);
3735         if (error)
3736                 goto fail;
3737         return 0;
3738
3739  fail:
3740         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3741         return error;
3742 }
3743
3744 static int send_remove(struct dlm_rsb *r)
3745 {
3746         struct dlm_message *ms;
3747         struct dlm_mhandle *mh;
3748         int to_nodeid, error;
3749
3750         to_nodeid = dlm_dir_nodeid(r);
3751
3752         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3753         if (error)
3754                 goto out;
3755
3756         memcpy(ms->m_extra, r->res_name, r->res_length);
3757         ms->m_hash = r->res_hash;
3758
3759         error = send_message(mh, ms);
3760  out:
3761         return error;
3762 }
3763
3764 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3765                              int mstype, int rv)
3766 {
3767         struct dlm_message *ms;
3768         struct dlm_mhandle *mh;
3769         int to_nodeid, error;
3770
3771         to_nodeid = lkb->lkb_nodeid;
3772
3773         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3774         if (error)
3775                 goto out;
3776
3777         send_args(r, lkb, ms);
3778
3779         ms->m_result = rv;
3780
3781         error = send_message(mh, ms);
3782  out:
3783         return error;
3784 }
3785
3786 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3787 {
3788         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3789 }
3790
3791 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3792 {
3793         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3794 }
3795
3796 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3797 {
3798         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3799 }
3800
3801 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3802 {
3803         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3804 }
3805
3806 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3807                              int ret_nodeid, int rv)
3808 {
3809         struct dlm_rsb *r = &ls->ls_stub_rsb;
3810         struct dlm_message *ms;
3811         struct dlm_mhandle *mh;
3812         int error, nodeid = ms_in->m_header.h_nodeid;
3813
3814         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3815         if (error)
3816                 goto out;
3817
3818         ms->m_lkid = ms_in->m_lkid;
3819         ms->m_result = rv;
3820         ms->m_nodeid = ret_nodeid;
3821
3822         error = send_message(mh, ms);
3823  out:
3824         return error;
3825 }
3826
3827 /* which args we save from a received message depends heavily on the type
3828    of message, unlike the send side where we can safely send everything about
3829    the lkb for any type of message */
3830
3831 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3832 {
3833         lkb->lkb_exflags = ms->m_exflags;
3834         lkb->lkb_sbflags = ms->m_sbflags;
3835         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3836                          (ms->m_flags & 0x0000FFFF);
3837 }
3838
3839 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3840 {
3841         if (ms->m_flags == DLM_IFL_STUB_MS)
3842                 return;
3843
3844         lkb->lkb_sbflags = ms->m_sbflags;
3845         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3846                          (ms->m_flags & 0x0000FFFF);
3847 }
3848
3849 static int receive_extralen(struct dlm_message *ms)
3850 {
3851         return (ms->m_header.h_length - sizeof(struct dlm_message));
3852 }
3853
3854 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3855                        struct dlm_message *ms)
3856 {
3857         int len;
3858
3859         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3860                 if (!lkb->lkb_lvbptr)
3861                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3862                 if (!lkb->lkb_lvbptr)
3863                         return -ENOMEM;
3864                 len = receive_extralen(ms);
3865                 if (len > DLM_RESNAME_MAXLEN)
3866                         len = DLM_RESNAME_MAXLEN;
3867                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3868         }
3869         return 0;
3870 }
3871
3872 static void fake_bastfn(void *astparam, int mode)
3873 {
3874         log_print("fake_bastfn should not be called");
3875 }
3876
3877 static void fake_astfn(void *astparam)
3878 {
3879         log_print("fake_astfn should not be called");
3880 }
3881
3882 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3883                                 struct dlm_message *ms)
3884 {
3885         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3886         lkb->lkb_ownpid = ms->m_pid;
3887         lkb->lkb_remid = ms->m_lkid;
3888         lkb->lkb_grmode = DLM_LOCK_IV;
3889         lkb->lkb_rqmode = ms->m_rqmode;
3890
3891         lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3892         lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3893
3894         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3895                 /* lkb was just created so there won't be an lvb yet */
3896                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3897                 if (!lkb->lkb_lvbptr)
3898                         return -ENOMEM;
3899         }
3900
3901         return 0;
3902 }
3903
3904 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3905                                 struct dlm_message *ms)
3906 {
3907         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3908                 return -EBUSY;
3909
3910         if (receive_lvb(ls, lkb, ms))
3911                 return -ENOMEM;
3912
3913         lkb->lkb_rqmode = ms->m_rqmode;
3914         lkb->lkb_lvbseq = ms->m_lvbseq;
3915
3916         return 0;
3917 }
3918
3919 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3920                                struct dlm_message *ms)
3921 {
3922         if (receive_lvb(ls, lkb, ms))
3923                 return -ENOMEM;
3924         return 0;
3925 }
3926
3927 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3928    uses to send a reply and that the remote end uses to process the reply. */
3929
3930 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3931 {
3932         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3933         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3934         lkb->lkb_remid = ms->m_lkid;
3935 }
3936
3937 /* This is called after the rsb is locked so that we can safely inspect
3938    fields in the lkb. */
3939
3940 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3941 {
3942         int from = ms->m_header.h_nodeid;
3943         int error = 0;
3944
3945         switch (ms->m_type) {
3946         case DLM_MSG_CONVERT:
3947         case DLM_MSG_UNLOCK:
3948         case DLM_MSG_CANCEL:
3949                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3950                         error = -EINVAL;
3951                 break;
3952
3953         case DLM_MSG_CONVERT_REPLY:
3954         case DLM_MSG_UNLOCK_REPLY:
3955         case DLM_MSG_CANCEL_REPLY:
3956         case DLM_MSG_GRANT:
3957         case DLM_MSG_BAST:
3958                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3959                         error = -EINVAL;
3960                 break;
3961
3962         case DLM_MSG_REQUEST_REPLY:
3963                 if (!is_process_copy(lkb))
3964                         error = -EINVAL;
3965                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3966                         error = -EINVAL;
3967                 break;
3968
3969         default:
3970                 error = -EINVAL;
3971         }
3972
3973         if (error)
3974                 log_error(lkb->lkb_resource->res_ls,
3975                           "ignore invalid message %d from %d %x %x %x %d",
3976                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3977                           lkb->lkb_flags, lkb->lkb_nodeid);
3978         return error;
3979 }
3980
3981 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3982 {
3983         struct dlm_lkb *lkb;
3984         struct dlm_rsb *r;
3985         int from_nodeid;
3986         int error, namelen;
3987
3988         from_nodeid = ms->m_header.h_nodeid;
3989
3990         error = create_lkb(ls, &lkb);
3991         if (error)
3992                 goto fail;
3993
3994         receive_flags(lkb, ms);
3995         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3996         error = receive_request_args(ls, lkb, ms);
3997         if (error) {
3998                 __put_lkb(ls, lkb);
3999                 goto fail;
4000         }
4001
4002         /* The dir node is the authority on whether we are the master
4003            for this rsb or not, so if the master sends us a request, we should
4004            recreate the rsb if we've destroyed it.   This race happens when we
4005            send a remove message to the dir node at the same time that the dir
4006            node sends us a request for the rsb. */
4007
4008         namelen = receive_extralen(ms);
4009
4010         error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4011                          R_RECEIVE_REQUEST, &r);
4012         if (error) {
4013                 __put_lkb(ls, lkb);
4014                 goto fail;
4015         }
4016
4017         lock_rsb(r);
4018
4019         if (r->res_master_nodeid != dlm_our_nodeid()) {
4020                 error = validate_master_nodeid(ls, r, from_nodeid);
4021                 if (error) {
4022                         unlock_rsb(r);
4023                         put_rsb(r);
4024                         __put_lkb(ls, lkb);
4025                         goto fail;
4026                 }
4027         }
4028
4029         attach_lkb(r, lkb);
4030         error = do_request(r, lkb);
4031         send_request_reply(r, lkb, error);
4032         do_request_effects(r, lkb, error);
4033
4034         unlock_rsb(r);
4035         put_rsb(r);
4036
4037         if (error == -EINPROGRESS)
4038                 error = 0;
4039         if (error)
4040                 dlm_put_lkb(lkb);
4041         return 0;
4042
4043  fail:
4044         /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4045            and do this receive_request again from process_lookup_list once
4046            we get the lookup reply.  This would avoid a many repeated
4047            ENOTBLK request failures when the lookup reply designating us
4048            as master is delayed. */
4049
4050         /* We could repeatedly return -EBADR here if our send_remove() is
4051            delayed in being sent/arriving/being processed on the dir node.
4052            Another node would repeatedly lookup up the master, and the dir
4053            node would continue returning our nodeid until our send_remove
4054            took effect. */
4055
4056         if (error != -ENOTBLK) {
4057                 log_limit(ls, "receive_request %x from %d %d",
4058                           ms->m_lkid, from_nodeid, error);
4059         }
4060
4061         setup_stub_lkb(ls, ms);
4062         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4063         return error;
4064 }
4065
4066 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
4067 {
4068         struct dlm_lkb *lkb;
4069         struct dlm_rsb *r;
4070         int error, reply = 1;
4071
4072         error = find_lkb(ls, ms->m_remid, &lkb);
4073         if (error)
4074                 goto fail;
4075
4076         if (lkb->lkb_remid != ms->m_lkid) {
4077                 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4078                           "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4079                           (unsigned long long)lkb->lkb_recover_seq,
4080                           ms->m_header.h_nodeid, ms->m_lkid);
4081                 error = -ENOENT;
4082                 goto fail;
4083         }
4084
4085         r = lkb->lkb_resource;
4086
4087         hold_rsb(r);
4088         lock_rsb(r);
4089
4090         error = validate_message(lkb, ms);
4091         if (error)
4092                 goto out;
4093
4094         receive_flags(lkb, ms);
4095
4096         error = receive_convert_args(ls, lkb, ms);
4097         if (error) {
4098                 send_convert_reply(r, lkb, error);
4099                 goto out;
4100         }
4101
4102         reply = !down_conversion(lkb);
4103
4104         error = do_convert(r, lkb);
4105         if (reply)
4106                 send_convert_reply(r, lkb, error);
4107         do_convert_effects(r, lkb, error);
4108  out:
4109         unlock_rsb(r);
4110         put_rsb(r);
4111         dlm_put_lkb(lkb);
4112         return 0;
4113
4114  fail:
4115         setup_stub_lkb(ls, ms);
4116         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4117         return error;
4118 }
4119
4120 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
4121 {
4122         struct dlm_lkb *lkb;
4123         struct dlm_rsb *r;
4124         int error;
4125
4126         error = find_lkb(ls, ms->m_remid, &lkb);
4127         if (error)
4128                 goto fail;
4129
4130         if (lkb->lkb_remid != ms->m_lkid) {
4131                 log_error(ls, "receive_unlock %x remid %x remote %d %x",
4132                           lkb->lkb_id, lkb->lkb_remid,
4133                           ms->m_header.h_nodeid, ms->m_lkid);
4134                 error = -ENOENT;
4135                 goto fail;
4136         }
4137
4138         r = lkb->lkb_resource;
4139
4140         hold_rsb(r);
4141         lock_rsb(r);
4142
4143         error = validate_message(lkb, ms);
4144         if (error)
4145                 goto out;
4146
4147         receive_flags(lkb, ms);
4148
4149         error = receive_unlock_args(ls, lkb, ms);
4150         if (error) {
4151                 send_unlock_reply(r, lkb, error);
4152                 goto out;
4153         }
4154
4155         error = do_unlock(r, lkb);
4156         send_unlock_reply(r, lkb, error);
4157         do_unlock_effects(r, lkb, error);
4158  out:
4159         unlock_rsb(r);
4160         put_rsb(r);
4161         dlm_put_lkb(lkb);
4162         return 0;
4163
4164  fail:
4165         setup_stub_lkb(ls, ms);
4166         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4167         return error;
4168 }
4169
4170 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
4171 {
4172         struct dlm_lkb *lkb;
4173         struct dlm_rsb *r;
4174         int error;
4175
4176         error = find_lkb(ls, ms->m_remid, &lkb);
4177         if (error)
4178                 goto fail;
4179
4180         receive_flags(lkb, ms);
4181
4182         r = lkb->lkb_resource;
4183
4184         hold_rsb(r);
4185         lock_rsb(r);
4186
4187         error = validate_message(lkb, ms);
4188         if (error)
4189                 goto out;
4190
4191         error = do_cancel(r, lkb);
4192         send_cancel_reply(r, lkb, error);
4193         do_cancel_effects(r, lkb, error);
4194  out:
4195         unlock_rsb(r);
4196         put_rsb(r);
4197         dlm_put_lkb(lkb);
4198         return 0;
4199
4200  fail:
4201         setup_stub_lkb(ls, ms);
4202         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4203         return error;
4204 }
4205
4206 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
4207 {
4208         struct dlm_lkb *lkb;
4209         struct dlm_rsb *r;
4210         int error;
4211
4212         error = find_lkb(ls, ms->m_remid, &lkb);
4213         if (error)
4214                 return error;
4215
4216         r = lkb->lkb_resource;
4217
4218         hold_rsb(r);
4219         lock_rsb(r);
4220
4221         error = validate_message(lkb, ms);
4222         if (error)
4223                 goto out;
4224
4225         receive_flags_reply(lkb, ms);
4226         if (is_altmode(lkb))
4227                 munge_altmode(lkb, ms);
4228         grant_lock_pc(r, lkb, ms);
4229         queue_cast(r, lkb, 0);
4230  out:
4231         unlock_rsb(r);
4232         put_rsb(r);
4233         dlm_put_lkb(lkb);
4234         return 0;
4235 }
4236
4237 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
4238 {
4239         struct dlm_lkb *lkb;
4240         struct dlm_rsb *r;
4241         int error;
4242
4243         error = find_lkb(ls, ms->m_remid, &lkb);
4244         if (error)
4245                 return error;
4246
4247         r = lkb->lkb_resource;
4248
4249         hold_rsb(r);
4250         lock_rsb(r);
4251
4252         error = validate_message(lkb, ms);
4253         if (error)
4254                 goto out;
4255
4256         queue_bast(r, lkb, ms->m_bastmode);
4257         lkb->lkb_highbast = ms->m_bastmode;
4258  out:
4259         unlock_rsb(r);
4260         put_rsb(r);
4261         dlm_put_lkb(lkb);
4262         return 0;
4263 }
4264
4265 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
4266 {
4267         int len, error, ret_nodeid, from_nodeid, our_nodeid;
4268
4269         from_nodeid = ms->m_header.h_nodeid;
4270         our_nodeid = dlm_our_nodeid();
4271
4272         len = receive_extralen(ms);
4273
4274         error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4275                                   &ret_nodeid, NULL);
4276
4277         /* Optimization: we're master so treat lookup as a request */
4278         if (!error && ret_nodeid == our_nodeid) {
4279                 receive_request(ls, ms);
4280                 return;
4281         }
4282         send_lookup_reply(ls, ms, ret_nodeid, error);
4283 }
4284
4285 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
4286 {
4287         char name[DLM_RESNAME_MAXLEN+1];
4288         struct dlm_rsb *r;
4289         uint32_t hash, b;
4290         int rv, len, dir_nodeid, from_nodeid;
4291
4292         from_nodeid = ms->m_header.h_nodeid;
4293
4294         len = receive_extralen(ms);
4295
4296         if (len > DLM_RESNAME_MAXLEN) {
4297                 log_error(ls, "receive_remove from %d bad len %d",
4298                           from_nodeid, len);
4299                 return;
4300         }
4301
4302         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
4303         if (dir_nodeid != dlm_our_nodeid()) {
4304                 log_error(ls, "receive_remove from %d bad nodeid %d",
4305                           from_nodeid, dir_nodeid);
4306                 return;
4307         }
4308
4309         /* Look for name on rsbtbl.toss, if it's there, kill it.
4310            If it's on rsbtbl.keep, it's being used, and we should ignore this
4311            message.  This is an expected race between the dir node sending a
4312            request to the master node at the same time as the master node sends
4313            a remove to the dir node.  The resolution to that race is for the
4314            dir node to ignore the remove message, and the master node to
4315            recreate the master rsb when it gets a request from the dir node for
4316            an rsb it doesn't have. */
4317
4318         memset(name, 0, sizeof(name));
4319         memcpy(name, ms->m_extra, len);
4320
4321         hash = jhash(name, len, 0);
4322         b = hash & (ls->ls_rsbtbl_size - 1);
4323
4324         spin_lock(&ls->ls_rsbtbl[b].lock);
4325
4326         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4327         if (rv) {
4328                 /* verify the rsb is on keep list per comment above */
4329                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4330                 if (rv) {
4331                         /* should not happen */
4332                         log_error(ls, "receive_remove from %d not found %s",
4333                                   from_nodeid, name);
4334                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4335                         return;
4336                 }
4337                 if (r->res_master_nodeid != from_nodeid) {
4338                         /* should not happen */
4339                         log_error(ls, "receive_remove keep from %d master %d",
4340                                   from_nodeid, r->res_master_nodeid);
4341                         dlm_print_rsb(r);
4342                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4343                         return;
4344                 }
4345
4346                 log_debug(ls, "receive_remove from %d master %d first %x %s",
4347                           from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4348                           name);
4349                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4350                 return;
4351         }
4352
4353         if (r->res_master_nodeid != from_nodeid) {
4354                 log_error(ls, "receive_remove toss from %d master %d",
4355                           from_nodeid, r->res_master_nodeid);
4356                 dlm_print_rsb(r);
4357                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4358                 return;
4359         }
4360
4361         if (kref_put(&r->res_ref, kill_rsb)) {
4362                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4363                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4364                 dlm_free_rsb(r);
4365         } else {
4366                 log_error(ls, "receive_remove from %d rsb ref error",
4367                           from_nodeid);
4368                 dlm_print_rsb(r);
4369                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4370         }
4371 }
4372
4373 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
4374 {
4375         do_purge(ls, ms->m_nodeid, ms->m_pid);
4376 }
4377
4378 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
4379 {
4380         struct dlm_lkb *lkb;
4381         struct dlm_rsb *r;
4382         int error, mstype, result;
4383         int from_nodeid = ms->m_header.h_nodeid;
4384
4385         error = find_lkb(ls, ms->m_remid, &lkb);
4386         if (error)
4387                 return error;
4388
4389         r = lkb->lkb_resource;
4390         hold_rsb(r);
4391         lock_rsb(r);
4392
4393         error = validate_message(lkb, ms);
4394         if (error)
4395                 goto out;
4396
4397         mstype = lkb->lkb_wait_type;
4398         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4399         if (error) {
4400                 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4401                           lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result);
4402                 dlm_dump_rsb(r);
4403                 goto out;
4404         }
4405
4406         /* Optimization: the dir node was also the master, so it took our
4407            lookup as a request and sent request reply instead of lookup reply */
4408         if (mstype == DLM_MSG_LOOKUP) {
4409                 r->res_master_nodeid = from_nodeid;
4410                 r->res_nodeid = from_nodeid;
4411                 lkb->lkb_nodeid = from_nodeid;
4412         }
4413
4414         /* this is the value returned from do_request() on the master */
4415         result = ms->m_result;
4416
4417         switch (result) {
4418         case -EAGAIN:
4419                 /* request would block (be queued) on remote master */
4420                 queue_cast(r, lkb, -EAGAIN);
4421                 confirm_master(r, -EAGAIN);
4422                 unhold_lkb(lkb); /* undoes create_lkb() */
4423                 break;
4424
4425         case -EINPROGRESS:
4426         case 0:
4427                 /* request was queued or granted on remote master */
4428                 receive_flags_reply(lkb, ms);
4429                 lkb->lkb_remid = ms->m_lkid;
4430                 if (is_altmode(lkb))
4431                         munge_altmode(lkb, ms);
4432                 if (result) {
4433                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
4434                         add_timeout(lkb);
4435                 } else {
4436                         grant_lock_pc(r, lkb, ms);
4437                         queue_cast(r, lkb, 0);
4438                 }
4439                 confirm_master(r, result);
4440                 break;
4441
4442         case -EBADR:
4443         case -ENOTBLK:
4444                 /* find_rsb failed to find rsb or rsb wasn't master */
4445                 log_limit(ls, "receive_request_reply %x from %d %d "
4446                           "master %d dir %d first %x %s", lkb->lkb_id,
4447                           from_nodeid, result, r->res_master_nodeid,
4448                           r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4449
4450                 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4451                     r->res_master_nodeid != dlm_our_nodeid()) {
4452                         /* cause _request_lock->set_master->send_lookup */
4453                         r->res_master_nodeid = 0;
4454                         r->res_nodeid = -1;
4455                         lkb->lkb_nodeid = -1;
4456                 }
4457
4458                 if (is_overlap(lkb)) {
4459                         /* we'll ignore error in cancel/unlock reply */
4460                         queue_cast_overlap(r, lkb);
4461                         confirm_master(r, result);
4462                         unhold_lkb(lkb); /* undoes create_lkb() */
4463                 } else {
4464                         _request_lock(r, lkb);
4465
4466                         if (r->res_master_nodeid == dlm_our_nodeid())
4467                                 confirm_master(r, 0);
4468                 }
4469                 break;
4470
4471         default:
4472                 log_error(ls, "receive_request_reply %x error %d",
4473                           lkb->lkb_id, result);
4474         }
4475
4476         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
4477                 log_debug(ls, "receive_request_reply %x result %d unlock",
4478                           lkb->lkb_id, result);
4479                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4480                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4481                 send_unlock(r, lkb);
4482         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
4483                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4484                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4485                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4486                 send_cancel(r, lkb);
4487         } else {
4488                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4489                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4490         }
4491  out:
4492         unlock_rsb(r);
4493         put_rsb(r);
4494         dlm_put_lkb(lkb);
4495         return 0;
4496 }
4497
4498 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4499                                     struct dlm_message *ms)
4500 {
4501         /* this is the value returned from do_convert() on the master */
4502         switch (ms->m_result) {
4503         case -EAGAIN:
4504                 /* convert would block (be queued) on remote master */
4505                 queue_cast(r, lkb, -EAGAIN);
4506                 break;
4507
4508         case -EDEADLK:
4509                 receive_flags_reply(lkb, ms);
4510                 revert_lock_pc(r, lkb);
4511                 queue_cast(r, lkb, -EDEADLK);
4512                 break;
4513
4514         case -EINPROGRESS:
4515                 /* convert was queued on remote master */
4516                 receive_flags_reply(lkb, ms);
4517                 if (is_demoted(lkb))
4518                         munge_demoted(lkb);
4519                 del_lkb(r, lkb);
4520                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4521                 add_timeout(lkb);
4522                 break;
4523
4524         case 0:
4525                 /* convert was granted on remote master */
4526                 receive_flags_reply(lkb, ms);
4527                 if (is_demoted(lkb))
4528                         munge_demoted(lkb);
4529                 grant_lock_pc(r, lkb, ms);
4530                 queue_cast(r, lkb, 0);
4531                 break;
4532
4533         default:
4534                 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4535                           lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
4536                           ms->m_result);
4537                 dlm_print_rsb(r);
4538                 dlm_print_lkb(lkb);
4539         }
4540 }
4541
4542 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4543 {
4544         struct dlm_rsb *r = lkb->lkb_resource;
4545         int error;
4546
4547         hold_rsb(r);
4548         lock_rsb(r);
4549
4550         error = validate_message(lkb, ms);
4551         if (error)
4552                 goto out;
4553
4554         /* stub reply can happen with waiters_mutex held */
4555         error = remove_from_waiters_ms(lkb, ms);
4556         if (error)
4557                 goto out;
4558
4559         __receive_convert_reply(r, lkb, ms);
4560  out:
4561         unlock_rsb(r);
4562         put_rsb(r);
4563 }
4564
4565 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
4566 {
4567         struct dlm_lkb *lkb;
4568         int error;
4569
4570         error = find_lkb(ls, ms->m_remid, &lkb);
4571         if (error)
4572                 return error;
4573
4574         _receive_convert_reply(lkb, ms);
4575         dlm_put_lkb(lkb);
4576         return 0;
4577 }
4578
4579 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4580 {
4581         struct dlm_rsb *r = lkb->lkb_resource;
4582         int error;
4583
4584         hold_rsb(r);
4585         lock_rsb(r);
4586
4587         error = validate_message(lkb, ms);
4588         if (error)
4589                 goto out;
4590
4591         /* stub reply can happen with waiters_mutex held */
4592         error = remove_from_waiters_ms(lkb, ms);
4593         if (error)
4594                 goto out;
4595
4596         /* this is the value returned from do_unlock() on the master */
4597
4598         switch (ms->m_result) {
4599         case -DLM_EUNLOCK:
4600                 receive_flags_reply(lkb, ms);
4601                 remove_lock_pc(r, lkb);
4602                 queue_cast(r, lkb, -DLM_EUNLOCK);
4603                 break;
4604         case -ENOENT:
4605                 break;
4606         default:
4607                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4608                           lkb->lkb_id, ms->m_result);
4609         }
4610  out:
4611         unlock_rsb(r);
4612         put_rsb(r);
4613 }
4614
4615 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
4616 {
4617         struct dlm_lkb *lkb;
4618         int error;
4619
4620         error = find_lkb(ls, ms->m_remid, &lkb);
4621         if (error)
4622                 return error;
4623
4624         _receive_unlock_reply(lkb, ms);
4625         dlm_put_lkb(lkb);
4626         return 0;
4627 }
4628
4629 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4630 {
4631         struct dlm_rsb *r = lkb->lkb_resource;
4632         int error;
4633
4634         hold_rsb(r);
4635         lock_rsb(r);
4636
4637         error = validate_message(lkb, ms);
4638         if (error)
4639                 goto out;
4640
4641         /* stub reply can happen with waiters_mutex held */
4642         error = remove_from_waiters_ms(lkb, ms);
4643         if (error)
4644                 goto out;
4645
4646         /* this is the value returned from do_cancel() on the master */
4647
4648         switch (ms->m_result) {
4649         case -DLM_ECANCEL:
4650                 receive_flags_reply(lkb, ms);
4651                 revert_lock_pc(r, lkb);
4652                 queue_cast(r, lkb, -DLM_ECANCEL);
4653                 break;
4654         case 0:
4655                 break;
4656         default:
4657                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4658                           lkb->lkb_id, ms->m_result);
4659         }
4660  out:
4661         unlock_rsb(r);
4662         put_rsb(r);
4663 }
4664
4665 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
4666 {
4667         struct dlm_lkb *lkb;
4668         int error;
4669
4670         error = find_lkb(ls, ms->m_remid, &lkb);
4671         if (error)
4672                 return error;
4673
4674         _receive_cancel_reply(lkb, ms);
4675         dlm_put_lkb(lkb);
4676         return 0;
4677 }
4678
4679 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4680 {
4681         struct dlm_lkb *lkb;
4682         struct dlm_rsb *r;
4683         int error, ret_nodeid;
4684         int do_lookup_list = 0;
4685
4686         error = find_lkb(ls, ms->m_lkid, &lkb);
4687         if (error) {
4688                 log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
4689                 return;
4690         }
4691
4692         /* ms->m_result is the value returned by dlm_master_lookup on dir node
4693            FIXME: will a non-zero error ever be returned? */
4694
4695         r = lkb->lkb_resource;
4696         hold_rsb(r);
4697         lock_rsb(r);
4698
4699         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4700         if (error)
4701                 goto out;
4702
4703         ret_nodeid = ms->m_nodeid;
4704
4705         /* We sometimes receive a request from the dir node for this
4706            rsb before we've received the dir node's loookup_reply for it.
4707            The request from the dir node implies we're the master, so we set
4708            ourself as master in receive_request_reply, and verify here that
4709            we are indeed the master. */
4710
4711         if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4712                 /* This should never happen */
4713                 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4714                           "master %d dir %d our %d first %x %s",
4715                           lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid,
4716                           r->res_master_nodeid, r->res_dir_nodeid,
4717                           dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4718         }
4719
4720         if (ret_nodeid == dlm_our_nodeid()) {
4721                 r->res_master_nodeid = ret_nodeid;
4722                 r->res_nodeid = 0;
4723                 do_lookup_list = 1;
4724                 r->res_first_lkid = 0;
4725         } else if (ret_nodeid == -1) {
4726                 /* the remote node doesn't believe it's the dir node */
4727                 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4728                           lkb->lkb_id, ms->m_header.h_nodeid);
4729                 r->res_master_nodeid = 0;
4730                 r->res_nodeid = -1;
4731                 lkb->lkb_nodeid = -1;
4732         } else {
4733                 /* set_master() will set lkb_nodeid from r */
4734                 r->res_master_nodeid = ret_nodeid;
4735                 r->res_nodeid = ret_nodeid;
4736         }
4737
4738         if (is_overlap(lkb)) {
4739                 log_debug(ls, "receive_lookup_reply %x unlock %x",
4740                           lkb->lkb_id, lkb->lkb_flags);
4741                 queue_cast_overlap(r, lkb);
4742                 unhold_lkb(lkb); /* undoes create_lkb() */
4743                 goto out_list;
4744         }
4745
4746         _request_lock(r, lkb);
4747
4748  out_list:
4749         if (do_lookup_list)
4750                 process_lookup_list(r);
4751  out:
4752         unlock_rsb(r);
4753         put_rsb(r);
4754         dlm_put_lkb(lkb);
4755 }
4756
4757 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4758                              uint32_t saved_seq)
4759 {
4760         int error = 0, noent = 0;
4761
4762         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
4763                 log_limit(ls, "receive %d from non-member %d %x %x %d",
4764                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4765                           ms->m_remid, ms->m_result);
4766                 return;
4767         }
4768
4769         switch (ms->m_type) {
4770
4771         /* messages sent to a master node */
4772
4773         case DLM_MSG_REQUEST:
4774                 error = receive_request(ls, ms);
4775                 break;
4776
4777         case DLM_MSG_CONVERT:
4778                 error = receive_convert(ls, ms);
4779                 break;
4780
4781         case DLM_MSG_UNLOCK:
4782                 error = receive_unlock(ls, ms);
4783                 break;
4784
4785         case DLM_MSG_CANCEL:
4786                 noent = 1;
4787                 error = receive_cancel(ls, ms);
4788                 break;
4789
4790         /* messages sent from a master node (replies to above) */
4791
4792         case DLM_MSG_REQUEST_REPLY:
4793                 error = receive_request_reply(ls, ms);
4794                 break;
4795
4796         case DLM_MSG_CONVERT_REPLY:
4797                 error = receive_convert_reply(ls, ms);
4798                 break;
4799
4800         case DLM_MSG_UNLOCK_REPLY:
4801                 error = receive_unlock_reply(ls, ms);
4802                 break;
4803
4804         case DLM_MSG_CANCEL_REPLY:
4805                 error = receive_cancel_reply(ls, ms);
4806                 break;
4807
4808         /* messages sent from a master node (only two types of async msg) */
4809
4810         case DLM_MSG_GRANT:
4811                 noent = 1;
4812                 error = receive_grant(ls, ms);
4813                 break;
4814
4815         case DLM_MSG_BAST:
4816                 noent = 1;
4817                 error = receive_bast(ls, ms);
4818                 break;
4819
4820         /* messages sent to a dir node */
4821
4822         case DLM_MSG_LOOKUP:
4823                 receive_lookup(ls, ms);
4824                 break;
4825
4826         case DLM_MSG_REMOVE:
4827                 receive_remove(ls, ms);
4828                 break;
4829
4830         /* messages sent from a dir node (remove has no reply) */
4831
4832         case DLM_MSG_LOOKUP_REPLY:
4833                 receive_lookup_reply(ls, ms);
4834                 break;
4835
4836         /* other messages */
4837
4838         case DLM_MSG_PURGE:
4839                 receive_purge(ls, ms);
4840                 break;
4841
4842         default:
4843                 log_error(ls, "unknown message type %d", ms->m_type);
4844         }
4845
4846         /*
4847          * When checking for ENOENT, we're checking the result of
4848          * find_lkb(m_remid):
4849          *
4850          * The lock id referenced in the message wasn't found.  This may
4851          * happen in normal usage for the async messages and cancel, so
4852          * only use log_debug for them.
4853          *
4854          * Some errors are expected and normal.
4855          */
4856
4857         if (error == -ENOENT && noent) {
4858                 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4859                           ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4860                           ms->m_lkid, saved_seq);
4861         } else if (error == -ENOENT) {
4862                 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4863                           ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4864                           ms->m_lkid, saved_seq);
4865
4866                 if (ms->m_type == DLM_MSG_CONVERT)
4867                         dlm_dump_rsb_hash(ls, ms->m_hash);
4868         }
4869
4870         if (error == -EINVAL) {
4871                 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4872                           "saved_seq %u",
4873                           ms->m_type, ms->m_header.h_nodeid,
4874                           ms->m_lkid, ms->m_remid, saved_seq);
4875         }
4876 }
4877
4878 /* If the lockspace is in recovery mode (locking stopped), then normal
4879    messages are saved on the requestqueue for processing after recovery is
4880    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4881    messages off the requestqueue before we process new ones. This occurs right
4882    after recovery completes when we transition from saving all messages on
4883    requestqueue, to processing all the saved messages, to processing new
4884    messages as they arrive. */
4885
4886 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4887                                 int nodeid)
4888 {
4889         if (dlm_locking_stopped(ls)) {
4890                 /* If we were a member of this lockspace, left, and rejoined,
4891                    other nodes may still be sending us messages from the
4892                    lockspace generation before we left. */
4893                 if (!ls->ls_generation) {
4894                         log_limit(ls, "receive %d from %d ignore old gen",
4895                                   ms->m_type, nodeid);
4896                         return;
4897                 }
4898
4899                 dlm_add_requestqueue(ls, nodeid, ms);
4900         } else {
4901                 dlm_wait_requestqueue(ls);
4902                 _receive_message(ls, ms, 0);
4903         }
4904 }
4905
4906 /* This is called by dlm_recoverd to process messages that were saved on
4907    the requestqueue. */
4908
4909 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
4910                                uint32_t saved_seq)
4911 {
4912         _receive_message(ls, ms, saved_seq);
4913 }
4914
4915 /* This is called by the midcomms layer when something is received for
4916    the lockspace.  It could be either a MSG (normal message sent as part of
4917    standard locking activity) or an RCOM (recovery message sent as part of
4918    lockspace recovery). */
4919
4920 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4921 {
4922         struct dlm_header *hd = &p->header;
4923         struct dlm_ls *ls;
4924         int type = 0;
4925
4926         switch (hd->h_cmd) {
4927         case DLM_MSG:
4928                 dlm_message_in(&p->message);
4929                 type = p->message.m_type;
4930                 break;
4931         case DLM_RCOM:
4932                 dlm_rcom_in(&p->rcom);
4933                 type = p->rcom.rc_type;
4934                 break;
4935         default:
4936                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4937                 return;
4938         }
4939
4940         if (hd->h_nodeid != nodeid) {
4941                 log_print("invalid h_nodeid %d from %d lockspace %x",
4942                           hd->h_nodeid, nodeid, hd->h_lockspace);
4943                 return;
4944         }
4945
4946         ls = dlm_find_lockspace_global(hd->h_lockspace);
4947         if (!ls) {
4948                 if (dlm_config.ci_log_debug) {
4949                         printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4950                                 "%u from %d cmd %d type %d\n",
4951                                 hd->h_lockspace, nodeid, hd->h_cmd, type);
4952                 }
4953
4954                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4955                         dlm_send_ls_not_ready(nodeid, &p->rcom);
4956                 return;
4957         }
4958
4959         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4960            be inactive (in this ls) before transitioning to recovery mode */
4961
4962         down_read(&ls->ls_recv_active);
4963         if (hd->h_cmd == DLM_MSG)
4964                 dlm_receive_message(ls, &p->message, nodeid);
4965         else
4966                 dlm_receive_rcom(ls, &p->rcom, nodeid);
4967         up_read(&ls->ls_recv_active);
4968
4969         dlm_put_lockspace(ls);
4970 }
4971
4972 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4973                                    struct dlm_message *ms_stub)
4974 {
4975         if (middle_conversion(lkb)) {
4976                 hold_lkb(lkb);
4977                 memset(ms_stub, 0, sizeof(struct dlm_message));
4978                 ms_stub->m_flags = DLM_IFL_STUB_MS;
4979                 ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
4980                 ms_stub->m_result = -EINPROGRESS;
4981                 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4982                 _receive_convert_reply(lkb, ms_stub);
4983
4984                 /* Same special case as in receive_rcom_lock_args() */
4985                 lkb->lkb_grmode = DLM_LOCK_IV;
4986                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4987                 unhold_lkb(lkb);
4988
4989         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4990                 lkb->lkb_flags |= DLM_IFL_RESEND;
4991         }
4992
4993         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4994            conversions are async; there's no reply from the remote master */
4995 }
4996
4997 /* A waiting lkb needs recovery if the master node has failed, or
4998    the master node is changing (only when no directory is used) */
4999
5000 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5001                                  int dir_nodeid)
5002 {
5003         if (dlm_no_directory(ls))
5004                 return 1;
5005
5006         if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5007                 return 1;
5008
5009         return 0;
5010 }
5011
5012 /* Recovery for locks that are waiting for replies from nodes that are now
5013    gone.  We can just complete unlocks and cancels by faking a reply from the
5014    dead node.  Requests and up-conversions we flag to be resent after
5015    recovery.  Down-conversions can just be completed with a fake reply like
5016    unlocks.  Conversions between PR and CW need special attention. */
5017
5018 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5019 {
5020         struct dlm_lkb *lkb, *safe;
5021         struct dlm_message *ms_stub;
5022         int wait_type, stub_unlock_result, stub_cancel_result;
5023         int dir_nodeid;
5024
5025         ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
5026         if (!ms_stub) {
5027                 log_error(ls, "dlm_recover_waiters_pre no mem");
5028                 return;
5029         }
5030
5031         mutex_lock(&ls->ls_waiters_mutex);
5032
5033         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5034
5035                 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5036
5037                 /* exclude debug messages about unlocks because there can be so
5038                    many and they aren't very interesting */
5039
5040                 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5041                         log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5042                                   "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5043                                   lkb->lkb_id,
5044                                   lkb->lkb_remid,
5045                                   lkb->lkb_wait_type,
5046                                   lkb->lkb_resource->res_nodeid,
5047                                   lkb->lkb_nodeid,
5048                                   lkb->lkb_wait_nodeid,
5049                                   dir_nodeid);
5050                 }
5051
5052                 /* all outstanding lookups, regardless of destination  will be
5053                    resent after recovery is done */
5054
5055                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5056                         lkb->lkb_flags |= DLM_IFL_RESEND;
5057                         continue;
5058                 }
5059
5060                 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5061                         continue;
5062
5063                 wait_type = lkb->lkb_wait_type;
5064                 stub_unlock_result = -DLM_EUNLOCK;
5065                 stub_cancel_result = -DLM_ECANCEL;
5066
5067                 /* Main reply may have been received leaving a zero wait_type,
5068                    but a reply for the overlapping op may not have been
5069                    received.  In that case we need to fake the appropriate
5070                    reply for the overlap op. */
5071
5072                 if (!wait_type) {
5073                         if (is_overlap_cancel(lkb)) {
5074                                 wait_type = DLM_MSG_CANCEL;
5075                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
5076                                         stub_cancel_result = 0;
5077                         }
5078                         if (is_overlap_unlock(lkb)) {
5079                                 wait_type = DLM_MSG_UNLOCK;
5080                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
5081                                         stub_unlock_result = -ENOENT;
5082                         }
5083
5084                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
5085                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
5086                                   stub_cancel_result, stub_unlock_result);
5087                 }
5088
5089                 switch (wait_type) {
5090
5091                 case DLM_MSG_REQUEST:
5092                         lkb->lkb_flags |= DLM_IFL_RESEND;
5093                         break;
5094
5095                 case DLM_MSG_CONVERT:
5096                         recover_convert_waiter(ls, lkb, ms_stub);
5097                         break;
5098
5099                 case DLM_MSG_UNLOCK:
5100                         hold_lkb(lkb);
5101                         memset(ms_stub, 0, sizeof(struct dlm_message));
5102                         ms_stub->m_flags = DLM_IFL_STUB_MS;
5103                         ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
5104                         ms_stub->m_result = stub_unlock_result;
5105                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5106                         _receive_unlock_reply(lkb, ms_stub);
5107                         dlm_put_lkb(lkb);
5108                         break;
5109
5110                 case DLM_MSG_CANCEL:
5111                         hold_lkb(lkb);
5112                         memset(ms_stub, 0, sizeof(struct dlm_message));
5113                         ms_stub->m_flags = DLM_IFL_STUB_MS;
5114                         ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
5115                         ms_stub->m_result = stub_cancel_result;
5116                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5117                         _receive_cancel_reply(lkb, ms_stub);
5118                         dlm_put_lkb(lkb);
5119                         break;
5120
5121                 default:
5122                         log_error(ls, "invalid lkb wait_type %d %d",
5123                                   lkb->lkb_wait_type, wait_type);
5124                 }
5125                 schedule();
5126         }
5127         mutex_unlock(&ls->ls_waiters_mutex);
5128         kfree(ms_stub);
5129 }
5130
5131 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5132 {
5133         struct dlm_lkb *lkb;
5134         int found = 0;
5135
5136         mutex_lock(&ls->ls_waiters_mutex);
5137         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
5138                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
5139                         hold_lkb(lkb);
5140                         found = 1;
5141                         break;
5142                 }
5143         }
5144         mutex_unlock(&ls->ls_waiters_mutex);
5145
5146         if (!found)
5147                 lkb = NULL;
5148         return lkb;
5149 }
5150
5151 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
5152    master or dir-node for r.  Processing the lkb may result in it being placed
5153    back on waiters. */
5154
5155 /* We do this after normal locking has been enabled and any saved messages
5156    (in requestqueue) have been processed.  We should be confident that at
5157    this point we won't get or process a reply to any of these waiting
5158    operations.  But, new ops may be coming in on the rsbs/locks here from
5159    userspace or remotely. */
5160
5161 /* there may have been an overlap unlock/cancel prior to recovery or after
5162    recovery.  if before, the lkb may still have a pos wait_count; if after, the
5163    overlap flag would just have been set and nothing new sent.  we can be
5164    confident here than any replies to either the initial op or overlap ops
5165    prior to recovery have been received. */
5166
5167 int dlm_recover_waiters_post(struct dlm_ls *ls)
5168 {
5169         struct dlm_lkb *lkb;
5170         struct dlm_rsb *r;
5171         int error = 0, mstype, err, oc, ou;
5172
5173         while (1) {
5174                 if (dlm_locking_stopped(ls)) {
5175                         log_debug(ls, "recover_waiters_post aborted");
5176                         error = -EINTR;
5177                         break;
5178                 }
5179
5180                 lkb = find_resend_waiter(ls);
5181                 if (!lkb)
5182                         break;
5183
5184                 r = lkb->lkb_resource;
5185                 hold_rsb(r);
5186                 lock_rsb(r);
5187
5188                 mstype = lkb->lkb_wait_type;
5189                 oc = is_overlap_cancel(lkb);
5190                 ou = is_overlap_unlock(lkb);
5191                 err = 0;
5192
5193                 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5194                           "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5195                           "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5196                           r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5197                           dlm_dir_nodeid(r), oc, ou);
5198
5199                 /* At this point we assume that we won't get a reply to any
5200                    previous op or overlap op on this lock.  First, do a big
5201                    remove_from_waiters() for all previous ops. */
5202
5203                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
5204                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
5205                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
5206                 lkb->lkb_wait_type = 0;
5207                 lkb->lkb_wait_count = 0;
5208                 mutex_lock(&ls->ls_waiters_mutex);
5209                 list_del_init(&lkb->lkb_wait_reply);
5210                 mutex_unlock(&ls->ls_waiters_mutex);
5211                 unhold_lkb(lkb); /* for waiters list */
5212
5213                 if (oc || ou) {
5214                         /* do an unlock or cancel instead of resending */
5215                         switch (mstype) {
5216                         case DLM_MSG_LOOKUP:
5217                         case DLM_MSG_REQUEST:
5218                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5219                                                         -DLM_ECANCEL);
5220                                 unhold_lkb(lkb); /* undoes create_lkb() */
5221                                 break;
5222                         case DLM_MSG_CONVERT:
5223                                 if (oc) {
5224                                         queue_cast(r, lkb, -DLM_ECANCEL);
5225                                 } else {
5226                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5227                                         _unlock_lock(r, lkb);
5228                                 }
5229                                 break;
5230                         default:
5231                                 err = 1;
5232                         }
5233                 } else {
5234                         switch (mstype) {
5235                         case DLM_MSG_LOOKUP:
5236                         case DLM_MSG_REQUEST:
5237                                 _request_lock(r, lkb);
5238                                 if (is_master(r))
5239                                         confirm_master(r, 0);
5240                                 break;
5241                         case DLM_MSG_CONVERT:
5242                                 _convert_lock(r, lkb);
5243                                 break;
5244                         default:
5245                                 err = 1;
5246                         }
5247                 }
5248
5249                 if (err) {
5250                         log_error(ls, "waiter %x msg %d r_nodeid %d "
5251                                   "dir_nodeid %d overlap %d %d",
5252                                   lkb->lkb_id, mstype, r->res_nodeid,
5253                                   dlm_dir_nodeid(r), oc, ou);
5254                 }
5255                 unlock_rsb(r);
5256                 put_rsb(r);
5257                 dlm_put_lkb(lkb);
5258         }
5259
5260         return error;
5261 }
5262
5263 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5264                               struct list_head *list)
5265 {
5266         struct dlm_lkb *lkb, *safe;
5267
5268         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5269                 if (!is_master_copy(lkb))
5270                         continue;
5271
5272                 /* don't purge lkbs we've added in recover_master_copy for
5273                    the current recovery seq */
5274
5275                 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5276                         continue;
5277
5278                 del_lkb(r, lkb);
5279
5280                 /* this put should free the lkb */
5281                 if (!dlm_put_lkb(lkb))
5282                         log_error(ls, "purged mstcpy lkb not released");
5283         }
5284 }
5285
5286 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5287 {
5288         struct dlm_ls *ls = r->res_ls;
5289
5290         purge_mstcpy_list(ls, r, &r->res_grantqueue);
5291         purge_mstcpy_list(ls, r, &r->res_convertqueue);
5292         purge_mstcpy_list(ls, r, &r->res_waitqueue);
5293 }
5294
5295 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5296                             struct list_head *list,
5297                             int nodeid_gone, unsigned int *count)
5298 {
5299         struct dlm_lkb *lkb, *safe;
5300
5301         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5302                 if (!is_master_copy(lkb))
5303                         continue;
5304
5305                 if ((lkb->lkb_nodeid == nodeid_gone) ||
5306                     dlm_is_removed(ls, lkb->lkb_nodeid)) {
5307
5308                         del_lkb(r, lkb);
5309
5310                         /* this put should free the lkb */
5311                         if (!dlm_put_lkb(lkb))
5312                                 log_error(ls, "purged dead lkb not released");
5313
5314                         rsb_set_flag(r, RSB_RECOVER_GRANT);
5315
5316                         (*count)++;
5317                 }
5318         }
5319 }
5320
5321 /* Get rid of locks held by nodes that are gone. */
5322
5323 void dlm_recover_purge(struct dlm_ls *ls)
5324 {
5325         struct dlm_rsb *r;
5326         struct dlm_member *memb;
5327         int nodes_count = 0;
5328         int nodeid_gone = 0;
5329         unsigned int lkb_count = 0;
5330
5331         /* cache one removed nodeid to optimize the common
5332            case of a single node removed */
5333
5334         list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5335                 nodes_count++;
5336                 nodeid_gone = memb->nodeid;
5337         }
5338
5339         if (!nodes_count)
5340                 return;
5341
5342         down_write(&ls->ls_root_sem);
5343         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5344                 hold_rsb(r);
5345                 lock_rsb(r);
5346                 if (is_master(r)) {
5347                         purge_dead_list(ls, r, &r->res_grantqueue,
5348                                         nodeid_gone, &lkb_count);
5349                         purge_dead_list(ls, r, &r->res_convertqueue,
5350                                         nodeid_gone, &lkb_count);
5351                         purge_dead_list(ls, r, &r->res_waitqueue,
5352                                         nodeid_gone, &lkb_count);
5353                 }
5354                 unlock_rsb(r);
5355                 unhold_rsb(r);
5356                 cond_resched();
5357         }
5358         up_write(&ls->ls_root_sem);
5359
5360         if (lkb_count)
5361                 log_debug(ls, "dlm_recover_purge %u locks for %u nodes",
5362                           lkb_count, nodes_count);
5363 }
5364
5365 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5366 {
5367         struct rb_node *n;
5368         struct dlm_rsb *r;
5369
5370         spin_lock(&ls->ls_rsbtbl[bucket].lock);
5371         for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5372                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
5373
5374                 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5375                         continue;
5376                 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5377                 if (!is_master(r))
5378                         continue;
5379                 hold_rsb(r);
5380                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5381                 return r;
5382         }
5383         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5384         return NULL;
5385 }
5386
5387 /*
5388  * Attempt to grant locks on resources that we are the master of.
5389  * Locks may have become grantable during recovery because locks
5390  * from departed nodes have been purged (or not rebuilt), allowing
5391  * previously blocked locks to now be granted.  The subset of rsb's
5392  * we are interested in are those with lkb's on either the convert or
5393  * waiting queues.
5394  *
5395  * Simplest would be to go through each master rsb and check for non-empty
5396  * convert or waiting queues, and attempt to grant on those rsbs.
5397  * Checking the queues requires lock_rsb, though, for which we'd need
5398  * to release the rsbtbl lock.  This would make iterating through all
5399  * rsb's very inefficient.  So, we rely on earlier recovery routines
5400  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5401  * locks for.
5402  */
5403
5404 void dlm_recover_grant(struct dlm_ls *ls)
5405 {
5406         struct dlm_rsb *r;
5407         int bucket = 0;
5408         unsigned int count = 0;
5409         unsigned int rsb_count = 0;
5410         unsigned int lkb_count = 0;
5411
5412         while (1) {
5413                 r = find_grant_rsb(ls, bucket);
5414                 if (!r) {
5415                         if (bucket == ls->ls_rsbtbl_size - 1)
5416                                 break;
5417                         bucket++;
5418                         continue;
5419                 }
5420                 rsb_count++;
5421                 count = 0;
5422                 lock_rsb(r);
5423                 grant_pending_locks(r, &count);
5424                 lkb_count += count;
5425                 confirm_master(r, 0);
5426                 unlock_rsb(r);
5427                 put_rsb(r);
5428                 cond_resched();
5429         }
5430
5431         if (lkb_count)
5432                 log_debug(ls, "dlm_recover_grant %u locks on %u resources",
5433                           lkb_count, rsb_count);
5434 }
5435
5436 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5437                                          uint32_t remid)
5438 {
5439         struct dlm_lkb *lkb;
5440
5441         list_for_each_entry(lkb, head, lkb_statequeue) {
5442                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5443                         return lkb;
5444         }
5445         return NULL;
5446 }
5447
5448 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5449                                     uint32_t remid)
5450 {
5451         struct dlm_lkb *lkb;
5452
5453         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5454         if (lkb)
5455                 return lkb;
5456         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5457         if (lkb)
5458                 return lkb;
5459         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5460         if (lkb)
5461                 return lkb;
5462         return NULL;
5463 }
5464
5465 /* needs at least dlm_rcom + rcom_lock */
5466 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5467                                   struct dlm_rsb *r, struct dlm_rcom *rc)
5468 {
5469         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5470
5471         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
5472         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5473         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5474         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5475         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
5476         lkb->lkb_flags |= DLM_IFL_MSTCPY;
5477         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5478         lkb->lkb_rqmode = rl->rl_rqmode;
5479         lkb->lkb_grmode = rl->rl_grmode;
5480         /* don't set lkb_status because add_lkb wants to itself */
5481
5482         lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5483         lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5484
5485         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5486                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
5487                          sizeof(struct rcom_lock);
5488                 if (lvblen > ls->ls_lvblen)
5489                         return -EINVAL;
5490                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5491                 if (!lkb->lkb_lvbptr)
5492                         return -ENOMEM;
5493                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5494         }
5495
5496         /* Conversions between PR and CW (middle modes) need special handling.
5497            The real granted mode of these converting locks cannot be determined
5498            until all locks have been rebuilt on the rsb (recover_conversion) */
5499
5500         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5501             middle_conversion(lkb)) {
5502                 rl->rl_status = DLM_LKSTS_CONVERT;
5503                 lkb->lkb_grmode = DLM_LOCK_IV;
5504                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5505         }
5506
5507         return 0;
5508 }
5509
5510 /* This lkb may have been recovered in a previous aborted recovery so we need
5511    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5512    If so we just send back a standard reply.  If not, we create a new lkb with
5513    the given values and send back our lkid.  We send back our lkid by sending
5514    back the rcom_lock struct we got but with the remid field filled in. */
5515
5516 /* needs at least dlm_rcom + rcom_lock */
5517 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5518 {
5519         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5520         struct dlm_rsb *r;
5521         struct dlm_lkb *lkb;
5522         uint32_t remid = 0;
5523         int from_nodeid = rc->rc_header.h_nodeid;
5524         int error;
5525
5526         if (rl->rl_parent_lkid) {
5527                 error = -EOPNOTSUPP;
5528                 goto out;
5529         }
5530
5531         remid = le32_to_cpu(rl->rl_lkid);
5532
5533         /* In general we expect the rsb returned to be R_MASTER, but we don't
5534            have to require it.  Recovery of masters on one node can overlap
5535            recovery of locks on another node, so one node can send us MSTCPY
5536            locks before we've made ourselves master of this rsb.  We can still
5537            add new MSTCPY locks that we receive here without any harm; when
5538            we make ourselves master, dlm_recover_masters() won't touch the
5539            MSTCPY locks we've received early. */
5540
5541         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5542                          from_nodeid, R_RECEIVE_RECOVER, &r);
5543         if (error)
5544                 goto out;
5545
5546         lock_rsb(r);
5547
5548         if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5549                 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5550                           from_nodeid, remid);
5551                 error = -EBADR;
5552                 goto out_unlock;
5553         }
5554
5555         lkb = search_remid(r, from_nodeid, remid);
5556         if (lkb) {
5557                 error = -EEXIST;
5558                 goto out_remid;
5559         }
5560
5561         error = create_lkb(ls, &lkb);
5562         if (error)
5563                 goto out_unlock;
5564
5565         error = receive_rcom_lock_args(ls, lkb, r, rc);
5566         if (error) {
5567                 __put_lkb(ls, lkb);
5568                 goto out_unlock;
5569         }
5570
5571         attach_lkb(r, lkb);
5572         add_lkb(r, lkb, rl->rl_status);
5573         error = 0;
5574         ls->ls_recover_locks_in++;
5575
5576         if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5577                 rsb_set_flag(r, RSB_RECOVER_GRANT);
5578
5579  out_remid:
5580         /* this is the new value returned to the lock holder for
5581            saving in its process-copy lkb */
5582         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
5583
5584         lkb->lkb_recover_seq = ls->ls_recover_seq;
5585
5586  out_unlock:
5587         unlock_rsb(r);
5588         put_rsb(r);
5589  out:
5590         if (error && error != -EEXIST)
5591                 log_debug(ls, "dlm_recover_master_copy remote %d %x error %d",
5592                           from_nodeid, remid, error);
5593         rl->rl_result = cpu_to_le32(error);
5594         return error;
5595 }
5596
5597 /* needs at least dlm_rcom + rcom_lock */
5598 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5599 {
5600         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5601         struct dlm_rsb *r;
5602         struct dlm_lkb *lkb;
5603         uint32_t lkid, remid;
5604         int error, result;
5605
5606         lkid = le32_to_cpu(rl->rl_lkid);
5607         remid = le32_to_cpu(rl->rl_remid);
5608         result = le32_to_cpu(rl->rl_result);
5609
5610         error = find_lkb(ls, lkid, &lkb);
5611         if (error) {
5612                 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5613                           lkid, rc->rc_header.h_nodeid, remid, result);
5614                 return error;
5615         }
5616
5617         r = lkb->lkb_resource;
5618         hold_rsb(r);
5619         lock_rsb(r);
5620
5621         if (!is_process_copy(lkb)) {
5622                 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5623                           lkid, rc->rc_header.h_nodeid, remid, result);
5624                 dlm_dump_rsb(r);
5625                 unlock_rsb(r);
5626                 put_rsb(r);
5627                 dlm_put_lkb(lkb);
5628                 return -EINVAL;
5629         }
5630
5631         switch (result) {
5632         case -EBADR:
5633                 /* There's a chance the new master received our lock before
5634                    dlm_recover_master_reply(), this wouldn't happen if we did
5635                    a barrier between recover_masters and recover_locks. */
5636
5637                 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5638                           lkid, rc->rc_header.h_nodeid, remid, result);
5639         
5640                 dlm_send_rcom_lock(r, lkb);
5641                 goto out;
5642         case -EEXIST:
5643         case 0:
5644                 lkb->lkb_remid = remid;
5645                 break;
5646         default:
5647                 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5648                           lkid, rc->rc_header.h_nodeid, remid, result);
5649         }
5650
5651         /* an ack for dlm_recover_locks() which waits for replies from
5652            all the locks it sends to new masters */
5653         dlm_recovered_lock(r);
5654  out:
5655         unlock_rsb(r);
5656         put_rsb(r);
5657         dlm_put_lkb(lkb);
5658
5659         return 0;
5660 }
5661
5662 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5663                      int mode, uint32_t flags, void *name, unsigned int namelen,
5664                      unsigned long timeout_cs)
5665 {
5666         struct dlm_lkb *lkb;
5667         struct dlm_args args;
5668         int error;
5669
5670         dlm_lock_recovery(ls);
5671
5672         error = create_lkb(ls, &lkb);
5673         if (error) {
5674                 kfree(ua);
5675                 goto out;
5676         }
5677
5678         if (flags & DLM_LKF_VALBLK) {
5679                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5680                 if (!ua->lksb.sb_lvbptr) {
5681                         kfree(ua);
5682                         __put_lkb(ls, lkb);
5683                         error = -ENOMEM;
5684                         goto out;
5685                 }
5686         }
5687
5688         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5689            When DLM_IFL_USER is set, the dlm knows that this is a userspace
5690            lock and that lkb_astparam is the dlm_user_args structure. */
5691
5692         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
5693                               fake_astfn, ua, fake_bastfn, &args);
5694         lkb->lkb_flags |= DLM_IFL_USER;
5695
5696         if (error) {
5697                 __put_lkb(ls, lkb);
5698                 goto out;
5699         }
5700
5701         error = request_lock(ls, lkb, name, namelen, &args);
5702
5703         switch (error) {
5704         case 0:
5705                 break;
5706         case -EINPROGRESS:
5707                 error = 0;
5708                 break;
5709         case -EAGAIN:
5710                 error = 0;
5711                 /* fall through */
5712         default:
5713                 __put_lkb(ls, lkb);
5714                 goto out;
5715         }
5716
5717         /* add this new lkb to the per-process list of locks */
5718         spin_lock(&ua->proc->locks_spin);
5719         hold_lkb(lkb);
5720         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5721         spin_unlock(&ua->proc->locks_spin);
5722  out:
5723         dlm_unlock_recovery(ls);
5724         return error;
5725 }
5726
5727 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5728                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
5729                      unsigned long timeout_cs)
5730 {
5731         struct dlm_lkb *lkb;
5732         struct dlm_args args;
5733         struct dlm_user_args *ua;
5734         int error;
5735
5736         dlm_lock_recovery(ls);
5737
5738         error = find_lkb(ls, lkid, &lkb);
5739         if (error)
5740                 goto out;
5741
5742         /* user can change the params on its lock when it converts it, or
5743            add an lvb that didn't exist before */
5744
5745         ua = lkb->lkb_ua;
5746
5747         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5748                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5749                 if (!ua->lksb.sb_lvbptr) {
5750                         error = -ENOMEM;
5751                         goto out_put;
5752                 }
5753         }
5754         if (lvb_in && ua->lksb.sb_lvbptr)
5755                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5756
5757         ua->xid = ua_tmp->xid;
5758         ua->castparam = ua_tmp->castparam;
5759         ua->castaddr = ua_tmp->castaddr;
5760         ua->bastparam = ua_tmp->bastparam;
5761         ua->bastaddr = ua_tmp->bastaddr;
5762         ua->user_lksb = ua_tmp->user_lksb;
5763
5764         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
5765                               fake_astfn, ua, fake_bastfn, &args);
5766         if (error)
5767                 goto out_put;
5768
5769         error = convert_lock(ls, lkb, &args);
5770
5771         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5772                 error = 0;
5773  out_put:
5774         dlm_put_lkb(lkb);
5775  out:
5776         dlm_unlock_recovery(ls);
5777         kfree(ua_tmp);
5778         return error;
5779 }
5780
5781 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5782                     uint32_t flags, uint32_t lkid, char *lvb_in)
5783 {
5784         struct dlm_lkb *lkb;
5785         struct dlm_args args;
5786         struct dlm_user_args *ua;
5787         int error;
5788
5789         dlm_lock_recovery(ls);
5790
5791         error = find_lkb(ls, lkid, &lkb);
5792         if (error)
5793                 goto out;
5794
5795         ua = lkb->lkb_ua;
5796
5797         if (lvb_in && ua->lksb.sb_lvbptr)
5798                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5799         if (ua_tmp->castparam)
5800                 ua->castparam = ua_tmp->castparam;
5801         ua->user_lksb = ua_tmp->user_lksb;
5802
5803         error = set_unlock_args(flags, ua, &args);
5804         if (error)
5805                 goto out_put;
5806
5807         error = unlock_lock(ls, lkb, &args);
5808
5809         if (error == -DLM_EUNLOCK)
5810                 error = 0;
5811         /* from validate_unlock_args() */
5812         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5813                 error = 0;
5814         if (error)
5815                 goto out_put;
5816
5817         spin_lock(&ua->proc->locks_spin);
5818         /* dlm_user_add_cb() may have already taken lkb off the proc list */
5819         if (!list_empty(&lkb->lkb_ownqueue))
5820                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5821         spin_unlock(&ua->proc->locks_spin);
5822  out_put:
5823         dlm_put_lkb(lkb);
5824  out:
5825         dlm_unlock_recovery(ls);
5826         kfree(ua_tmp);
5827         return error;
5828 }
5829
5830 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5831                     uint32_t flags, uint32_t lkid)
5832 {
5833         struct dlm_lkb *lkb;
5834         struct dlm_args args;
5835         struct dlm_user_args *ua;
5836         int error;
5837
5838         dlm_lock_recovery(ls);
5839
5840         error = find_lkb(ls, lkid, &lkb);
5841         if (error)
5842                 goto out;
5843
5844         ua = lkb->lkb_ua;
5845         if (ua_tmp->castparam)
5846                 ua->castparam = ua_tmp->castparam;
5847         ua->user_lksb = ua_tmp->user_lksb;
5848
5849         error = set_unlock_args(flags, ua, &args);
5850         if (error)
5851                 goto out_put;
5852
5853         error = cancel_lock(ls, lkb, &args);
5854
5855         if (error == -DLM_ECANCEL)
5856                 error = 0;
5857         /* from validate_unlock_args() */
5858         if (error == -EBUSY)
5859                 error = 0;
5860  out_put:
5861         dlm_put_lkb(lkb);
5862  out:
5863         dlm_unlock_recovery(ls);
5864         kfree(ua_tmp);
5865         return error;
5866 }
5867
5868 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
5869 {
5870         struct dlm_lkb *lkb;
5871         struct dlm_args args;
5872         struct dlm_user_args *ua;
5873         struct dlm_rsb *r;
5874         int error;
5875
5876         dlm_lock_recovery(ls);
5877
5878         error = find_lkb(ls, lkid, &lkb);
5879         if (error)
5880                 goto out;
5881
5882         ua = lkb->lkb_ua;
5883
5884         error = set_unlock_args(flags, ua, &args);
5885         if (error)
5886                 goto out_put;
5887
5888         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
5889
5890         r = lkb->lkb_resource;
5891         hold_rsb(r);
5892         lock_rsb(r);
5893
5894         error = validate_unlock_args(lkb, &args);
5895         if (error)
5896                 goto out_r;
5897         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
5898
5899         error = _cancel_lock(r, lkb);
5900  out_r:
5901         unlock_rsb(r);
5902         put_rsb(r);
5903
5904         if (error == -DLM_ECANCEL)
5905                 error = 0;
5906         /* from validate_unlock_args() */
5907         if (error == -EBUSY)
5908                 error = 0;
5909  out_put:
5910         dlm_put_lkb(lkb);
5911  out:
5912         dlm_unlock_recovery(ls);
5913         return error;
5914 }
5915
5916 /* lkb's that are removed from the waiters list by revert are just left on the
5917    orphans list with the granted orphan locks, to be freed by purge */
5918
5919 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5920 {
5921         struct dlm_args args;
5922         int error;
5923
5924         hold_lkb(lkb);
5925         mutex_lock(&ls->ls_orphans_mutex);
5926         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
5927         mutex_unlock(&ls->ls_orphans_mutex);
5928
5929         set_unlock_args(0, lkb->lkb_ua, &args);
5930
5931         error = cancel_lock(ls, lkb, &args);
5932         if (error == -DLM_ECANCEL)
5933                 error = 0;
5934         return error;
5935 }
5936
5937 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
5938    Regardless of what rsb queue the lock is on, it's removed and freed. */
5939
5940 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5941 {
5942         struct dlm_args args;
5943         int error;
5944
5945         set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
5946
5947         error = unlock_lock(ls, lkb, &args);
5948         if (error == -DLM_EUNLOCK)
5949                 error = 0;
5950         return error;
5951 }
5952
5953 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
5954    (which does lock_rsb) due to deadlock with receiving a message that does
5955    lock_rsb followed by dlm_user_add_cb() */
5956
5957 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
5958                                      struct dlm_user_proc *proc)
5959 {
5960         struct dlm_lkb *lkb = NULL;
5961
5962         mutex_lock(&ls->ls_clear_proc_locks);
5963         if (list_empty(&proc->locks))
5964                 goto out;
5965
5966         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
5967         list_del_init(&lkb->lkb_ownqueue);
5968
5969         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5970                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
5971         else
5972                 lkb->lkb_flags |= DLM_IFL_DEAD;
5973  out:
5974         mutex_unlock(&ls->ls_clear_proc_locks);
5975         return lkb;
5976 }
5977
5978 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
5979    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
5980    which we clear here. */
5981
5982 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
5983    list, and no more device_writes should add lkb's to proc->locks list; so we
5984    shouldn't need to take asts_spin or locks_spin here.  this assumes that
5985    device reads/writes/closes are serialized -- FIXME: we may need to serialize
5986    them ourself. */
5987
5988 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5989 {
5990         struct dlm_lkb *lkb, *safe;
5991
5992         dlm_lock_recovery(ls);
5993
5994         while (1) {
5995                 lkb = del_proc_lock(ls, proc);
5996                 if (!lkb)
5997                         break;
5998                 del_timeout(lkb);
5999                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6000                         orphan_proc_lock(ls, lkb);
6001                 else
6002                         unlock_proc_lock(ls, lkb);
6003
6004                 /* this removes the reference for the proc->locks list
6005                    added by dlm_user_request, it may result in the lkb
6006                    being freed */
6007
6008                 dlm_put_lkb(lkb);
6009         }
6010
6011         mutex_lock(&ls->ls_clear_proc_locks);
6012
6013         /* in-progress unlocks */
6014         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6015                 list_del_init(&lkb->lkb_ownqueue);
6016                 lkb->lkb_flags |= DLM_IFL_DEAD;
6017                 dlm_put_lkb(lkb);
6018         }
6019
6020         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6021                 memset(&lkb->lkb_callbacks, 0,
6022                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6023                 list_del_init(&lkb->lkb_cb_list);
6024                 dlm_put_lkb(lkb);
6025         }
6026
6027         mutex_unlock(&ls->ls_clear_proc_locks);
6028         dlm_unlock_recovery(ls);
6029 }
6030
6031 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6032 {
6033         struct dlm_lkb *lkb, *safe;
6034
6035         while (1) {
6036                 lkb = NULL;
6037                 spin_lock(&proc->locks_spin);
6038                 if (!list_empty(&proc->locks)) {
6039                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
6040                                          lkb_ownqueue);
6041                         list_del_init(&lkb->lkb_ownqueue);
6042                 }
6043                 spin_unlock(&proc->locks_spin);
6044
6045                 if (!lkb)
6046                         break;
6047
6048                 lkb->lkb_flags |= DLM_IFL_DEAD;
6049                 unlock_proc_lock(ls, lkb);
6050                 dlm_put_lkb(lkb); /* ref from proc->locks list */
6051         }
6052
6053         spin_lock(&proc->locks_spin);
6054         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6055                 list_del_init(&lkb->lkb_ownqueue);
6056                 lkb->lkb_flags |= DLM_IFL_DEAD;
6057                 dlm_put_lkb(lkb);
6058         }
6059         spin_unlock(&proc->locks_spin);
6060
6061         spin_lock(&proc->asts_spin);
6062         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6063                 memset(&lkb->lkb_callbacks, 0,
6064                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6065                 list_del_init(&lkb->lkb_cb_list);
6066                 dlm_put_lkb(lkb);
6067         }
6068         spin_unlock(&proc->asts_spin);
6069 }
6070
6071 /* pid of 0 means purge all orphans */
6072
6073 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6074 {
6075         struct dlm_lkb *lkb, *safe;
6076
6077         mutex_lock(&ls->ls_orphans_mutex);
6078         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6079                 if (pid && lkb->lkb_ownpid != pid)
6080                         continue;
6081                 unlock_proc_lock(ls, lkb);
6082                 list_del_init(&lkb->lkb_ownqueue);
6083                 dlm_put_lkb(lkb);
6084         }
6085         mutex_unlock(&ls->ls_orphans_mutex);
6086 }
6087
6088 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6089 {
6090         struct dlm_message *ms;
6091         struct dlm_mhandle *mh;
6092         int error;
6093
6094         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6095                                 DLM_MSG_PURGE, &ms, &mh);
6096         if (error)
6097                 return error;
6098         ms->m_nodeid = nodeid;
6099         ms->m_pid = pid;
6100
6101         return send_message(mh, ms);
6102 }
6103
6104 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6105                    int nodeid, int pid)
6106 {
6107         int error = 0;
6108
6109         if (nodeid != dlm_our_nodeid()) {
6110                 error = send_purge(ls, nodeid, pid);
6111         } else {
6112                 dlm_lock_recovery(ls);
6113                 if (pid == current->pid)
6114                         purge_proc_locks(ls, proc);
6115                 else
6116                         do_purge(ls, nodeid, pid);
6117                 dlm_unlock_recovery(ls);
6118         }
6119         return error;
6120 }
6121