dlm: fix conversion deadlock from recovery
[firefly-linux-kernel-4.4.55.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "lowcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89                                     struct dlm_message *ms);
90 static int receive_extralen(struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void del_timeout(struct dlm_lkb *lkb);
93 static void toss_rsb(struct kref *kref);
94
95 /*
96  * Lock compatibilty matrix - thanks Steve
97  * UN = Unlocked state. Not really a state, used as a flag
98  * PD = Padding. Used to make the matrix a nice power of two in size
99  * Other states are the same as the VMS DLM.
100  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
101  */
102
103 static const int __dlm_compat_matrix[8][8] = {
104       /* UN NL CR CW PR PW EX PD */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
106         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
107         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
108         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
109         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
110         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
111         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
112         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
113 };
114
115 /*
116  * This defines the direction of transfer of LVB data.
117  * Granted mode is the row; requested mode is the column.
118  * Usage: matrix[grmode+1][rqmode+1]
119  * 1 = LVB is returned to the caller
120  * 0 = LVB is written to the resource
121  * -1 = nothing happens to the LVB
122  */
123
124 const int dlm_lvb_operations[8][8] = {
125         /* UN   NL  CR  CW  PR  PW  EX  PD*/
126         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
127         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
128         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
129         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
130         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
131         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
133         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
134 };
135
136 #define modes_compat(gr, rq) \
137         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
138
139 int dlm_modes_compat(int mode1, int mode2)
140 {
141         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
142 }
143
144 /*
145  * Compatibility matrix for conversions with QUECVT set.
146  * Granted mode is the row; requested mode is the column.
147  * Usage: matrix[grmode+1][rqmode+1]
148  */
149
150 static const int __quecvt_compat_matrix[8][8] = {
151       /* UN NL CR CW PR PW EX PD */
152         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
153         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
154         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
155         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
156         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
157         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
158         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
159         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
160 };
161
162 void dlm_print_lkb(struct dlm_lkb *lkb)
163 {
164         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
165                "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
166                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
167                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
168                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
169                (unsigned long long)lkb->lkb_recover_seq);
170 }
171
172 static void dlm_print_rsb(struct dlm_rsb *r)
173 {
174         printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
175                "rlc %d name %s\n",
176                r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
177                r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
178                r->res_name);
179 }
180
181 void dlm_dump_rsb(struct dlm_rsb *r)
182 {
183         struct dlm_lkb *lkb;
184
185         dlm_print_rsb(r);
186
187         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
188                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
189         printk(KERN_ERR "rsb lookup list\n");
190         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
191                 dlm_print_lkb(lkb);
192         printk(KERN_ERR "rsb grant queue:\n");
193         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
194                 dlm_print_lkb(lkb);
195         printk(KERN_ERR "rsb convert queue:\n");
196         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
197                 dlm_print_lkb(lkb);
198         printk(KERN_ERR "rsb wait queue:\n");
199         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
200                 dlm_print_lkb(lkb);
201 }
202
203 /* Threads cannot use the lockspace while it's being recovered */
204
205 static inline void dlm_lock_recovery(struct dlm_ls *ls)
206 {
207         down_read(&ls->ls_in_recovery);
208 }
209
210 void dlm_unlock_recovery(struct dlm_ls *ls)
211 {
212         up_read(&ls->ls_in_recovery);
213 }
214
215 int dlm_lock_recovery_try(struct dlm_ls *ls)
216 {
217         return down_read_trylock(&ls->ls_in_recovery);
218 }
219
220 static inline int can_be_queued(struct dlm_lkb *lkb)
221 {
222         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
223 }
224
225 static inline int force_blocking_asts(struct dlm_lkb *lkb)
226 {
227         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
228 }
229
230 static inline int is_demoted(struct dlm_lkb *lkb)
231 {
232         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
233 }
234
235 static inline int is_altmode(struct dlm_lkb *lkb)
236 {
237         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
238 }
239
240 static inline int is_granted(struct dlm_lkb *lkb)
241 {
242         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
243 }
244
245 static inline int is_remote(struct dlm_rsb *r)
246 {
247         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
248         return !!r->res_nodeid;
249 }
250
251 static inline int is_process_copy(struct dlm_lkb *lkb)
252 {
253         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
254 }
255
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
259 }
260
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265                 return 1;
266         return 0;
267 }
268
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
277 }
278
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
282 }
283
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
287                                   DLM_IFL_OVERLAP_CANCEL));
288 }
289
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292         if (is_master_copy(lkb))
293                 return;
294
295         del_timeout(lkb);
296
297         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
298
299         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
300            timeout caused the cancel then return -ETIMEDOUT */
301         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
302                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
303                 rv = -ETIMEDOUT;
304         }
305
306         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
307                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
308                 rv = -EDEADLK;
309         }
310
311         dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
312 }
313
314 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
315 {
316         queue_cast(r, lkb,
317                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
318 }
319
320 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
321 {
322         if (is_master_copy(lkb)) {
323                 send_bast(r, lkb, rqmode);
324         } else {
325                 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
326         }
327 }
328
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332
333 /* This is only called to add a reference when the code already holds
334    a valid reference to the rsb, so there's no need for locking. */
335
336 static inline void hold_rsb(struct dlm_rsb *r)
337 {
338         kref_get(&r->res_ref);
339 }
340
341 void dlm_hold_rsb(struct dlm_rsb *r)
342 {
343         hold_rsb(r);
344 }
345
346 /* When all references to the rsb are gone it's transferred to
347    the tossed list for later disposal. */
348
349 static void put_rsb(struct dlm_rsb *r)
350 {
351         struct dlm_ls *ls = r->res_ls;
352         uint32_t bucket = r->res_bucket;
353
354         spin_lock(&ls->ls_rsbtbl[bucket].lock);
355         kref_put(&r->res_ref, toss_rsb);
356         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
357 }
358
359 void dlm_put_rsb(struct dlm_rsb *r)
360 {
361         put_rsb(r);
362 }
363
364 static int pre_rsb_struct(struct dlm_ls *ls)
365 {
366         struct dlm_rsb *r1, *r2;
367         int count = 0;
368
369         spin_lock(&ls->ls_new_rsb_spin);
370         if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
371                 spin_unlock(&ls->ls_new_rsb_spin);
372                 return 0;
373         }
374         spin_unlock(&ls->ls_new_rsb_spin);
375
376         r1 = dlm_allocate_rsb(ls);
377         r2 = dlm_allocate_rsb(ls);
378
379         spin_lock(&ls->ls_new_rsb_spin);
380         if (r1) {
381                 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
382                 ls->ls_new_rsb_count++;
383         }
384         if (r2) {
385                 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
386                 ls->ls_new_rsb_count++;
387         }
388         count = ls->ls_new_rsb_count;
389         spin_unlock(&ls->ls_new_rsb_spin);
390
391         if (!count)
392                 return -ENOMEM;
393         return 0;
394 }
395
396 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
397    unlock any spinlocks, go back and call pre_rsb_struct again.
398    Otherwise, take an rsb off the list and return it. */
399
400 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
401                           struct dlm_rsb **r_ret)
402 {
403         struct dlm_rsb *r;
404         int count;
405
406         spin_lock(&ls->ls_new_rsb_spin);
407         if (list_empty(&ls->ls_new_rsb)) {
408                 count = ls->ls_new_rsb_count;
409                 spin_unlock(&ls->ls_new_rsb_spin);
410                 log_debug(ls, "find_rsb retry %d %d %s",
411                           count, dlm_config.ci_new_rsb_count, name);
412                 return -EAGAIN;
413         }
414
415         r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
416         list_del(&r->res_hashchain);
417         /* Convert the empty list_head to a NULL rb_node for tree usage: */
418         memset(&r->res_hashnode, 0, sizeof(struct rb_node));
419         ls->ls_new_rsb_count--;
420         spin_unlock(&ls->ls_new_rsb_spin);
421
422         r->res_ls = ls;
423         r->res_length = len;
424         memcpy(r->res_name, name, len);
425         mutex_init(&r->res_mutex);
426
427         INIT_LIST_HEAD(&r->res_lookup);
428         INIT_LIST_HEAD(&r->res_grantqueue);
429         INIT_LIST_HEAD(&r->res_convertqueue);
430         INIT_LIST_HEAD(&r->res_waitqueue);
431         INIT_LIST_HEAD(&r->res_root_list);
432         INIT_LIST_HEAD(&r->res_recover_list);
433
434         *r_ret = r;
435         return 0;
436 }
437
438 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
439 {
440         char maxname[DLM_RESNAME_MAXLEN];
441
442         memset(maxname, 0, DLM_RESNAME_MAXLEN);
443         memcpy(maxname, name, nlen);
444         return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
445 }
446
447 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
448                         struct dlm_rsb **r_ret)
449 {
450         struct rb_node *node = tree->rb_node;
451         struct dlm_rsb *r;
452         int rc;
453
454         while (node) {
455                 r = rb_entry(node, struct dlm_rsb, res_hashnode);
456                 rc = rsb_cmp(r, name, len);
457                 if (rc < 0)
458                         node = node->rb_left;
459                 else if (rc > 0)
460                         node = node->rb_right;
461                 else
462                         goto found;
463         }
464         *r_ret = NULL;
465         return -EBADR;
466
467  found:
468         *r_ret = r;
469         return 0;
470 }
471
472 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
473 {
474         struct rb_node **newn = &tree->rb_node;
475         struct rb_node *parent = NULL;
476         int rc;
477
478         while (*newn) {
479                 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
480                                                res_hashnode);
481
482                 parent = *newn;
483                 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
484                 if (rc < 0)
485                         newn = &parent->rb_left;
486                 else if (rc > 0)
487                         newn = &parent->rb_right;
488                 else {
489                         log_print("rsb_insert match");
490                         dlm_dump_rsb(rsb);
491                         dlm_dump_rsb(cur);
492                         return -EEXIST;
493                 }
494         }
495
496         rb_link_node(&rsb->res_hashnode, parent, newn);
497         rb_insert_color(&rsb->res_hashnode, tree);
498         return 0;
499 }
500
501 /*
502  * Find rsb in rsbtbl and potentially create/add one
503  *
504  * Delaying the release of rsb's has a similar benefit to applications keeping
505  * NL locks on an rsb, but without the guarantee that the cached master value
506  * will still be valid when the rsb is reused.  Apps aren't always smart enough
507  * to keep NL locks on an rsb that they may lock again shortly; this can lead
508  * to excessive master lookups and removals if we don't delay the release.
509  *
510  * Searching for an rsb means looking through both the normal list and toss
511  * list.  When found on the toss list the rsb is moved to the normal list with
512  * ref count of 1; when found on normal list the ref count is incremented.
513  *
514  * rsb's on the keep list are being used locally and refcounted.
515  * rsb's on the toss list are not being used locally, and are not refcounted.
516  *
517  * The toss list rsb's were either
518  * - previously used locally but not any more (were on keep list, then
519  *   moved to toss list when last refcount dropped)
520  * - created and put on toss list as a directory record for a lookup
521  *   (we are the dir node for the res, but are not using the res right now,
522  *   but some other node is)
523  *
524  * The purpose of find_rsb() is to return a refcounted rsb for local use.
525  * So, if the given rsb is on the toss list, it is moved to the keep list
526  * before being returned.
527  *
528  * toss_rsb() happens when all local usage of the rsb is done, i.e. no
529  * more refcounts exist, so the rsb is moved from the keep list to the
530  * toss list.
531  *
532  * rsb's on both keep and toss lists are used for doing a name to master
533  * lookups.  rsb's that are in use locally (and being refcounted) are on
534  * the keep list, rsb's that are not in use locally (not refcounted) and
535  * only exist for name/master lookups are on the toss list.
536  *
537  * rsb's on the toss list who's dir_nodeid is not local can have stale
538  * name/master mappings.  So, remote requests on such rsb's can potentially
539  * return with an error, which means the mapping is stale and needs to
540  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
541  * first_lkid is to keep only a single outstanding request on an rsb
542  * while that rsb has a potentially stale master.)
543  */
544
545 static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
546                         uint32_t hash, uint32_t b,
547                         int dir_nodeid, int from_nodeid,
548                         unsigned int flags, struct dlm_rsb **r_ret)
549 {
550         struct dlm_rsb *r = NULL;
551         int our_nodeid = dlm_our_nodeid();
552         int from_local = 0;
553         int from_other = 0;
554         int from_dir = 0;
555         int create = 0;
556         int error;
557
558         if (flags & R_RECEIVE_REQUEST) {
559                 if (from_nodeid == dir_nodeid)
560                         from_dir = 1;
561                 else
562                         from_other = 1;
563         } else if (flags & R_REQUEST) {
564                 from_local = 1;
565         }
566
567         /*
568          * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
569          * from_nodeid has sent us a lock in dlm_recover_locks, believing
570          * we're the new master.  Our local recovery may not have set
571          * res_master_nodeid to our_nodeid yet, so allow either.  Don't
572          * create the rsb; dlm_recover_process_copy() will handle EBADR
573          * by resending.
574          *
575          * If someone sends us a request, we are the dir node, and we do
576          * not find the rsb anywhere, then recreate it.  This happens if
577          * someone sends us a request after we have removed/freed an rsb
578          * from our toss list.  (They sent a request instead of lookup
579          * because they are using an rsb from their toss list.)
580          */
581
582         if (from_local || from_dir ||
583             (from_other && (dir_nodeid == our_nodeid))) {
584                 create = 1;
585         }
586
587  retry:
588         if (create) {
589                 error = pre_rsb_struct(ls);
590                 if (error < 0)
591                         goto out;
592         }
593
594         spin_lock(&ls->ls_rsbtbl[b].lock);
595
596         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
597         if (error)
598                 goto do_toss;
599         
600         /*
601          * rsb is active, so we can't check master_nodeid without lock_rsb.
602          */
603
604         kref_get(&r->res_ref);
605         error = 0;
606         goto out_unlock;
607
608
609  do_toss:
610         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
611         if (error)
612                 goto do_new;
613
614         /*
615          * rsb found inactive (master_nodeid may be out of date unless
616          * we are the dir_nodeid or were the master)  No other thread
617          * is using this rsb because it's on the toss list, so we can
618          * look at or update res_master_nodeid without lock_rsb.
619          */
620
621         if ((r->res_master_nodeid != our_nodeid) && from_other) {
622                 /* our rsb was not master, and another node (not the dir node)
623                    has sent us a request */
624                 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
625                           from_nodeid, r->res_master_nodeid, dir_nodeid,
626                           r->res_name);
627                 error = -ENOTBLK;
628                 goto out_unlock;
629         }
630
631         if ((r->res_master_nodeid != our_nodeid) && from_dir) {
632                 /* don't think this should ever happen */
633                 log_error(ls, "find_rsb toss from_dir %d master %d",
634                           from_nodeid, r->res_master_nodeid);
635                 dlm_print_rsb(r);
636                 /* fix it and go on */
637                 r->res_master_nodeid = our_nodeid;
638                 r->res_nodeid = 0;
639                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
640                 r->res_first_lkid = 0;
641         }
642
643         if (from_local && (r->res_master_nodeid != our_nodeid)) {
644                 /* Because we have held no locks on this rsb,
645                    res_master_nodeid could have become stale. */
646                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
647                 r->res_first_lkid = 0;
648         }
649
650         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
651         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
652         goto out_unlock;
653
654
655  do_new:
656         /*
657          * rsb not found
658          */
659
660         if (error == -EBADR && !create)
661                 goto out_unlock;
662
663         error = get_rsb_struct(ls, name, len, &r);
664         if (error == -EAGAIN) {
665                 spin_unlock(&ls->ls_rsbtbl[b].lock);
666                 goto retry;
667         }
668         if (error)
669                 goto out_unlock;
670
671         r->res_hash = hash;
672         r->res_bucket = b;
673         r->res_dir_nodeid = dir_nodeid;
674         kref_init(&r->res_ref);
675
676         if (from_dir) {
677                 /* want to see how often this happens */
678                 log_debug(ls, "find_rsb new from_dir %d recreate %s",
679                           from_nodeid, r->res_name);
680                 r->res_master_nodeid = our_nodeid;
681                 r->res_nodeid = 0;
682                 goto out_add;
683         }
684
685         if (from_other && (dir_nodeid != our_nodeid)) {
686                 /* should never happen */
687                 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
688                           from_nodeid, dir_nodeid, our_nodeid, r->res_name);
689                 dlm_free_rsb(r);
690                 error = -ENOTBLK;
691                 goto out_unlock;
692         }
693
694         if (from_other) {
695                 log_debug(ls, "find_rsb new from_other %d dir %d %s",
696                           from_nodeid, dir_nodeid, r->res_name);
697         }
698
699         if (dir_nodeid == our_nodeid) {
700                 /* When we are the dir nodeid, we can set the master
701                    node immediately */
702                 r->res_master_nodeid = our_nodeid;
703                 r->res_nodeid = 0;
704         } else {
705                 /* set_master will send_lookup to dir_nodeid */
706                 r->res_master_nodeid = 0;
707                 r->res_nodeid = -1;
708         }
709
710  out_add:
711         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
712  out_unlock:
713         spin_unlock(&ls->ls_rsbtbl[b].lock);
714  out:
715         *r_ret = r;
716         return error;
717 }
718
719 /* During recovery, other nodes can send us new MSTCPY locks (from
720    dlm_recover_locks) before we've made ourself master (in
721    dlm_recover_masters). */
722
723 static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
724                           uint32_t hash, uint32_t b,
725                           int dir_nodeid, int from_nodeid,
726                           unsigned int flags, struct dlm_rsb **r_ret)
727 {
728         struct dlm_rsb *r = NULL;
729         int our_nodeid = dlm_our_nodeid();
730         int recover = (flags & R_RECEIVE_RECOVER);
731         int error;
732
733  retry:
734         error = pre_rsb_struct(ls);
735         if (error < 0)
736                 goto out;
737
738         spin_lock(&ls->ls_rsbtbl[b].lock);
739
740         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
741         if (error)
742                 goto do_toss;
743
744         /*
745          * rsb is active, so we can't check master_nodeid without lock_rsb.
746          */
747
748         kref_get(&r->res_ref);
749         goto out_unlock;
750
751
752  do_toss:
753         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
754         if (error)
755                 goto do_new;
756
757         /*
758          * rsb found inactive. No other thread is using this rsb because
759          * it's on the toss list, so we can look at or update
760          * res_master_nodeid without lock_rsb.
761          */
762
763         if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
764                 /* our rsb is not master, and another node has sent us a
765                    request; this should never happen */
766                 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
767                           from_nodeid, r->res_master_nodeid, dir_nodeid);
768                 dlm_print_rsb(r);
769                 error = -ENOTBLK;
770                 goto out_unlock;
771         }
772
773         if (!recover && (r->res_master_nodeid != our_nodeid) &&
774             (dir_nodeid == our_nodeid)) {
775                 /* our rsb is not master, and we are dir; may as well fix it;
776                    this should never happen */
777                 log_error(ls, "find_rsb toss our %d master %d dir %d",
778                           our_nodeid, r->res_master_nodeid, dir_nodeid);
779                 dlm_print_rsb(r);
780                 r->res_master_nodeid = our_nodeid;
781                 r->res_nodeid = 0;
782         }
783
784         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
785         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
786         goto out_unlock;
787
788
789  do_new:
790         /*
791          * rsb not found
792          */
793
794         error = get_rsb_struct(ls, name, len, &r);
795         if (error == -EAGAIN) {
796                 spin_unlock(&ls->ls_rsbtbl[b].lock);
797                 goto retry;
798         }
799         if (error)
800                 goto out_unlock;
801
802         r->res_hash = hash;
803         r->res_bucket = b;
804         r->res_dir_nodeid = dir_nodeid;
805         r->res_master_nodeid = dir_nodeid;
806         r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
807         kref_init(&r->res_ref);
808
809         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
810  out_unlock:
811         spin_unlock(&ls->ls_rsbtbl[b].lock);
812  out:
813         *r_ret = r;
814         return error;
815 }
816
817 static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
818                     unsigned int flags, struct dlm_rsb **r_ret)
819 {
820         uint32_t hash, b;
821         int dir_nodeid;
822
823         if (len > DLM_RESNAME_MAXLEN)
824                 return -EINVAL;
825
826         hash = jhash(name, len, 0);
827         b = hash & (ls->ls_rsbtbl_size - 1);
828
829         dir_nodeid = dlm_hash2nodeid(ls, hash);
830
831         if (dlm_no_directory(ls))
832                 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
833                                       from_nodeid, flags, r_ret);
834         else
835                 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
836                                       from_nodeid, flags, r_ret);
837 }
838
839 /* we have received a request and found that res_master_nodeid != our_nodeid,
840    so we need to return an error or make ourself the master */
841
842 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
843                                   int from_nodeid)
844 {
845         if (dlm_no_directory(ls)) {
846                 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
847                           from_nodeid, r->res_master_nodeid,
848                           r->res_dir_nodeid);
849                 dlm_print_rsb(r);
850                 return -ENOTBLK;
851         }
852
853         if (from_nodeid != r->res_dir_nodeid) {
854                 /* our rsb is not master, and another node (not the dir node)
855                    has sent us a request.  this is much more common when our
856                    master_nodeid is zero, so limit debug to non-zero.  */
857
858                 if (r->res_master_nodeid) {
859                         log_debug(ls, "validate master from_other %d master %d "
860                                   "dir %d first %x %s", from_nodeid,
861                                   r->res_master_nodeid, r->res_dir_nodeid,
862                                   r->res_first_lkid, r->res_name);
863                 }
864                 return -ENOTBLK;
865         } else {
866                 /* our rsb is not master, but the dir nodeid has sent us a
867                    request; this could happen with master 0 / res_nodeid -1 */
868
869                 if (r->res_master_nodeid) {
870                         log_error(ls, "validate master from_dir %d master %d "
871                                   "first %x %s",
872                                   from_nodeid, r->res_master_nodeid,
873                                   r->res_first_lkid, r->res_name);
874                 }
875
876                 r->res_master_nodeid = dlm_our_nodeid();
877                 r->res_nodeid = 0;
878                 return 0;
879         }
880 }
881
882 /*
883  * We're the dir node for this res and another node wants to know the
884  * master nodeid.  During normal operation (non recovery) this is only
885  * called from receive_lookup(); master lookups when the local node is
886  * the dir node are done by find_rsb().
887  *
888  * normal operation, we are the dir node for a resource
889  * . _request_lock
890  * . set_master
891  * . send_lookup
892  * . receive_lookup
893  * . dlm_master_lookup flags 0
894  *
895  * recover directory, we are rebuilding dir for all resources
896  * . dlm_recover_directory
897  * . dlm_rcom_names
898  *   remote node sends back the rsb names it is master of and we are dir of
899  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
900  *   we either create new rsb setting remote node as master, or find existing
901  *   rsb and set master to be the remote node.
902  *
903  * recover masters, we are finding the new master for resources
904  * . dlm_recover_masters
905  * . recover_master
906  * . dlm_send_rcom_lookup
907  * . receive_rcom_lookup
908  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
909  */
910
911 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
912                       unsigned int flags, int *r_nodeid, int *result)
913 {
914         struct dlm_rsb *r = NULL;
915         uint32_t hash, b;
916         int from_master = (flags & DLM_LU_RECOVER_DIR);
917         int fix_master = (flags & DLM_LU_RECOVER_MASTER);
918         int our_nodeid = dlm_our_nodeid();
919         int dir_nodeid, error, toss_list = 0;
920
921         if (len > DLM_RESNAME_MAXLEN)
922                 return -EINVAL;
923
924         if (from_nodeid == our_nodeid) {
925                 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
926                           our_nodeid, flags);
927                 return -EINVAL;
928         }
929
930         hash = jhash(name, len, 0);
931         b = hash & (ls->ls_rsbtbl_size - 1);
932
933         dir_nodeid = dlm_hash2nodeid(ls, hash);
934         if (dir_nodeid != our_nodeid) {
935                 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
936                           from_nodeid, dir_nodeid, our_nodeid, hash,
937                           ls->ls_num_nodes);
938                 *r_nodeid = -1;
939                 return -EINVAL;
940         }
941
942  retry:
943         error = pre_rsb_struct(ls);
944         if (error < 0)
945                 return error;
946
947         spin_lock(&ls->ls_rsbtbl[b].lock);
948         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
949         if (!error) {
950                 /* because the rsb is active, we need to lock_rsb before
951                    checking/changing re_master_nodeid */
952
953                 hold_rsb(r);
954                 spin_unlock(&ls->ls_rsbtbl[b].lock);
955                 lock_rsb(r);
956                 goto found;
957         }
958
959         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
960         if (error)
961                 goto not_found;
962
963         /* because the rsb is inactive (on toss list), it's not refcounted
964            and lock_rsb is not used, but is protected by the rsbtbl lock */
965
966         toss_list = 1;
967  found:
968         if (r->res_dir_nodeid != our_nodeid) {
969                 /* should not happen, but may as well fix it and carry on */
970                 log_error(ls, "dlm_master_lookup res_dir %d our %d %s",
971                           r->res_dir_nodeid, our_nodeid, r->res_name);
972                 r->res_dir_nodeid = our_nodeid;
973         }
974
975         if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
976                 /* Recovery uses this function to set a new master when
977                    the previous master failed.  Setting NEW_MASTER will
978                    force dlm_recover_masters to call recover_master on this
979                    rsb even though the res_nodeid is no longer removed. */
980
981                 r->res_master_nodeid = from_nodeid;
982                 r->res_nodeid = from_nodeid;
983                 rsb_set_flag(r, RSB_NEW_MASTER);
984
985                 if (toss_list) {
986                         /* I don't think we should ever find it on toss list. */
987                         log_error(ls, "dlm_master_lookup fix_master on toss");
988                         dlm_dump_rsb(r);
989                 }
990         }
991
992         if (from_master && (r->res_master_nodeid != from_nodeid)) {
993                 /* this will happen if from_nodeid became master during
994                    a previous recovery cycle, and we aborted the previous
995                    cycle before recovering this master value */
996
997                 log_limit(ls, "dlm_master_lookup from_master %d "
998                           "master_nodeid %d res_nodeid %d first %x %s",
999                           from_nodeid, r->res_master_nodeid, r->res_nodeid,
1000                           r->res_first_lkid, r->res_name);
1001
1002                 if (r->res_master_nodeid == our_nodeid) {
1003                         log_error(ls, "from_master %d our_master", from_nodeid);
1004                         dlm_dump_rsb(r);
1005                         dlm_send_rcom_lookup_dump(r, from_nodeid);
1006                         goto out_found;
1007                 }
1008
1009                 r->res_master_nodeid = from_nodeid;
1010                 r->res_nodeid = from_nodeid;
1011                 rsb_set_flag(r, RSB_NEW_MASTER);
1012         }
1013
1014         if (!r->res_master_nodeid) {
1015                 /* this will happen if recovery happens while we're looking
1016                    up the master for this rsb */
1017
1018                 log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s",
1019                           from_nodeid, r->res_first_lkid, r->res_name);
1020                 r->res_master_nodeid = from_nodeid;
1021                 r->res_nodeid = from_nodeid;
1022         }
1023
1024         if (!from_master && !fix_master &&
1025             (r->res_master_nodeid == from_nodeid)) {
1026                 /* this can happen when the master sends remove, the dir node
1027                    finds the rsb on the keep list and ignores the remove,
1028                    and the former master sends a lookup */
1029
1030                 log_limit(ls, "dlm_master_lookup from master %d flags %x "
1031                           "first %x %s", from_nodeid, flags,
1032                           r->res_first_lkid, r->res_name);
1033         }
1034
1035  out_found:
1036         *r_nodeid = r->res_master_nodeid;
1037         if (result)
1038                 *result = DLM_LU_MATCH;
1039
1040         if (toss_list) {
1041                 r->res_toss_time = jiffies;
1042                 /* the rsb was inactive (on toss list) */
1043                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1044         } else {
1045                 /* the rsb was active */
1046                 unlock_rsb(r);
1047                 put_rsb(r);
1048         }
1049         return 0;
1050
1051  not_found:
1052         error = get_rsb_struct(ls, name, len, &r);
1053         if (error == -EAGAIN) {
1054                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1055                 goto retry;
1056         }
1057         if (error)
1058                 goto out_unlock;
1059
1060         r->res_hash = hash;
1061         r->res_bucket = b;
1062         r->res_dir_nodeid = our_nodeid;
1063         r->res_master_nodeid = from_nodeid;
1064         r->res_nodeid = from_nodeid;
1065         kref_init(&r->res_ref);
1066         r->res_toss_time = jiffies;
1067
1068         error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
1069         if (error) {
1070                 /* should never happen */
1071                 dlm_free_rsb(r);
1072                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1073                 goto retry;
1074         }
1075
1076         if (result)
1077                 *result = DLM_LU_ADD;
1078         *r_nodeid = from_nodeid;
1079         error = 0;
1080  out_unlock:
1081         spin_unlock(&ls->ls_rsbtbl[b].lock);
1082         return error;
1083 }
1084
1085 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1086 {
1087         struct rb_node *n;
1088         struct dlm_rsb *r;
1089         int i;
1090
1091         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1092                 spin_lock(&ls->ls_rsbtbl[i].lock);
1093                 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1094                         r = rb_entry(n, struct dlm_rsb, res_hashnode);
1095                         if (r->res_hash == hash)
1096                                 dlm_dump_rsb(r);
1097                 }
1098                 spin_unlock(&ls->ls_rsbtbl[i].lock);
1099         }
1100 }
1101
1102 void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
1103 {
1104         struct dlm_rsb *r = NULL;
1105         uint32_t hash, b;
1106         int error;
1107
1108         hash = jhash(name, len, 0);
1109         b = hash & (ls->ls_rsbtbl_size - 1);
1110
1111         spin_lock(&ls->ls_rsbtbl[b].lock);
1112         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1113         if (!error)
1114                 goto out_dump;
1115
1116         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1117         if (error)
1118                 goto out;
1119  out_dump:
1120         dlm_dump_rsb(r);
1121  out:
1122         spin_unlock(&ls->ls_rsbtbl[b].lock);
1123 }
1124
1125 static void toss_rsb(struct kref *kref)
1126 {
1127         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1128         struct dlm_ls *ls = r->res_ls;
1129
1130         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1131         kref_init(&r->res_ref);
1132         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1133         rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1134         r->res_toss_time = jiffies;
1135         if (r->res_lvbptr) {
1136                 dlm_free_lvb(r->res_lvbptr);
1137                 r->res_lvbptr = NULL;
1138         }
1139 }
1140
1141 /* See comment for unhold_lkb */
1142
1143 static void unhold_rsb(struct dlm_rsb *r)
1144 {
1145         int rv;
1146         rv = kref_put(&r->res_ref, toss_rsb);
1147         DLM_ASSERT(!rv, dlm_dump_rsb(r););
1148 }
1149
1150 static void kill_rsb(struct kref *kref)
1151 {
1152         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1153
1154         /* All work is done after the return from kref_put() so we
1155            can release the write_lock before the remove and free. */
1156
1157         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1158         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1159         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1160         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1161         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1162         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1163 }
1164
1165 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1166    The rsb must exist as long as any lkb's for it do. */
1167
1168 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1169 {
1170         hold_rsb(r);
1171         lkb->lkb_resource = r;
1172 }
1173
1174 static void detach_lkb(struct dlm_lkb *lkb)
1175 {
1176         if (lkb->lkb_resource) {
1177                 put_rsb(lkb->lkb_resource);
1178                 lkb->lkb_resource = NULL;
1179         }
1180 }
1181
1182 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1183 {
1184         struct dlm_lkb *lkb;
1185         int rv, id;
1186
1187         lkb = dlm_allocate_lkb(ls);
1188         if (!lkb)
1189                 return -ENOMEM;
1190
1191         lkb->lkb_nodeid = -1;
1192         lkb->lkb_grmode = DLM_LOCK_IV;
1193         kref_init(&lkb->lkb_ref);
1194         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1195         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1196         INIT_LIST_HEAD(&lkb->lkb_time_list);
1197         INIT_LIST_HEAD(&lkb->lkb_cb_list);
1198         mutex_init(&lkb->lkb_cb_mutex);
1199         INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1200
1201  retry:
1202         rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
1203         if (!rv)
1204                 return -ENOMEM;
1205
1206         spin_lock(&ls->ls_lkbidr_spin);
1207         rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
1208         if (!rv)
1209                 lkb->lkb_id = id;
1210         spin_unlock(&ls->ls_lkbidr_spin);
1211
1212         if (rv == -EAGAIN)
1213                 goto retry;
1214
1215         if (rv < 0) {
1216                 log_error(ls, "create_lkb idr error %d", rv);
1217                 return rv;
1218         }
1219
1220         *lkb_ret = lkb;
1221         return 0;
1222 }
1223
1224 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1225 {
1226         struct dlm_lkb *lkb;
1227
1228         spin_lock(&ls->ls_lkbidr_spin);
1229         lkb = idr_find(&ls->ls_lkbidr, lkid);
1230         if (lkb)
1231                 kref_get(&lkb->lkb_ref);
1232         spin_unlock(&ls->ls_lkbidr_spin);
1233
1234         *lkb_ret = lkb;
1235         return lkb ? 0 : -ENOENT;
1236 }
1237
1238 static void kill_lkb(struct kref *kref)
1239 {
1240         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1241
1242         /* All work is done after the return from kref_put() so we
1243            can release the write_lock before the detach_lkb */
1244
1245         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1246 }
1247
1248 /* __put_lkb() is used when an lkb may not have an rsb attached to
1249    it so we need to provide the lockspace explicitly */
1250
1251 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1252 {
1253         uint32_t lkid = lkb->lkb_id;
1254
1255         spin_lock(&ls->ls_lkbidr_spin);
1256         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
1257                 idr_remove(&ls->ls_lkbidr, lkid);
1258                 spin_unlock(&ls->ls_lkbidr_spin);
1259
1260                 detach_lkb(lkb);
1261
1262                 /* for local/process lkbs, lvbptr points to caller's lksb */
1263                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1264                         dlm_free_lvb(lkb->lkb_lvbptr);
1265                 dlm_free_lkb(lkb);
1266                 return 1;
1267         } else {
1268                 spin_unlock(&ls->ls_lkbidr_spin);
1269                 return 0;
1270         }
1271 }
1272
1273 int dlm_put_lkb(struct dlm_lkb *lkb)
1274 {
1275         struct dlm_ls *ls;
1276
1277         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1278         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1279
1280         ls = lkb->lkb_resource->res_ls;
1281         return __put_lkb(ls, lkb);
1282 }
1283
1284 /* This is only called to add a reference when the code already holds
1285    a valid reference to the lkb, so there's no need for locking. */
1286
1287 static inline void hold_lkb(struct dlm_lkb *lkb)
1288 {
1289         kref_get(&lkb->lkb_ref);
1290 }
1291
1292 /* This is called when we need to remove a reference and are certain
1293    it's not the last ref.  e.g. del_lkb is always called between a
1294    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1295    put_lkb would work fine, but would involve unnecessary locking */
1296
1297 static inline void unhold_lkb(struct dlm_lkb *lkb)
1298 {
1299         int rv;
1300         rv = kref_put(&lkb->lkb_ref, kill_lkb);
1301         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
1302 }
1303
1304 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1305                             int mode)
1306 {
1307         struct dlm_lkb *lkb = NULL;
1308
1309         list_for_each_entry(lkb, head, lkb_statequeue)
1310                 if (lkb->lkb_rqmode < mode)
1311                         break;
1312
1313         __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
1314 }
1315
1316 /* add/remove lkb to rsb's grant/convert/wait queue */
1317
1318 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1319 {
1320         kref_get(&lkb->lkb_ref);
1321
1322         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1323
1324         lkb->lkb_timestamp = ktime_get();
1325
1326         lkb->lkb_status = status;
1327
1328         switch (status) {
1329         case DLM_LKSTS_WAITING:
1330                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1331                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1332                 else
1333                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1334                 break;
1335         case DLM_LKSTS_GRANTED:
1336                 /* convention says granted locks kept in order of grmode */
1337                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1338                                 lkb->lkb_grmode);
1339                 break;
1340         case DLM_LKSTS_CONVERT:
1341                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1342                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1343                 else
1344                         list_add_tail(&lkb->lkb_statequeue,
1345                                       &r->res_convertqueue);
1346                 break;
1347         default:
1348                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1349         }
1350 }
1351
1352 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1353 {
1354         lkb->lkb_status = 0;
1355         list_del(&lkb->lkb_statequeue);
1356         unhold_lkb(lkb);
1357 }
1358
1359 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1360 {
1361         hold_lkb(lkb);
1362         del_lkb(r, lkb);
1363         add_lkb(r, lkb, sts);
1364         unhold_lkb(lkb);
1365 }
1366
1367 static int msg_reply_type(int mstype)
1368 {
1369         switch (mstype) {
1370         case DLM_MSG_REQUEST:
1371                 return DLM_MSG_REQUEST_REPLY;
1372         case DLM_MSG_CONVERT:
1373                 return DLM_MSG_CONVERT_REPLY;
1374         case DLM_MSG_UNLOCK:
1375                 return DLM_MSG_UNLOCK_REPLY;
1376         case DLM_MSG_CANCEL:
1377                 return DLM_MSG_CANCEL_REPLY;
1378         case DLM_MSG_LOOKUP:
1379                 return DLM_MSG_LOOKUP_REPLY;
1380         }
1381         return -1;
1382 }
1383
1384 static int nodeid_warned(int nodeid, int num_nodes, int *warned)
1385 {
1386         int i;
1387
1388         for (i = 0; i < num_nodes; i++) {
1389                 if (!warned[i]) {
1390                         warned[i] = nodeid;
1391                         return 0;
1392                 }
1393                 if (warned[i] == nodeid)
1394                         return 1;
1395         }
1396         return 0;
1397 }
1398
1399 void dlm_scan_waiters(struct dlm_ls *ls)
1400 {
1401         struct dlm_lkb *lkb;
1402         ktime_t zero = ktime_set(0, 0);
1403         s64 us;
1404         s64 debug_maxus = 0;
1405         u32 debug_scanned = 0;
1406         u32 debug_expired = 0;
1407         int num_nodes = 0;
1408         int *warned = NULL;
1409
1410         if (!dlm_config.ci_waitwarn_us)
1411                 return;
1412
1413         mutex_lock(&ls->ls_waiters_mutex);
1414
1415         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1416                 if (ktime_equal(lkb->lkb_wait_time, zero))
1417                         continue;
1418
1419                 debug_scanned++;
1420
1421                 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
1422
1423                 if (us < dlm_config.ci_waitwarn_us)
1424                         continue;
1425
1426                 lkb->lkb_wait_time = zero;
1427
1428                 debug_expired++;
1429                 if (us > debug_maxus)
1430                         debug_maxus = us;
1431
1432                 if (!num_nodes) {
1433                         num_nodes = ls->ls_num_nodes;
1434                         warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
1435                 }
1436                 if (!warned)
1437                         continue;
1438                 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
1439                         continue;
1440
1441                 log_error(ls, "waitwarn %x %lld %d us check connection to "
1442                           "node %d", lkb->lkb_id, (long long)us,
1443                           dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
1444         }
1445         mutex_unlock(&ls->ls_waiters_mutex);
1446         kfree(warned);
1447
1448         if (debug_expired)
1449                 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
1450                           debug_scanned, debug_expired,
1451                           dlm_config.ci_waitwarn_us, (long long)debug_maxus);
1452 }
1453
1454 /* add/remove lkb from global waiters list of lkb's waiting for
1455    a reply from a remote node */
1456
1457 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1458 {
1459         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1460         int error = 0;
1461
1462         mutex_lock(&ls->ls_waiters_mutex);
1463
1464         if (is_overlap_unlock(lkb) ||
1465             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1466                 error = -EINVAL;
1467                 goto out;
1468         }
1469
1470         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1471                 switch (mstype) {
1472                 case DLM_MSG_UNLOCK:
1473                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1474                         break;
1475                 case DLM_MSG_CANCEL:
1476                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1477                         break;
1478                 default:
1479                         error = -EBUSY;
1480                         goto out;
1481                 }
1482                 lkb->lkb_wait_count++;
1483                 hold_lkb(lkb);
1484
1485                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1486                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
1487                           lkb->lkb_wait_count, lkb->lkb_flags);
1488                 goto out;
1489         }
1490
1491         DLM_ASSERT(!lkb->lkb_wait_count,
1492                    dlm_print_lkb(lkb);
1493                    printk("wait_count %d\n", lkb->lkb_wait_count););
1494
1495         lkb->lkb_wait_count++;
1496         lkb->lkb_wait_type = mstype;
1497         lkb->lkb_wait_time = ktime_get();
1498         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1499         hold_lkb(lkb);
1500         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1501  out:
1502         if (error)
1503                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
1504                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
1505                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1506         mutex_unlock(&ls->ls_waiters_mutex);
1507         return error;
1508 }
1509
1510 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1511    list as part of process_requestqueue (e.g. a lookup that has an optimized
1512    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1513    set RESEND and dlm_recover_waiters_post() */
1514
1515 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1516                                 struct dlm_message *ms)
1517 {
1518         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1519         int overlap_done = 0;
1520
1521         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
1522                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1523                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
1524                 overlap_done = 1;
1525                 goto out_del;
1526         }
1527
1528         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
1529                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1530                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1531                 overlap_done = 1;
1532                 goto out_del;
1533         }
1534
1535         /* Cancel state was preemptively cleared by a successful convert,
1536            see next comment, nothing to do. */
1537
1538         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1539             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1540                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1541                           lkb->lkb_id, lkb->lkb_wait_type);
1542                 return -1;
1543         }
1544
1545         /* Remove for the convert reply, and premptively remove for the
1546            cancel reply.  A convert has been granted while there's still
1547            an outstanding cancel on it (the cancel is moot and the result
1548            in the cancel reply should be 0).  We preempt the cancel reply
1549            because the app gets the convert result and then can follow up
1550            with another op, like convert.  This subsequent op would see the
1551            lingering state of the cancel and fail with -EBUSY. */
1552
1553         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1554             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
1555             is_overlap_cancel(lkb) && ms && !ms->m_result) {
1556                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1557                           lkb->lkb_id);
1558                 lkb->lkb_wait_type = 0;
1559                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1560                 lkb->lkb_wait_count--;
1561                 goto out_del;
1562         }
1563
1564         /* N.B. type of reply may not always correspond to type of original
1565            msg due to lookup->request optimization, verify others? */
1566
1567         if (lkb->lkb_wait_type) {
1568                 lkb->lkb_wait_type = 0;
1569                 goto out_del;
1570         }
1571
1572         log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1573                   lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
1574                   mstype, lkb->lkb_flags);
1575         return -1;
1576
1577  out_del:
1578         /* the force-unlock/cancel has completed and we haven't recvd a reply
1579            to the op that was in progress prior to the unlock/cancel; we
1580            give up on any reply to the earlier op.  FIXME: not sure when/how
1581            this would happen */
1582
1583         if (overlap_done && lkb->lkb_wait_type) {
1584                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1585                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1586                 lkb->lkb_wait_count--;
1587                 lkb->lkb_wait_type = 0;
1588         }
1589
1590         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1591
1592         lkb->lkb_flags &= ~DLM_IFL_RESEND;
1593         lkb->lkb_wait_count--;
1594         if (!lkb->lkb_wait_count)
1595                 list_del_init(&lkb->lkb_wait_reply);
1596         unhold_lkb(lkb);
1597         return 0;
1598 }
1599
1600 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1601 {
1602         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1603         int error;
1604
1605         mutex_lock(&ls->ls_waiters_mutex);
1606         error = _remove_from_waiters(lkb, mstype, NULL);
1607         mutex_unlock(&ls->ls_waiters_mutex);
1608         return error;
1609 }
1610
1611 /* Handles situations where we might be processing a "fake" or "stub" reply in
1612    which we can't try to take waiters_mutex again. */
1613
1614 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1615 {
1616         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1617         int error;
1618
1619         if (ms->m_flags != DLM_IFL_STUB_MS)
1620                 mutex_lock(&ls->ls_waiters_mutex);
1621         error = _remove_from_waiters(lkb, ms->m_type, ms);
1622         if (ms->m_flags != DLM_IFL_STUB_MS)
1623                 mutex_unlock(&ls->ls_waiters_mutex);
1624         return error;
1625 }
1626
1627 /* If there's an rsb for the same resource being removed, ensure
1628    that the remove message is sent before the new lookup message.
1629    It should be rare to need a delay here, but if not, then it may
1630    be worthwhile to add a proper wait mechanism rather than a delay. */
1631
1632 static void wait_pending_remove(struct dlm_rsb *r)
1633 {
1634         struct dlm_ls *ls = r->res_ls;
1635  restart:
1636         spin_lock(&ls->ls_remove_spin);
1637         if (ls->ls_remove_len &&
1638             !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
1639                 log_debug(ls, "delay lookup for remove dir %d %s",
1640                           r->res_dir_nodeid, r->res_name);
1641                 spin_unlock(&ls->ls_remove_spin);
1642                 msleep(1);
1643                 goto restart;
1644         }
1645         spin_unlock(&ls->ls_remove_spin);
1646 }
1647
1648 /*
1649  * ls_remove_spin protects ls_remove_name and ls_remove_len which are
1650  * read by other threads in wait_pending_remove.  ls_remove_names
1651  * and ls_remove_lens are only used by the scan thread, so they do
1652  * not need protection.
1653  */
1654
1655 static void shrink_bucket(struct dlm_ls *ls, int b)
1656 {
1657         struct rb_node *n, *next;
1658         struct dlm_rsb *r;
1659         char *name;
1660         int our_nodeid = dlm_our_nodeid();
1661         int remote_count = 0;
1662         int i, len, rv;
1663
1664         memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1665
1666         spin_lock(&ls->ls_rsbtbl[b].lock);
1667         for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1668                 next = rb_next(n);
1669                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1670
1671                 /* If we're the directory record for this rsb, and
1672                    we're not the master of it, then we need to wait
1673                    for the master node to send us a dir remove for
1674                    before removing the dir record. */
1675
1676                 if (!dlm_no_directory(ls) &&
1677                     (r->res_master_nodeid != our_nodeid) &&
1678                     (dlm_dir_nodeid(r) == our_nodeid)) {
1679                         continue;
1680                 }
1681
1682                 if (!time_after_eq(jiffies, r->res_toss_time +
1683                                    dlm_config.ci_toss_secs * HZ)) {
1684                         continue;
1685                 }
1686
1687                 if (!dlm_no_directory(ls) &&
1688                     (r->res_master_nodeid == our_nodeid) &&
1689                     (dlm_dir_nodeid(r) != our_nodeid)) {
1690
1691                         /* We're the master of this rsb but we're not
1692                            the directory record, so we need to tell the
1693                            dir node to remove the dir record. */
1694
1695                         ls->ls_remove_lens[remote_count] = r->res_length;
1696                         memcpy(ls->ls_remove_names[remote_count], r->res_name,
1697                                DLM_RESNAME_MAXLEN);
1698                         remote_count++;
1699
1700                         if (remote_count >= DLM_REMOVE_NAMES_MAX)
1701                                 break;
1702                         continue;
1703                 }
1704
1705                 if (!kref_put(&r->res_ref, kill_rsb)) {
1706                         log_error(ls, "tossed rsb in use %s", r->res_name);
1707                         continue;
1708                 }
1709
1710                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1711                 dlm_free_rsb(r);
1712         }
1713         spin_unlock(&ls->ls_rsbtbl[b].lock);
1714
1715         /*
1716          * While searching for rsb's to free, we found some that require
1717          * remote removal.  We leave them in place and find them again here
1718          * so there is a very small gap between removing them from the toss
1719          * list and sending the removal.  Keeping this gap small is
1720          * important to keep us (the master node) from being out of sync
1721          * with the remote dir node for very long.
1722          *
1723          * From the time the rsb is removed from toss until just after
1724          * send_remove, the rsb name is saved in ls_remove_name.  A new
1725          * lookup checks this to ensure that a new lookup message for the
1726          * same resource name is not sent just before the remove message.
1727          */
1728
1729         for (i = 0; i < remote_count; i++) {
1730                 name = ls->ls_remove_names[i];
1731                 len = ls->ls_remove_lens[i];
1732
1733                 spin_lock(&ls->ls_rsbtbl[b].lock);
1734                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1735                 if (rv) {
1736                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1737                         log_debug(ls, "remove_name not toss %s", name);
1738                         continue;
1739                 }
1740
1741                 if (r->res_master_nodeid != our_nodeid) {
1742                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1743                         log_debug(ls, "remove_name master %d dir %d our %d %s",
1744                                   r->res_master_nodeid, r->res_dir_nodeid,
1745                                   our_nodeid, name);
1746                         continue;
1747                 }
1748
1749                 if (r->res_dir_nodeid == our_nodeid) {
1750                         /* should never happen */
1751                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1752                         log_error(ls, "remove_name dir %d master %d our %d %s",
1753                                   r->res_dir_nodeid, r->res_master_nodeid,
1754                                   our_nodeid, name);
1755                         continue;
1756                 }
1757
1758                 if (!time_after_eq(jiffies, r->res_toss_time +
1759                                    dlm_config.ci_toss_secs * HZ)) {
1760                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1761                         log_debug(ls, "remove_name toss_time %lu now %lu %s",
1762                                   r->res_toss_time, jiffies, name);
1763                         continue;
1764                 }
1765
1766                 if (!kref_put(&r->res_ref, kill_rsb)) {
1767                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1768                         log_error(ls, "remove_name in use %s", name);
1769                         continue;
1770                 }
1771
1772                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1773
1774                 /* block lookup of same name until we've sent remove */
1775                 spin_lock(&ls->ls_remove_spin);
1776                 ls->ls_remove_len = len;
1777                 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
1778                 spin_unlock(&ls->ls_remove_spin);
1779                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1780
1781                 send_remove(r);
1782
1783                 /* allow lookup of name again */
1784                 spin_lock(&ls->ls_remove_spin);
1785                 ls->ls_remove_len = 0;
1786                 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
1787                 spin_unlock(&ls->ls_remove_spin);
1788
1789                 dlm_free_rsb(r);
1790         }
1791 }
1792
1793 void dlm_scan_rsbs(struct dlm_ls *ls)
1794 {
1795         int i;
1796
1797         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1798                 shrink_bucket(ls, i);
1799                 if (dlm_locking_stopped(ls))
1800                         break;
1801                 cond_resched();
1802         }
1803 }
1804
1805 static void add_timeout(struct dlm_lkb *lkb)
1806 {
1807         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1808
1809         if (is_master_copy(lkb))
1810                 return;
1811
1812         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1813             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1814                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1815                 goto add_it;
1816         }
1817         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1818                 goto add_it;
1819         return;
1820
1821  add_it:
1822         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1823         mutex_lock(&ls->ls_timeout_mutex);
1824         hold_lkb(lkb);
1825         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1826         mutex_unlock(&ls->ls_timeout_mutex);
1827 }
1828
1829 static void del_timeout(struct dlm_lkb *lkb)
1830 {
1831         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1832
1833         mutex_lock(&ls->ls_timeout_mutex);
1834         if (!list_empty(&lkb->lkb_time_list)) {
1835                 list_del_init(&lkb->lkb_time_list);
1836                 unhold_lkb(lkb);
1837         }
1838         mutex_unlock(&ls->ls_timeout_mutex);
1839 }
1840
1841 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1842    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1843    and then lock rsb because of lock ordering in add_timeout.  We may need
1844    to specify some special timeout-related bits in the lkb that are just to
1845    be accessed under the timeout_mutex. */
1846
1847 void dlm_scan_timeout(struct dlm_ls *ls)
1848 {
1849         struct dlm_rsb *r;
1850         struct dlm_lkb *lkb;
1851         int do_cancel, do_warn;
1852         s64 wait_us;
1853
1854         for (;;) {
1855                 if (dlm_locking_stopped(ls))
1856                         break;
1857
1858                 do_cancel = 0;
1859                 do_warn = 0;
1860                 mutex_lock(&ls->ls_timeout_mutex);
1861                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1862
1863                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1864                                                         lkb->lkb_timestamp));
1865
1866                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1867                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1868                                 do_cancel = 1;
1869
1870                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1871                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1872                                 do_warn = 1;
1873
1874                         if (!do_cancel && !do_warn)
1875                                 continue;
1876                         hold_lkb(lkb);
1877                         break;
1878                 }
1879                 mutex_unlock(&ls->ls_timeout_mutex);
1880
1881                 if (!do_cancel && !do_warn)
1882                         break;
1883
1884                 r = lkb->lkb_resource;
1885                 hold_rsb(r);
1886                 lock_rsb(r);
1887
1888                 if (do_warn) {
1889                         /* clear flag so we only warn once */
1890                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1891                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1892                                 del_timeout(lkb);
1893                         dlm_timeout_warn(lkb);
1894                 }
1895
1896                 if (do_cancel) {
1897                         log_debug(ls, "timeout cancel %x node %d %s",
1898                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1899                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1900                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1901                         del_timeout(lkb);
1902                         _cancel_lock(r, lkb);
1903                 }
1904
1905                 unlock_rsb(r);
1906                 unhold_rsb(r);
1907                 dlm_put_lkb(lkb);
1908         }
1909 }
1910
1911 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1912    dlm_recoverd before checking/setting ls_recover_begin. */
1913
1914 void dlm_adjust_timeouts(struct dlm_ls *ls)
1915 {
1916         struct dlm_lkb *lkb;
1917         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1918
1919         ls->ls_recover_begin = 0;
1920         mutex_lock(&ls->ls_timeout_mutex);
1921         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1922                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1923         mutex_unlock(&ls->ls_timeout_mutex);
1924
1925         if (!dlm_config.ci_waitwarn_us)
1926                 return;
1927
1928         mutex_lock(&ls->ls_waiters_mutex);
1929         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1930                 if (ktime_to_us(lkb->lkb_wait_time))
1931                         lkb->lkb_wait_time = ktime_get();
1932         }
1933         mutex_unlock(&ls->ls_waiters_mutex);
1934 }
1935
1936 /* lkb is master or local copy */
1937
1938 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1939 {
1940         int b, len = r->res_ls->ls_lvblen;
1941
1942         /* b=1 lvb returned to caller
1943            b=0 lvb written to rsb or invalidated
1944            b=-1 do nothing */
1945
1946         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1947
1948         if (b == 1) {
1949                 if (!lkb->lkb_lvbptr)
1950                         return;
1951
1952                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1953                         return;
1954
1955                 if (!r->res_lvbptr)
1956                         return;
1957
1958                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1959                 lkb->lkb_lvbseq = r->res_lvbseq;
1960
1961         } else if (b == 0) {
1962                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1963                         rsb_set_flag(r, RSB_VALNOTVALID);
1964                         return;
1965                 }
1966
1967                 if (!lkb->lkb_lvbptr)
1968                         return;
1969
1970                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1971                         return;
1972
1973                 if (!r->res_lvbptr)
1974                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1975
1976                 if (!r->res_lvbptr)
1977                         return;
1978
1979                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1980                 r->res_lvbseq++;
1981                 lkb->lkb_lvbseq = r->res_lvbseq;
1982                 rsb_clear_flag(r, RSB_VALNOTVALID);
1983         }
1984
1985         if (rsb_flag(r, RSB_VALNOTVALID))
1986                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1987 }
1988
1989 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1990 {
1991         if (lkb->lkb_grmode < DLM_LOCK_PW)
1992                 return;
1993
1994         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1995                 rsb_set_flag(r, RSB_VALNOTVALID);
1996                 return;
1997         }
1998
1999         if (!lkb->lkb_lvbptr)
2000                 return;
2001
2002         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2003                 return;
2004
2005         if (!r->res_lvbptr)
2006                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
2007
2008         if (!r->res_lvbptr)
2009                 return;
2010
2011         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2012         r->res_lvbseq++;
2013         rsb_clear_flag(r, RSB_VALNOTVALID);
2014 }
2015
2016 /* lkb is process copy (pc) */
2017
2018 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2019                             struct dlm_message *ms)
2020 {
2021         int b;
2022
2023         if (!lkb->lkb_lvbptr)
2024                 return;
2025
2026         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2027                 return;
2028
2029         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
2030         if (b == 1) {
2031                 int len = receive_extralen(ms);
2032                 if (len > DLM_RESNAME_MAXLEN)
2033                         len = DLM_RESNAME_MAXLEN;
2034                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2035                 lkb->lkb_lvbseq = ms->m_lvbseq;
2036         }
2037 }
2038
2039 /* Manipulate lkb's on rsb's convert/granted/waiting queues
2040    remove_lock -- used for unlock, removes lkb from granted
2041    revert_lock -- used for cancel, moves lkb from convert to granted
2042    grant_lock  -- used for request and convert, adds lkb to granted or
2043                   moves lkb from convert or waiting to granted
2044
2045    Each of these is used for master or local copy lkb's.  There is
2046    also a _pc() variation used to make the corresponding change on
2047    a process copy (pc) lkb. */
2048
2049 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2050 {
2051         del_lkb(r, lkb);
2052         lkb->lkb_grmode = DLM_LOCK_IV;
2053         /* this unhold undoes the original ref from create_lkb()
2054            so this leads to the lkb being freed */
2055         unhold_lkb(lkb);
2056 }
2057
2058 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2059 {
2060         set_lvb_unlock(r, lkb);
2061         _remove_lock(r, lkb);
2062 }
2063
2064 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2065 {
2066         _remove_lock(r, lkb);
2067 }
2068
2069 /* returns: 0 did nothing
2070             1 moved lock to granted
2071            -1 removed lock */
2072
2073 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2074 {
2075         int rv = 0;
2076
2077         lkb->lkb_rqmode = DLM_LOCK_IV;
2078
2079         switch (lkb->lkb_status) {
2080         case DLM_LKSTS_GRANTED:
2081                 break;
2082         case DLM_LKSTS_CONVERT:
2083                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2084                 rv = 1;
2085                 break;
2086         case DLM_LKSTS_WAITING:
2087                 del_lkb(r, lkb);
2088                 lkb->lkb_grmode = DLM_LOCK_IV;
2089                 /* this unhold undoes the original ref from create_lkb()
2090                    so this leads to the lkb being freed */
2091                 unhold_lkb(lkb);
2092                 rv = -1;
2093                 break;
2094         default:
2095                 log_print("invalid status for revert %d", lkb->lkb_status);
2096         }
2097         return rv;
2098 }
2099
2100 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2101 {
2102         return revert_lock(r, lkb);
2103 }
2104
2105 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2106 {
2107         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2108                 lkb->lkb_grmode = lkb->lkb_rqmode;
2109                 if (lkb->lkb_status)
2110                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2111                 else
2112                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2113         }
2114
2115         lkb->lkb_rqmode = DLM_LOCK_IV;
2116         lkb->lkb_highbast = 0;
2117 }
2118
2119 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2120 {
2121         set_lvb_lock(r, lkb);
2122         _grant_lock(r, lkb);
2123 }
2124
2125 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2126                           struct dlm_message *ms)
2127 {
2128         set_lvb_lock_pc(r, lkb, ms);
2129         _grant_lock(r, lkb);
2130 }
2131
2132 /* called by grant_pending_locks() which means an async grant message must
2133    be sent to the requesting node in addition to granting the lock if the
2134    lkb belongs to a remote node. */
2135
2136 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2137 {
2138         grant_lock(r, lkb);
2139         if (is_master_copy(lkb))
2140                 send_grant(r, lkb);
2141         else
2142                 queue_cast(r, lkb, 0);
2143 }
2144
2145 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2146    change the granted/requested modes.  We're munging things accordingly in
2147    the process copy.
2148    CONVDEADLK: our grmode may have been forced down to NL to resolve a
2149    conversion deadlock
2150    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2151    compatible with other granted locks */
2152
2153 static void munge_demoted(struct dlm_lkb *lkb)
2154 {
2155         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2156                 log_print("munge_demoted %x invalid modes gr %d rq %d",
2157                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2158                 return;
2159         }
2160
2161         lkb->lkb_grmode = DLM_LOCK_NL;
2162 }
2163
2164 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
2165 {
2166         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
2167             ms->m_type != DLM_MSG_GRANT) {
2168                 log_print("munge_altmode %x invalid reply type %d",
2169                           lkb->lkb_id, ms->m_type);
2170                 return;
2171         }
2172
2173         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2174                 lkb->lkb_rqmode = DLM_LOCK_PR;
2175         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2176                 lkb->lkb_rqmode = DLM_LOCK_CW;
2177         else {
2178                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2179                 dlm_print_lkb(lkb);
2180         }
2181 }
2182
2183 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2184 {
2185         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2186                                            lkb_statequeue);
2187         if (lkb->lkb_id == first->lkb_id)
2188                 return 1;
2189
2190         return 0;
2191 }
2192
2193 /* Check if the given lkb conflicts with another lkb on the queue. */
2194
2195 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2196 {
2197         struct dlm_lkb *this;
2198
2199         list_for_each_entry(this, head, lkb_statequeue) {
2200                 if (this == lkb)
2201                         continue;
2202                 if (!modes_compat(this, lkb))
2203                         return 1;
2204         }
2205         return 0;
2206 }
2207
2208 /*
2209  * "A conversion deadlock arises with a pair of lock requests in the converting
2210  * queue for one resource.  The granted mode of each lock blocks the requested
2211  * mode of the other lock."
2212  *
2213  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2214  * convert queue from being granted, then deadlk/demote lkb.
2215  *
2216  * Example:
2217  * Granted Queue: empty
2218  * Convert Queue: NL->EX (first lock)
2219  *                PR->EX (second lock)
2220  *
2221  * The first lock can't be granted because of the granted mode of the second
2222  * lock and the second lock can't be granted because it's not first in the
2223  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2224  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2225  * flag set and return DEMOTED in the lksb flags.
2226  *
2227  * Originally, this function detected conv-deadlk in a more limited scope:
2228  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2229  * - if lkb1 was the first entry in the queue (not just earlier), and was
2230  *   blocked by the granted mode of lkb2, and there was nothing on the
2231  *   granted queue preventing lkb1 from being granted immediately, i.e.
2232  *   lkb2 was the only thing preventing lkb1 from being granted.
2233  *
2234  * That second condition meant we'd only say there was conv-deadlk if
2235  * resolving it (by demotion) would lead to the first lock on the convert
2236  * queue being granted right away.  It allowed conversion deadlocks to exist
2237  * between locks on the convert queue while they couldn't be granted anyway.
2238  *
2239  * Now, we detect and take action on conversion deadlocks immediately when
2240  * they're created, even if they may not be immediately consequential.  If
2241  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2242  * mode that would prevent lkb1's conversion from being granted, we do a
2243  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2244  * I think this means that the lkb_is_ahead condition below should always
2245  * be zero, i.e. there will never be conv-deadlk between two locks that are
2246  * both already on the convert queue.
2247  */
2248
2249 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2250 {
2251         struct dlm_lkb *lkb1;
2252         int lkb_is_ahead = 0;
2253
2254         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2255                 if (lkb1 == lkb2) {
2256                         lkb_is_ahead = 1;
2257                         continue;
2258                 }
2259
2260                 if (!lkb_is_ahead) {
2261                         if (!modes_compat(lkb2, lkb1))
2262                                 return 1;
2263                 } else {
2264                         if (!modes_compat(lkb2, lkb1) &&
2265                             !modes_compat(lkb1, lkb2))
2266                                 return 1;
2267                 }
2268         }
2269         return 0;
2270 }
2271
2272 /*
2273  * Return 1 if the lock can be granted, 0 otherwise.
2274  * Also detect and resolve conversion deadlocks.
2275  *
2276  * lkb is the lock to be granted
2277  *
2278  * now is 1 if the function is being called in the context of the
2279  * immediate request, it is 0 if called later, after the lock has been
2280  * queued.
2281  *
2282  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2283  * after recovery.
2284  *
2285  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2286  */
2287
2288 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2289                            int recover)
2290 {
2291         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2292
2293         /*
2294          * 6-10: Version 5.4 introduced an option to address the phenomenon of
2295          * a new request for a NL mode lock being blocked.
2296          *
2297          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2298          * request, then it would be granted.  In essence, the use of this flag
2299          * tells the Lock Manager to expedite theis request by not considering
2300          * what may be in the CONVERTING or WAITING queues...  As of this
2301          * writing, the EXPEDITE flag can be used only with new requests for NL
2302          * mode locks.  This flag is not valid for conversion requests.
2303          *
2304          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2305          * conversion or used with a non-NL requested mode.  We also know an
2306          * EXPEDITE request is always granted immediately, so now must always
2307          * be 1.  The full condition to grant an expedite request: (now &&
2308          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2309          * therefore be shortened to just checking the flag.
2310          */
2311
2312         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2313                 return 1;
2314
2315         /*
2316          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2317          * added to the remaining conditions.
2318          */
2319
2320         if (queue_conflict(&r->res_grantqueue, lkb))
2321                 return 0;
2322
2323         /*
2324          * 6-3: By default, a conversion request is immediately granted if the
2325          * requested mode is compatible with the modes of all other granted
2326          * locks
2327          */
2328
2329         if (queue_conflict(&r->res_convertqueue, lkb))
2330                 return 0;
2331
2332         /*
2333          * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2334          * locks for a recovered rsb, on which lkb's have been rebuilt.
2335          * The lkb's may have been rebuilt on the queues in a different
2336          * order than they were in on the previous master.  So, granting
2337          * queued conversions in order after recovery doesn't make sense
2338          * since the order hasn't been preserved anyway.  The new order
2339          * could also have created a new "in place" conversion deadlock.
2340          * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2341          * After recovery, there would be no granted locks, and possibly
2342          * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2343          * recovery, grant conversions without considering order.
2344          */
2345
2346         if (conv && recover)
2347                 return 1;
2348
2349         /*
2350          * 6-5: But the default algorithm for deciding whether to grant or
2351          * queue conversion requests does not by itself guarantee that such
2352          * requests are serviced on a "first come first serve" basis.  This, in
2353          * turn, can lead to a phenomenon known as "indefinate postponement".
2354          *
2355          * 6-7: This issue is dealt with by using the optional QUECVT flag with
2356          * the system service employed to request a lock conversion.  This flag
2357          * forces certain conversion requests to be queued, even if they are
2358          * compatible with the granted modes of other locks on the same
2359          * resource.  Thus, the use of this flag results in conversion requests
2360          * being ordered on a "first come first servce" basis.
2361          *
2362          * DCT: This condition is all about new conversions being able to occur
2363          * "in place" while the lock remains on the granted queue (assuming
2364          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2365          * doesn't _have_ to go onto the convert queue where it's processed in
2366          * order.  The "now" variable is necessary to distinguish converts
2367          * being received and processed for the first time now, because once a
2368          * convert is moved to the conversion queue the condition below applies
2369          * requiring fifo granting.
2370          */
2371
2372         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2373                 return 1;
2374
2375         /*
2376          * Even if the convert is compat with all granted locks,
2377          * QUECVT forces it behind other locks on the convert queue.
2378          */
2379
2380         if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2381                 if (list_empty(&r->res_convertqueue))
2382                         return 1;
2383                 else
2384                         return 0;
2385         }
2386
2387         /*
2388          * The NOORDER flag is set to avoid the standard vms rules on grant
2389          * order.
2390          */
2391
2392         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2393                 return 1;
2394
2395         /*
2396          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2397          * granted until all other conversion requests ahead of it are granted
2398          * and/or canceled.
2399          */
2400
2401         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2402                 return 1;
2403
2404         /*
2405          * 6-4: By default, a new request is immediately granted only if all
2406          * three of the following conditions are satisfied when the request is
2407          * issued:
2408          * - The queue of ungranted conversion requests for the resource is
2409          *   empty.
2410          * - The queue of ungranted new requests for the resource is empty.
2411          * - The mode of the new request is compatible with the most
2412          *   restrictive mode of all granted locks on the resource.
2413          */
2414
2415         if (now && !conv && list_empty(&r->res_convertqueue) &&
2416             list_empty(&r->res_waitqueue))
2417                 return 1;
2418
2419         /*
2420          * 6-4: Once a lock request is in the queue of ungranted new requests,
2421          * it cannot be granted until the queue of ungranted conversion
2422          * requests is empty, all ungranted new requests ahead of it are
2423          * granted and/or canceled, and it is compatible with the granted mode
2424          * of the most restrictive lock granted on the resource.
2425          */
2426
2427         if (!now && !conv && list_empty(&r->res_convertqueue) &&
2428             first_in_list(lkb, &r->res_waitqueue))
2429                 return 1;
2430
2431         return 0;
2432 }
2433
2434 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2435                           int recover, int *err)
2436 {
2437         int rv;
2438         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2439         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2440
2441         if (err)
2442                 *err = 0;
2443
2444         rv = _can_be_granted(r, lkb, now, recover);
2445         if (rv)
2446                 goto out;
2447
2448         /*
2449          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2450          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2451          * cancels one of the locks.
2452          */
2453
2454         if (is_convert && can_be_queued(lkb) &&
2455             conversion_deadlock_detect(r, lkb)) {
2456                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2457                         lkb->lkb_grmode = DLM_LOCK_NL;
2458                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
2459                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
2460                         if (err)
2461                                 *err = -EDEADLK;
2462                         else {
2463                                 log_print("can_be_granted deadlock %x now %d",
2464                                           lkb->lkb_id, now);
2465                                 dlm_dump_rsb(r);
2466                         }
2467                 }
2468                 goto out;
2469         }
2470
2471         /*
2472          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2473          * to grant a request in a mode other than the normal rqmode.  It's a
2474          * simple way to provide a big optimization to applications that can
2475          * use them.
2476          */
2477
2478         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2479                 alt = DLM_LOCK_PR;
2480         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2481                 alt = DLM_LOCK_CW;
2482
2483         if (alt) {
2484                 lkb->lkb_rqmode = alt;
2485                 rv = _can_be_granted(r, lkb, now, 0);
2486                 if (rv)
2487                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
2488                 else
2489                         lkb->lkb_rqmode = rqmode;
2490         }
2491  out:
2492         return rv;
2493 }
2494
2495 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
2496    for locks pending on the convert list.  Once verified (watch for these
2497    log_prints), we should be able to just call _can_be_granted() and not
2498    bother with the demote/deadlk cases here (and there's no easy way to deal
2499    with a deadlk here, we'd have to generate something like grant_lock with
2500    the deadlk error.) */
2501
2502 /* Returns the highest requested mode of all blocked conversions; sets
2503    cw if there's a blocked conversion to DLM_LOCK_CW. */
2504
2505 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2506                                  unsigned int *count)
2507 {
2508         struct dlm_lkb *lkb, *s;
2509         int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2510         int hi, demoted, quit, grant_restart, demote_restart;
2511         int deadlk;
2512
2513         quit = 0;
2514  restart:
2515         grant_restart = 0;
2516         demote_restart = 0;
2517         hi = DLM_LOCK_IV;
2518
2519         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2520                 demoted = is_demoted(lkb);
2521                 deadlk = 0;
2522
2523                 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2524                         grant_lock_pending(r, lkb);
2525                         grant_restart = 1;
2526                         if (count)
2527                                 (*count)++;
2528                         continue;
2529                 }
2530
2531                 if (!demoted && is_demoted(lkb)) {
2532                         log_print("WARN: pending demoted %x node %d %s",
2533                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2534                         demote_restart = 1;
2535                         continue;
2536                 }
2537
2538                 if (deadlk) {
2539                         log_print("WARN: pending deadlock %x node %d %s",
2540                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2541                         dlm_dump_rsb(r);
2542                         continue;
2543                 }
2544
2545                 hi = max_t(int, lkb->lkb_rqmode, hi);
2546
2547                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2548                         *cw = 1;
2549         }
2550
2551         if (grant_restart)
2552                 goto restart;
2553         if (demote_restart && !quit) {
2554                 quit = 1;
2555                 goto restart;
2556         }
2557
2558         return max_t(int, high, hi);
2559 }
2560
2561 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2562                               unsigned int *count)
2563 {
2564         struct dlm_lkb *lkb, *s;
2565
2566         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2567                 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2568                         grant_lock_pending(r, lkb);
2569                         if (count)
2570                                 (*count)++;
2571                 } else {
2572                         high = max_t(int, lkb->lkb_rqmode, high);
2573                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
2574                                 *cw = 1;
2575                 }
2576         }
2577
2578         return high;
2579 }
2580
2581 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2582    on either the convert or waiting queue.
2583    high is the largest rqmode of all locks blocked on the convert or
2584    waiting queue. */
2585
2586 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2587 {
2588         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2589                 if (gr->lkb_highbast < DLM_LOCK_EX)
2590                         return 1;
2591                 return 0;
2592         }
2593
2594         if (gr->lkb_highbast < high &&
2595             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2596                 return 1;
2597         return 0;
2598 }
2599
2600 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2601 {
2602         struct dlm_lkb *lkb, *s;
2603         int high = DLM_LOCK_IV;
2604         int cw = 0;
2605
2606         if (!is_master(r)) {
2607                 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2608                 dlm_dump_rsb(r);
2609                 return;
2610         }
2611
2612         high = grant_pending_convert(r, high, &cw, count);
2613         high = grant_pending_wait(r, high, &cw, count);
2614
2615         if (high == DLM_LOCK_IV)
2616                 return;
2617
2618         /*
2619          * If there are locks left on the wait/convert queue then send blocking
2620          * ASTs to granted locks based on the largest requested mode (high)
2621          * found above.
2622          */
2623
2624         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2625                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2626                         if (cw && high == DLM_LOCK_PR &&
2627                             lkb->lkb_grmode == DLM_LOCK_PR)
2628                                 queue_bast(r, lkb, DLM_LOCK_CW);
2629                         else
2630                                 queue_bast(r, lkb, high);
2631                         lkb->lkb_highbast = high;
2632                 }
2633         }
2634 }
2635
2636 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2637 {
2638         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2639             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2640                 if (gr->lkb_highbast < DLM_LOCK_EX)
2641                         return 1;
2642                 return 0;
2643         }
2644
2645         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2646                 return 1;
2647         return 0;
2648 }
2649
2650 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2651                             struct dlm_lkb *lkb)
2652 {
2653         struct dlm_lkb *gr;
2654
2655         list_for_each_entry(gr, head, lkb_statequeue) {
2656                 /* skip self when sending basts to convertqueue */
2657                 if (gr == lkb)
2658                         continue;
2659                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2660                         queue_bast(r, gr, lkb->lkb_rqmode);
2661                         gr->lkb_highbast = lkb->lkb_rqmode;
2662                 }
2663         }
2664 }
2665
2666 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2667 {
2668         send_bast_queue(r, &r->res_grantqueue, lkb);
2669 }
2670
2671 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2672 {
2673         send_bast_queue(r, &r->res_grantqueue, lkb);
2674         send_bast_queue(r, &r->res_convertqueue, lkb);
2675 }
2676
2677 /* set_master(r, lkb) -- set the master nodeid of a resource
2678
2679    The purpose of this function is to set the nodeid field in the given
2680    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2681    known, it can just be copied to the lkb and the function will return
2682    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2683    before it can be copied to the lkb.
2684
2685    When the rsb nodeid is being looked up remotely, the initial lkb
2686    causing the lookup is kept on the ls_waiters list waiting for the
2687    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2688    on the rsb's res_lookup list until the master is verified.
2689
2690    Return values:
2691    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2692    1: the rsb master is not available and the lkb has been placed on
2693       a wait queue
2694 */
2695
2696 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2697 {
2698         int our_nodeid = dlm_our_nodeid();
2699
2700         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2701                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2702                 r->res_first_lkid = lkb->lkb_id;
2703                 lkb->lkb_nodeid = r->res_nodeid;
2704                 return 0;
2705         }
2706
2707         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2708                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2709                 return 1;
2710         }
2711
2712         if (r->res_master_nodeid == our_nodeid) {
2713                 lkb->lkb_nodeid = 0;
2714                 return 0;
2715         }
2716
2717         if (r->res_master_nodeid) {
2718                 lkb->lkb_nodeid = r->res_master_nodeid;
2719                 return 0;
2720         }
2721
2722         if (dlm_dir_nodeid(r) == our_nodeid) {
2723                 /* This is a somewhat unusual case; find_rsb will usually
2724                    have set res_master_nodeid when dir nodeid is local, but
2725                    there are cases where we become the dir node after we've
2726                    past find_rsb and go through _request_lock again.
2727                    confirm_master() or process_lookup_list() needs to be
2728                    called after this. */
2729                 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2730                           lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2731                           r->res_name);
2732                 r->res_master_nodeid = our_nodeid;
2733                 r->res_nodeid = 0;
2734                 lkb->lkb_nodeid = 0;
2735                 return 0;
2736         }
2737
2738         wait_pending_remove(r);
2739
2740         r->res_first_lkid = lkb->lkb_id;
2741         send_lookup(r, lkb);
2742         return 1;
2743 }
2744
2745 static void process_lookup_list(struct dlm_rsb *r)
2746 {
2747         struct dlm_lkb *lkb, *safe;
2748
2749         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2750                 list_del_init(&lkb->lkb_rsb_lookup);
2751                 _request_lock(r, lkb);
2752                 schedule();
2753         }
2754 }
2755
2756 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2757
2758 static void confirm_master(struct dlm_rsb *r, int error)
2759 {
2760         struct dlm_lkb *lkb;
2761
2762         if (!r->res_first_lkid)
2763                 return;
2764
2765         switch (error) {
2766         case 0:
2767         case -EINPROGRESS:
2768                 r->res_first_lkid = 0;
2769                 process_lookup_list(r);
2770                 break;
2771
2772         case -EAGAIN:
2773         case -EBADR:
2774         case -ENOTBLK:
2775                 /* the remote request failed and won't be retried (it was
2776                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2777                    lkb the first_lkid */
2778
2779                 r->res_first_lkid = 0;
2780
2781                 if (!list_empty(&r->res_lookup)) {
2782                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2783                                          lkb_rsb_lookup);
2784                         list_del_init(&lkb->lkb_rsb_lookup);
2785                         r->res_first_lkid = lkb->lkb_id;
2786                         _request_lock(r, lkb);
2787                 }
2788                 break;
2789
2790         default:
2791                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2792         }
2793 }
2794
2795 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2796                          int namelen, unsigned long timeout_cs,
2797                          void (*ast) (void *astparam),
2798                          void *astparam,
2799                          void (*bast) (void *astparam, int mode),
2800                          struct dlm_args *args)
2801 {
2802         int rv = -EINVAL;
2803
2804         /* check for invalid arg usage */
2805
2806         if (mode < 0 || mode > DLM_LOCK_EX)
2807                 goto out;
2808
2809         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2810                 goto out;
2811
2812         if (flags & DLM_LKF_CANCEL)
2813                 goto out;
2814
2815         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2816                 goto out;
2817
2818         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2819                 goto out;
2820
2821         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2822                 goto out;
2823
2824         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2825                 goto out;
2826
2827         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2828                 goto out;
2829
2830         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2831                 goto out;
2832
2833         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2834                 goto out;
2835
2836         if (!ast || !lksb)
2837                 goto out;
2838
2839         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2840                 goto out;
2841
2842         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2843                 goto out;
2844
2845         /* these args will be copied to the lkb in validate_lock_args,
2846            it cannot be done now because when converting locks, fields in
2847            an active lkb cannot be modified before locking the rsb */
2848
2849         args->flags = flags;
2850         args->astfn = ast;
2851         args->astparam = astparam;
2852         args->bastfn = bast;
2853         args->timeout = timeout_cs;
2854         args->mode = mode;
2855         args->lksb = lksb;
2856         rv = 0;
2857  out:
2858         return rv;
2859 }
2860
2861 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2862 {
2863         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2864                       DLM_LKF_FORCEUNLOCK))
2865                 return -EINVAL;
2866
2867         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2868                 return -EINVAL;
2869
2870         args->flags = flags;
2871         args->astparam = astarg;
2872         return 0;
2873 }
2874
2875 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2876                               struct dlm_args *args)
2877 {
2878         int rv = -EINVAL;
2879
2880         if (args->flags & DLM_LKF_CONVERT) {
2881                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2882                         goto out;
2883
2884                 if (args->flags & DLM_LKF_QUECVT &&
2885                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2886                         goto out;
2887
2888                 rv = -EBUSY;
2889                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2890                         goto out;
2891
2892                 if (lkb->lkb_wait_type)
2893                         goto out;
2894
2895                 if (is_overlap(lkb))
2896                         goto out;
2897         }
2898
2899         lkb->lkb_exflags = args->flags;
2900         lkb->lkb_sbflags = 0;
2901         lkb->lkb_astfn = args->astfn;
2902         lkb->lkb_astparam = args->astparam;
2903         lkb->lkb_bastfn = args->bastfn;
2904         lkb->lkb_rqmode = args->mode;
2905         lkb->lkb_lksb = args->lksb;
2906         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2907         lkb->lkb_ownpid = (int) current->pid;
2908         lkb->lkb_timeout_cs = args->timeout;
2909         rv = 0;
2910  out:
2911         if (rv)
2912                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2913                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2914                           lkb->lkb_status, lkb->lkb_wait_type,
2915                           lkb->lkb_resource->res_name);
2916         return rv;
2917 }
2918
2919 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2920    for success */
2921
2922 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2923    because there may be a lookup in progress and it's valid to do
2924    cancel/unlockf on it */
2925
2926 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2927 {
2928         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2929         int rv = -EINVAL;
2930
2931         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2932                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2933                 dlm_print_lkb(lkb);
2934                 goto out;
2935         }
2936
2937         /* an lkb may still exist even though the lock is EOL'ed due to a
2938            cancel, unlock or failed noqueue request; an app can't use these
2939            locks; return same error as if the lkid had not been found at all */
2940
2941         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2942                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2943                 rv = -ENOENT;
2944                 goto out;
2945         }
2946
2947         /* an lkb may be waiting for an rsb lookup to complete where the
2948            lookup was initiated by another lock */
2949
2950         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2951                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2952                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2953                         list_del_init(&lkb->lkb_rsb_lookup);
2954                         queue_cast(lkb->lkb_resource, lkb,
2955                                    args->flags & DLM_LKF_CANCEL ?
2956                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2957                         unhold_lkb(lkb); /* undoes create_lkb() */
2958                 }
2959                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2960                 rv = -EBUSY;
2961                 goto out;
2962         }
2963
2964         /* cancel not allowed with another cancel/unlock in progress */
2965
2966         if (args->flags & DLM_LKF_CANCEL) {
2967                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2968                         goto out;
2969
2970                 if (is_overlap(lkb))
2971                         goto out;
2972
2973                 /* don't let scand try to do a cancel */
2974                 del_timeout(lkb);
2975
2976                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2977                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2978                         rv = -EBUSY;
2979                         goto out;
2980                 }
2981
2982                 /* there's nothing to cancel */
2983                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2984                     !lkb->lkb_wait_type) {
2985                         rv = -EBUSY;
2986                         goto out;
2987                 }
2988
2989                 switch (lkb->lkb_wait_type) {
2990                 case DLM_MSG_LOOKUP:
2991                 case DLM_MSG_REQUEST:
2992                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2993                         rv = -EBUSY;
2994                         goto out;
2995                 case DLM_MSG_UNLOCK:
2996                 case DLM_MSG_CANCEL:
2997                         goto out;
2998                 }
2999                 /* add_to_waiters() will set OVERLAP_CANCEL */
3000                 goto out_ok;
3001         }
3002
3003         /* do we need to allow a force-unlock if there's a normal unlock
3004            already in progress?  in what conditions could the normal unlock
3005            fail such that we'd want to send a force-unlock to be sure? */
3006
3007         if (args->flags & DLM_LKF_FORCEUNLOCK) {
3008                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
3009                         goto out;
3010
3011                 if (is_overlap_unlock(lkb))
3012                         goto out;
3013
3014                 /* don't let scand try to do a cancel */
3015                 del_timeout(lkb);
3016
3017                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3018                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3019                         rv = -EBUSY;
3020                         goto out;
3021                 }
3022
3023                 switch (lkb->lkb_wait_type) {
3024                 case DLM_MSG_LOOKUP:
3025                 case DLM_MSG_REQUEST:
3026                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3027                         rv = -EBUSY;
3028                         goto out;
3029                 case DLM_MSG_UNLOCK:
3030                         goto out;
3031                 }
3032                 /* add_to_waiters() will set OVERLAP_UNLOCK */
3033                 goto out_ok;
3034         }
3035
3036         /* normal unlock not allowed if there's any op in progress */
3037         rv = -EBUSY;
3038         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
3039                 goto out;
3040
3041  out_ok:
3042         /* an overlapping op shouldn't blow away exflags from other op */
3043         lkb->lkb_exflags |= args->flags;
3044         lkb->lkb_sbflags = 0;
3045         lkb->lkb_astparam = args->astparam;
3046         rv = 0;
3047  out:
3048         if (rv)
3049                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
3050                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
3051                           args->flags, lkb->lkb_wait_type,
3052                           lkb->lkb_resource->res_name);
3053         return rv;
3054 }
3055
3056 /*
3057  * Four stage 4 varieties:
3058  * do_request(), do_convert(), do_unlock(), do_cancel()
3059  * These are called on the master node for the given lock and
3060  * from the central locking logic.
3061  */
3062
3063 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3064 {
3065         int error = 0;
3066
3067         if (can_be_granted(r, lkb, 1, 0, NULL)) {
3068                 grant_lock(r, lkb);
3069                 queue_cast(r, lkb, 0);
3070                 goto out;
3071         }
3072
3073         if (can_be_queued(lkb)) {
3074                 error = -EINPROGRESS;
3075                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3076                 add_timeout(lkb);
3077                 goto out;
3078         }
3079
3080         error = -EAGAIN;
3081         queue_cast(r, lkb, -EAGAIN);
3082  out:
3083         return error;
3084 }
3085
3086 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3087                                int error)
3088 {
3089         switch (error) {
3090         case -EAGAIN:
3091                 if (force_blocking_asts(lkb))
3092                         send_blocking_asts_all(r, lkb);
3093                 break;
3094         case -EINPROGRESS:
3095                 send_blocking_asts(r, lkb);
3096                 break;
3097         }
3098 }
3099
3100 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3101 {
3102         int error = 0;
3103         int deadlk = 0;
3104
3105         /* changing an existing lock may allow others to be granted */
3106
3107         if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3108                 grant_lock(r, lkb);
3109                 queue_cast(r, lkb, 0);
3110                 goto out;
3111         }
3112
3113         /* can_be_granted() detected that this lock would block in a conversion
3114            deadlock, so we leave it on the granted queue and return EDEADLK in
3115            the ast for the convert. */
3116
3117         if (deadlk) {
3118                 /* it's left on the granted queue */
3119                 revert_lock(r, lkb);
3120                 queue_cast(r, lkb, -EDEADLK);
3121                 error = -EDEADLK;
3122                 goto out;
3123         }
3124
3125         /* is_demoted() means the can_be_granted() above set the grmode
3126            to NL, and left us on the granted queue.  This auto-demotion
3127            (due to CONVDEADLK) might mean other locks, and/or this lock, are
3128            now grantable.  We have to try to grant other converting locks
3129            before we try again to grant this one. */
3130
3131         if (is_demoted(lkb)) {
3132                 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3133                 if (_can_be_granted(r, lkb, 1, 0)) {
3134                         grant_lock(r, lkb);
3135                         queue_cast(r, lkb, 0);
3136                         goto out;
3137                 }
3138                 /* else fall through and move to convert queue */
3139         }
3140
3141         if (can_be_queued(lkb)) {
3142                 error = -EINPROGRESS;
3143                 del_lkb(r, lkb);
3144                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3145                 add_timeout(lkb);
3146                 goto out;
3147         }
3148
3149         error = -EAGAIN;
3150         queue_cast(r, lkb, -EAGAIN);
3151  out:
3152         return error;
3153 }
3154
3155 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3156                                int error)
3157 {
3158         switch (error) {
3159         case 0:
3160                 grant_pending_locks(r, NULL);
3161                 /* grant_pending_locks also sends basts */
3162                 break;
3163         case -EAGAIN:
3164                 if (force_blocking_asts(lkb))
3165                         send_blocking_asts_all(r, lkb);
3166                 break;
3167         case -EINPROGRESS:
3168                 send_blocking_asts(r, lkb);
3169                 break;
3170         }
3171 }
3172
3173 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3174 {
3175         remove_lock(r, lkb);
3176         queue_cast(r, lkb, -DLM_EUNLOCK);
3177         return -DLM_EUNLOCK;
3178 }
3179
3180 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3181                               int error)
3182 {
3183         grant_pending_locks(r, NULL);
3184 }
3185
3186 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3187
3188 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3189 {
3190         int error;
3191
3192         error = revert_lock(r, lkb);
3193         if (error) {
3194                 queue_cast(r, lkb, -DLM_ECANCEL);
3195                 return -DLM_ECANCEL;
3196         }
3197         return 0;
3198 }
3199
3200 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3201                               int error)
3202 {
3203         if (error)
3204                 grant_pending_locks(r, NULL);
3205 }
3206
3207 /*
3208  * Four stage 3 varieties:
3209  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3210  */
3211
3212 /* add a new lkb to a possibly new rsb, called by requesting process */
3213
3214 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3215 {
3216         int error;
3217
3218         /* set_master: sets lkb nodeid from r */
3219
3220         error = set_master(r, lkb);
3221         if (error < 0)
3222                 goto out;
3223         if (error) {
3224                 error = 0;
3225                 goto out;
3226         }
3227
3228         if (is_remote(r)) {
3229                 /* receive_request() calls do_request() on remote node */
3230                 error = send_request(r, lkb);
3231         } else {
3232                 error = do_request(r, lkb);
3233                 /* for remote locks the request_reply is sent
3234                    between do_request and do_request_effects */
3235                 do_request_effects(r, lkb, error);
3236         }
3237  out:
3238         return error;
3239 }
3240
3241 /* change some property of an existing lkb, e.g. mode */
3242
3243 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3244 {
3245         int error;
3246
3247         if (is_remote(r)) {
3248                 /* receive_convert() calls do_convert() on remote node */
3249                 error = send_convert(r, lkb);
3250         } else {
3251                 error = do_convert(r, lkb);
3252                 /* for remote locks the convert_reply is sent
3253                    between do_convert and do_convert_effects */
3254                 do_convert_effects(r, lkb, error);
3255         }
3256
3257         return error;
3258 }
3259
3260 /* remove an existing lkb from the granted queue */
3261
3262 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3263 {
3264         int error;
3265
3266         if (is_remote(r)) {
3267                 /* receive_unlock() calls do_unlock() on remote node */
3268                 error = send_unlock(r, lkb);
3269         } else {
3270                 error = do_unlock(r, lkb);
3271                 /* for remote locks the unlock_reply is sent
3272                    between do_unlock and do_unlock_effects */
3273                 do_unlock_effects(r, lkb, error);
3274         }
3275
3276         return error;
3277 }
3278
3279 /* remove an existing lkb from the convert or wait queue */
3280
3281 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3282 {
3283         int error;
3284
3285         if (is_remote(r)) {
3286                 /* receive_cancel() calls do_cancel() on remote node */
3287                 error = send_cancel(r, lkb);
3288         } else {
3289                 error = do_cancel(r, lkb);
3290                 /* for remote locks the cancel_reply is sent
3291                    between do_cancel and do_cancel_effects */
3292                 do_cancel_effects(r, lkb, error);
3293         }
3294
3295         return error;
3296 }
3297
3298 /*
3299  * Four stage 2 varieties:
3300  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3301  */
3302
3303 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
3304                         int len, struct dlm_args *args)
3305 {
3306         struct dlm_rsb *r;
3307         int error;
3308
3309         error = validate_lock_args(ls, lkb, args);
3310         if (error)
3311                 return error;
3312
3313         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3314         if (error)
3315                 return error;
3316
3317         lock_rsb(r);
3318
3319         attach_lkb(r, lkb);
3320         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3321
3322         error = _request_lock(r, lkb);
3323
3324         unlock_rsb(r);
3325         put_rsb(r);
3326         return error;
3327 }
3328
3329 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3330                         struct dlm_args *args)
3331 {
3332         struct dlm_rsb *r;
3333         int error;
3334
3335         r = lkb->lkb_resource;
3336
3337         hold_rsb(r);
3338         lock_rsb(r);
3339
3340         error = validate_lock_args(ls, lkb, args);
3341         if (error)
3342                 goto out;
3343
3344         error = _convert_lock(r, lkb);
3345  out:
3346         unlock_rsb(r);
3347         put_rsb(r);
3348         return error;
3349 }
3350
3351 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3352                        struct dlm_args *args)
3353 {
3354         struct dlm_rsb *r;
3355         int error;
3356
3357         r = lkb->lkb_resource;
3358
3359         hold_rsb(r);
3360         lock_rsb(r);
3361
3362         error = validate_unlock_args(lkb, args);
3363         if (error)
3364                 goto out;
3365
3366         error = _unlock_lock(r, lkb);
3367  out:
3368         unlock_rsb(r);
3369         put_rsb(r);
3370         return error;
3371 }
3372
3373 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3374                        struct dlm_args *args)
3375 {
3376         struct dlm_rsb *r;
3377         int error;
3378
3379         r = lkb->lkb_resource;
3380
3381         hold_rsb(r);
3382         lock_rsb(r);
3383
3384         error = validate_unlock_args(lkb, args);
3385         if (error)
3386                 goto out;
3387
3388         error = _cancel_lock(r, lkb);
3389  out:
3390         unlock_rsb(r);
3391         put_rsb(r);
3392         return error;
3393 }
3394
3395 /*
3396  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3397  */
3398
3399 int dlm_lock(dlm_lockspace_t *lockspace,
3400              int mode,
3401              struct dlm_lksb *lksb,
3402              uint32_t flags,
3403              void *name,
3404              unsigned int namelen,
3405              uint32_t parent_lkid,
3406              void (*ast) (void *astarg),
3407              void *astarg,
3408              void (*bast) (void *astarg, int mode))
3409 {
3410         struct dlm_ls *ls;
3411         struct dlm_lkb *lkb;
3412         struct dlm_args args;
3413         int error, convert = flags & DLM_LKF_CONVERT;
3414
3415         ls = dlm_find_lockspace_local(lockspace);
3416         if (!ls)
3417                 return -EINVAL;
3418
3419         dlm_lock_recovery(ls);
3420
3421         if (convert)
3422                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3423         else
3424                 error = create_lkb(ls, &lkb);
3425
3426         if (error)
3427                 goto out;
3428
3429         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
3430                               astarg, bast, &args);
3431         if (error)
3432                 goto out_put;
3433
3434         if (convert)
3435                 error = convert_lock(ls, lkb, &args);
3436         else
3437                 error = request_lock(ls, lkb, name, namelen, &args);
3438
3439         if (error == -EINPROGRESS)
3440                 error = 0;
3441  out_put:
3442         if (convert || error)
3443                 __put_lkb(ls, lkb);
3444         if (error == -EAGAIN || error == -EDEADLK)
3445                 error = 0;
3446  out:
3447         dlm_unlock_recovery(ls);
3448         dlm_put_lockspace(ls);
3449         return error;
3450 }
3451
3452 int dlm_unlock(dlm_lockspace_t *lockspace,
3453                uint32_t lkid,
3454                uint32_t flags,
3455                struct dlm_lksb *lksb,
3456                void *astarg)
3457 {
3458         struct dlm_ls *ls;
3459         struct dlm_lkb *lkb;
3460         struct dlm_args args;
3461         int error;
3462
3463         ls = dlm_find_lockspace_local(lockspace);
3464         if (!ls)
3465                 return -EINVAL;
3466
3467         dlm_lock_recovery(ls);
3468
3469         error = find_lkb(ls, lkid, &lkb);
3470         if (error)
3471                 goto out;
3472
3473         error = set_unlock_args(flags, astarg, &args);
3474         if (error)
3475                 goto out_put;
3476
3477         if (flags & DLM_LKF_CANCEL)
3478                 error = cancel_lock(ls, lkb, &args);
3479         else
3480                 error = unlock_lock(ls, lkb, &args);
3481
3482         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3483                 error = 0;
3484         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3485                 error = 0;
3486  out_put:
3487         dlm_put_lkb(lkb);
3488  out:
3489         dlm_unlock_recovery(ls);
3490         dlm_put_lockspace(ls);
3491         return error;
3492 }
3493
3494 /*
3495  * send/receive routines for remote operations and replies
3496  *
3497  * send_args
3498  * send_common
3499  * send_request                 receive_request
3500  * send_convert                 receive_convert
3501  * send_unlock                  receive_unlock
3502  * send_cancel                  receive_cancel
3503  * send_grant                   receive_grant
3504  * send_bast                    receive_bast
3505  * send_lookup                  receive_lookup
3506  * send_remove                  receive_remove
3507  *
3508  *                              send_common_reply
3509  * receive_request_reply        send_request_reply
3510  * receive_convert_reply        send_convert_reply
3511  * receive_unlock_reply         send_unlock_reply
3512  * receive_cancel_reply         send_cancel_reply
3513  * receive_lookup_reply         send_lookup_reply
3514  */
3515
3516 static int _create_message(struct dlm_ls *ls, int mb_len,
3517                            int to_nodeid, int mstype,
3518                            struct dlm_message **ms_ret,
3519                            struct dlm_mhandle **mh_ret)
3520 {
3521         struct dlm_message *ms;
3522         struct dlm_mhandle *mh;
3523         char *mb;
3524
3525         /* get_buffer gives us a message handle (mh) that we need to
3526            pass into lowcomms_commit and a message buffer (mb) that we
3527            write our data into */
3528
3529         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
3530         if (!mh)
3531                 return -ENOBUFS;
3532
3533         memset(mb, 0, mb_len);
3534
3535         ms = (struct dlm_message *) mb;
3536
3537         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3538         ms->m_header.h_lockspace = ls->ls_global_id;
3539         ms->m_header.h_nodeid = dlm_our_nodeid();
3540         ms->m_header.h_length = mb_len;
3541         ms->m_header.h_cmd = DLM_MSG;
3542
3543         ms->m_type = mstype;
3544
3545         *mh_ret = mh;
3546         *ms_ret = ms;
3547         return 0;
3548 }
3549
3550 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3551                           int to_nodeid, int mstype,
3552                           struct dlm_message **ms_ret,
3553                           struct dlm_mhandle **mh_ret)
3554 {
3555         int mb_len = sizeof(struct dlm_message);
3556
3557         switch (mstype) {
3558         case DLM_MSG_REQUEST:
3559         case DLM_MSG_LOOKUP:
3560         case DLM_MSG_REMOVE:
3561                 mb_len += r->res_length;
3562                 break;
3563         case DLM_MSG_CONVERT:
3564         case DLM_MSG_UNLOCK:
3565         case DLM_MSG_REQUEST_REPLY:
3566         case DLM_MSG_CONVERT_REPLY:
3567         case DLM_MSG_GRANT:
3568                 if (lkb && lkb->lkb_lvbptr)
3569                         mb_len += r->res_ls->ls_lvblen;
3570                 break;
3571         }
3572
3573         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3574                                ms_ret, mh_ret);
3575 }
3576
3577 /* further lowcomms enhancements or alternate implementations may make
3578    the return value from this function useful at some point */
3579
3580 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
3581 {
3582         dlm_message_out(ms);
3583         dlm_lowcomms_commit_buffer(mh);
3584         return 0;
3585 }
3586
3587 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3588                       struct dlm_message *ms)
3589 {
3590         ms->m_nodeid   = lkb->lkb_nodeid;
3591         ms->m_pid      = lkb->lkb_ownpid;
3592         ms->m_lkid     = lkb->lkb_id;
3593         ms->m_remid    = lkb->lkb_remid;
3594         ms->m_exflags  = lkb->lkb_exflags;
3595         ms->m_sbflags  = lkb->lkb_sbflags;
3596         ms->m_flags    = lkb->lkb_flags;
3597         ms->m_lvbseq   = lkb->lkb_lvbseq;
3598         ms->m_status   = lkb->lkb_status;
3599         ms->m_grmode   = lkb->lkb_grmode;
3600         ms->m_rqmode   = lkb->lkb_rqmode;
3601         ms->m_hash     = r->res_hash;
3602
3603         /* m_result and m_bastmode are set from function args,
3604            not from lkb fields */
3605
3606         if (lkb->lkb_bastfn)
3607                 ms->m_asts |= DLM_CB_BAST;
3608         if (lkb->lkb_astfn)
3609                 ms->m_asts |= DLM_CB_CAST;
3610
3611         /* compare with switch in create_message; send_remove() doesn't
3612            use send_args() */
3613
3614         switch (ms->m_type) {
3615         case DLM_MSG_REQUEST:
3616         case DLM_MSG_LOOKUP:
3617                 memcpy(ms->m_extra, r->res_name, r->res_length);
3618                 break;
3619         case DLM_MSG_CONVERT:
3620         case DLM_MSG_UNLOCK:
3621         case DLM_MSG_REQUEST_REPLY:
3622         case DLM_MSG_CONVERT_REPLY:
3623         case DLM_MSG_GRANT:
3624                 if (!lkb->lkb_lvbptr)
3625                         break;
3626                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3627                 break;
3628         }
3629 }
3630
3631 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3632 {
3633         struct dlm_message *ms;
3634         struct dlm_mhandle *mh;
3635         int to_nodeid, error;
3636
3637         to_nodeid = r->res_nodeid;
3638
3639         error = add_to_waiters(lkb, mstype, to_nodeid);
3640         if (error)
3641                 return error;
3642
3643         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3644         if (error)
3645                 goto fail;
3646
3647         send_args(r, lkb, ms);
3648
3649         error = send_message(mh, ms);
3650         if (error)
3651                 goto fail;
3652         return 0;
3653
3654  fail:
3655         remove_from_waiters(lkb, msg_reply_type(mstype));
3656         return error;
3657 }
3658
3659 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3660 {
3661         return send_common(r, lkb, DLM_MSG_REQUEST);
3662 }
3663
3664 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3665 {
3666         int error;
3667
3668         error = send_common(r, lkb, DLM_MSG_CONVERT);
3669
3670         /* down conversions go without a reply from the master */
3671         if (!error && down_conversion(lkb)) {
3672                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3673                 r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
3674                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3675                 r->res_ls->ls_stub_ms.m_result = 0;
3676                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3677         }
3678
3679         return error;
3680 }
3681
3682 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3683    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3684    that the master is still correct. */
3685
3686 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3687 {
3688         return send_common(r, lkb, DLM_MSG_UNLOCK);
3689 }
3690
3691 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3692 {
3693         return send_common(r, lkb, DLM_MSG_CANCEL);
3694 }
3695
3696 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3697 {
3698         struct dlm_message *ms;
3699         struct dlm_mhandle *mh;
3700         int to_nodeid, error;
3701
3702         to_nodeid = lkb->lkb_nodeid;
3703
3704         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3705         if (error)
3706                 goto out;
3707
3708         send_args(r, lkb, ms);
3709
3710         ms->m_result = 0;
3711
3712         error = send_message(mh, ms);
3713  out:
3714         return error;
3715 }
3716
3717 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3718 {
3719         struct dlm_message *ms;
3720         struct dlm_mhandle *mh;
3721         int to_nodeid, error;
3722
3723         to_nodeid = lkb->lkb_nodeid;
3724
3725         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3726         if (error)
3727                 goto out;
3728
3729         send_args(r, lkb, ms);
3730
3731         ms->m_bastmode = mode;
3732
3733         error = send_message(mh, ms);
3734  out:
3735         return error;
3736 }
3737
3738 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3739 {
3740         struct dlm_message *ms;
3741         struct dlm_mhandle *mh;
3742         int to_nodeid, error;
3743
3744         to_nodeid = dlm_dir_nodeid(r);
3745
3746         error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3747         if (error)
3748                 return error;
3749
3750         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3751         if (error)
3752                 goto fail;
3753
3754         send_args(r, lkb, ms);
3755
3756         error = send_message(mh, ms);
3757         if (error)
3758                 goto fail;
3759         return 0;
3760
3761  fail:
3762         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3763         return error;
3764 }
3765
3766 static int send_remove(struct dlm_rsb *r)
3767 {
3768         struct dlm_message *ms;
3769         struct dlm_mhandle *mh;
3770         int to_nodeid, error;
3771
3772         to_nodeid = dlm_dir_nodeid(r);
3773
3774         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3775         if (error)
3776                 goto out;
3777
3778         memcpy(ms->m_extra, r->res_name, r->res_length);
3779         ms->m_hash = r->res_hash;
3780
3781         error = send_message(mh, ms);
3782  out:
3783         return error;
3784 }
3785
3786 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3787                              int mstype, int rv)
3788 {
3789         struct dlm_message *ms;
3790         struct dlm_mhandle *mh;
3791         int to_nodeid, error;
3792
3793         to_nodeid = lkb->lkb_nodeid;
3794
3795         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3796         if (error)
3797                 goto out;
3798
3799         send_args(r, lkb, ms);
3800
3801         ms->m_result = rv;
3802
3803         error = send_message(mh, ms);
3804  out:
3805         return error;
3806 }
3807
3808 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3809 {
3810         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3811 }
3812
3813 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3814 {
3815         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3816 }
3817
3818 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3819 {
3820         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3821 }
3822
3823 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3824 {
3825         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3826 }
3827
3828 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3829                              int ret_nodeid, int rv)
3830 {
3831         struct dlm_rsb *r = &ls->ls_stub_rsb;
3832         struct dlm_message *ms;
3833         struct dlm_mhandle *mh;
3834         int error, nodeid = ms_in->m_header.h_nodeid;
3835
3836         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3837         if (error)
3838                 goto out;
3839
3840         ms->m_lkid = ms_in->m_lkid;
3841         ms->m_result = rv;
3842         ms->m_nodeid = ret_nodeid;
3843
3844         error = send_message(mh, ms);
3845  out:
3846         return error;
3847 }
3848
3849 /* which args we save from a received message depends heavily on the type
3850    of message, unlike the send side where we can safely send everything about
3851    the lkb for any type of message */
3852
3853 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3854 {
3855         lkb->lkb_exflags = ms->m_exflags;
3856         lkb->lkb_sbflags = ms->m_sbflags;
3857         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3858                          (ms->m_flags & 0x0000FFFF);
3859 }
3860
3861 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3862 {
3863         if (ms->m_flags == DLM_IFL_STUB_MS)
3864                 return;
3865
3866         lkb->lkb_sbflags = ms->m_sbflags;
3867         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3868                          (ms->m_flags & 0x0000FFFF);
3869 }
3870
3871 static int receive_extralen(struct dlm_message *ms)
3872 {
3873         return (ms->m_header.h_length - sizeof(struct dlm_message));
3874 }
3875
3876 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3877                        struct dlm_message *ms)
3878 {
3879         int len;
3880
3881         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3882                 if (!lkb->lkb_lvbptr)
3883                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3884                 if (!lkb->lkb_lvbptr)
3885                         return -ENOMEM;
3886                 len = receive_extralen(ms);
3887                 if (len > DLM_RESNAME_MAXLEN)
3888                         len = DLM_RESNAME_MAXLEN;
3889                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3890         }
3891         return 0;
3892 }
3893
3894 static void fake_bastfn(void *astparam, int mode)
3895 {
3896         log_print("fake_bastfn should not be called");
3897 }
3898
3899 static void fake_astfn(void *astparam)
3900 {
3901         log_print("fake_astfn should not be called");
3902 }
3903
3904 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3905                                 struct dlm_message *ms)
3906 {
3907         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3908         lkb->lkb_ownpid = ms->m_pid;
3909         lkb->lkb_remid = ms->m_lkid;
3910         lkb->lkb_grmode = DLM_LOCK_IV;
3911         lkb->lkb_rqmode = ms->m_rqmode;
3912
3913         lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3914         lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3915
3916         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3917                 /* lkb was just created so there won't be an lvb yet */
3918                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3919                 if (!lkb->lkb_lvbptr)
3920                         return -ENOMEM;
3921         }
3922
3923         return 0;
3924 }
3925
3926 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3927                                 struct dlm_message *ms)
3928 {
3929         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3930                 return -EBUSY;
3931
3932         if (receive_lvb(ls, lkb, ms))
3933                 return -ENOMEM;
3934
3935         lkb->lkb_rqmode = ms->m_rqmode;
3936         lkb->lkb_lvbseq = ms->m_lvbseq;
3937
3938         return 0;
3939 }
3940
3941 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3942                                struct dlm_message *ms)
3943 {
3944         if (receive_lvb(ls, lkb, ms))
3945                 return -ENOMEM;
3946         return 0;
3947 }
3948
3949 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3950    uses to send a reply and that the remote end uses to process the reply. */
3951
3952 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3953 {
3954         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3955         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3956         lkb->lkb_remid = ms->m_lkid;
3957 }
3958
3959 /* This is called after the rsb is locked so that we can safely inspect
3960    fields in the lkb. */
3961
3962 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3963 {
3964         int from = ms->m_header.h_nodeid;
3965         int error = 0;
3966
3967         switch (ms->m_type) {
3968         case DLM_MSG_CONVERT:
3969         case DLM_MSG_UNLOCK:
3970         case DLM_MSG_CANCEL:
3971                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3972                         error = -EINVAL;
3973                 break;
3974
3975         case DLM_MSG_CONVERT_REPLY:
3976         case DLM_MSG_UNLOCK_REPLY:
3977         case DLM_MSG_CANCEL_REPLY:
3978         case DLM_MSG_GRANT:
3979         case DLM_MSG_BAST:
3980                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3981                         error = -EINVAL;
3982                 break;
3983
3984         case DLM_MSG_REQUEST_REPLY:
3985                 if (!is_process_copy(lkb))
3986                         error = -EINVAL;
3987                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3988                         error = -EINVAL;
3989                 break;
3990
3991         default:
3992                 error = -EINVAL;
3993         }
3994
3995         if (error)
3996                 log_error(lkb->lkb_resource->res_ls,
3997                           "ignore invalid message %d from %d %x %x %x %d",
3998                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3999                           lkb->lkb_flags, lkb->lkb_nodeid);
4000         return error;
4001 }
4002
4003 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
4004 {
4005         struct dlm_lkb *lkb;
4006         struct dlm_rsb *r;
4007         int from_nodeid;
4008         int error, namelen;
4009
4010         from_nodeid = ms->m_header.h_nodeid;
4011
4012         error = create_lkb(ls, &lkb);
4013         if (error)
4014                 goto fail;
4015
4016         receive_flags(lkb, ms);
4017         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4018         error = receive_request_args(ls, lkb, ms);
4019         if (error) {
4020                 __put_lkb(ls, lkb);
4021                 goto fail;
4022         }
4023
4024         /* The dir node is the authority on whether we are the master
4025            for this rsb or not, so if the master sends us a request, we should
4026            recreate the rsb if we've destroyed it.   This race happens when we
4027            send a remove message to the dir node at the same time that the dir
4028            node sends us a request for the rsb. */
4029
4030         namelen = receive_extralen(ms);
4031
4032         error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4033                          R_RECEIVE_REQUEST, &r);
4034         if (error) {
4035                 __put_lkb(ls, lkb);
4036                 goto fail;
4037         }
4038
4039         lock_rsb(r);
4040
4041         if (r->res_master_nodeid != dlm_our_nodeid()) {
4042                 error = validate_master_nodeid(ls, r, from_nodeid);
4043                 if (error) {
4044                         unlock_rsb(r);
4045                         put_rsb(r);
4046                         __put_lkb(ls, lkb);
4047                         goto fail;
4048                 }
4049         }
4050
4051         attach_lkb(r, lkb);
4052         error = do_request(r, lkb);
4053         send_request_reply(r, lkb, error);
4054         do_request_effects(r, lkb, error);
4055
4056         unlock_rsb(r);
4057         put_rsb(r);
4058
4059         if (error == -EINPROGRESS)
4060                 error = 0;
4061         if (error)
4062                 dlm_put_lkb(lkb);
4063         return 0;
4064
4065  fail:
4066         /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4067            and do this receive_request again from process_lookup_list once
4068            we get the lookup reply.  This would avoid a many repeated
4069            ENOTBLK request failures when the lookup reply designating us
4070            as master is delayed. */
4071
4072         /* We could repeatedly return -EBADR here if our send_remove() is
4073            delayed in being sent/arriving/being processed on the dir node.
4074            Another node would repeatedly lookup up the master, and the dir
4075            node would continue returning our nodeid until our send_remove
4076            took effect. */
4077
4078         if (error != -ENOTBLK) {
4079                 log_limit(ls, "receive_request %x from %d %d",
4080                           ms->m_lkid, from_nodeid, error);
4081         }
4082
4083         setup_stub_lkb(ls, ms);
4084         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4085         return error;
4086 }
4087
4088 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
4089 {
4090         struct dlm_lkb *lkb;
4091         struct dlm_rsb *r;
4092         int error, reply = 1;
4093
4094         error = find_lkb(ls, ms->m_remid, &lkb);
4095         if (error)
4096                 goto fail;
4097
4098         if (lkb->lkb_remid != ms->m_lkid) {
4099                 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4100                           "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4101                           (unsigned long long)lkb->lkb_recover_seq,
4102                           ms->m_header.h_nodeid, ms->m_lkid);
4103                 error = -ENOENT;
4104                 goto fail;
4105         }
4106
4107         r = lkb->lkb_resource;
4108
4109         hold_rsb(r);
4110         lock_rsb(r);
4111
4112         error = validate_message(lkb, ms);
4113         if (error)
4114                 goto out;
4115
4116         receive_flags(lkb, ms);
4117
4118         error = receive_convert_args(ls, lkb, ms);
4119         if (error) {
4120                 send_convert_reply(r, lkb, error);
4121                 goto out;
4122         }
4123
4124         reply = !down_conversion(lkb);
4125
4126         error = do_convert(r, lkb);
4127         if (reply)
4128                 send_convert_reply(r, lkb, error);
4129         do_convert_effects(r, lkb, error);
4130  out:
4131         unlock_rsb(r);
4132         put_rsb(r);
4133         dlm_put_lkb(lkb);
4134         return 0;
4135
4136  fail:
4137         setup_stub_lkb(ls, ms);
4138         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4139         return error;
4140 }
4141
4142 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
4143 {
4144         struct dlm_lkb *lkb;
4145         struct dlm_rsb *r;
4146         int error;
4147
4148         error = find_lkb(ls, ms->m_remid, &lkb);
4149         if (error)
4150                 goto fail;
4151
4152         if (lkb->lkb_remid != ms->m_lkid) {
4153                 log_error(ls, "receive_unlock %x remid %x remote %d %x",
4154                           lkb->lkb_id, lkb->lkb_remid,
4155                           ms->m_header.h_nodeid, ms->m_lkid);
4156                 error = -ENOENT;
4157                 goto fail;
4158         }
4159
4160         r = lkb->lkb_resource;
4161
4162         hold_rsb(r);
4163         lock_rsb(r);
4164
4165         error = validate_message(lkb, ms);
4166         if (error)
4167                 goto out;
4168
4169         receive_flags(lkb, ms);
4170
4171         error = receive_unlock_args(ls, lkb, ms);
4172         if (error) {
4173                 send_unlock_reply(r, lkb, error);
4174                 goto out;
4175         }
4176
4177         error = do_unlock(r, lkb);
4178         send_unlock_reply(r, lkb, error);
4179         do_unlock_effects(r, lkb, error);
4180  out:
4181         unlock_rsb(r);
4182         put_rsb(r);
4183         dlm_put_lkb(lkb);
4184         return 0;
4185
4186  fail:
4187         setup_stub_lkb(ls, ms);
4188         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4189         return error;
4190 }
4191
4192 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
4193 {
4194         struct dlm_lkb *lkb;
4195         struct dlm_rsb *r;
4196         int error;
4197
4198         error = find_lkb(ls, ms->m_remid, &lkb);
4199         if (error)
4200                 goto fail;
4201
4202         receive_flags(lkb, ms);
4203
4204         r = lkb->lkb_resource;
4205
4206         hold_rsb(r);
4207         lock_rsb(r);
4208
4209         error = validate_message(lkb, ms);
4210         if (error)
4211                 goto out;
4212
4213         error = do_cancel(r, lkb);
4214         send_cancel_reply(r, lkb, error);
4215         do_cancel_effects(r, lkb, error);
4216  out:
4217         unlock_rsb(r);
4218         put_rsb(r);
4219         dlm_put_lkb(lkb);
4220         return 0;
4221
4222  fail:
4223         setup_stub_lkb(ls, ms);
4224         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4225         return error;
4226 }
4227
4228 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
4229 {
4230         struct dlm_lkb *lkb;
4231         struct dlm_rsb *r;
4232         int error;
4233
4234         error = find_lkb(ls, ms->m_remid, &lkb);
4235         if (error)
4236                 return error;
4237
4238         r = lkb->lkb_resource;
4239
4240         hold_rsb(r);
4241         lock_rsb(r);
4242
4243         error = validate_message(lkb, ms);
4244         if (error)
4245                 goto out;
4246
4247         receive_flags_reply(lkb, ms);
4248         if (is_altmode(lkb))
4249                 munge_altmode(lkb, ms);
4250         grant_lock_pc(r, lkb, ms);
4251         queue_cast(r, lkb, 0);
4252  out:
4253         unlock_rsb(r);
4254         put_rsb(r);
4255         dlm_put_lkb(lkb);
4256         return 0;
4257 }
4258
4259 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
4260 {
4261         struct dlm_lkb *lkb;
4262         struct dlm_rsb *r;
4263         int error;
4264
4265         error = find_lkb(ls, ms->m_remid, &lkb);
4266         if (error)
4267                 return error;
4268
4269         r = lkb->lkb_resource;
4270
4271         hold_rsb(r);
4272         lock_rsb(r);
4273
4274         error = validate_message(lkb, ms);
4275         if (error)
4276                 goto out;
4277
4278         queue_bast(r, lkb, ms->m_bastmode);
4279         lkb->lkb_highbast = ms->m_bastmode;
4280  out:
4281         unlock_rsb(r);
4282         put_rsb(r);
4283         dlm_put_lkb(lkb);
4284         return 0;
4285 }
4286
4287 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
4288 {
4289         int len, error, ret_nodeid, from_nodeid, our_nodeid;
4290
4291         from_nodeid = ms->m_header.h_nodeid;
4292         our_nodeid = dlm_our_nodeid();
4293
4294         len = receive_extralen(ms);
4295
4296         error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4297                                   &ret_nodeid, NULL);
4298
4299         /* Optimization: we're master so treat lookup as a request */
4300         if (!error && ret_nodeid == our_nodeid) {
4301                 receive_request(ls, ms);
4302                 return;
4303         }
4304         send_lookup_reply(ls, ms, ret_nodeid, error);
4305 }
4306
4307 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
4308 {
4309         char name[DLM_RESNAME_MAXLEN+1];
4310         struct dlm_rsb *r;
4311         uint32_t hash, b;
4312         int rv, len, dir_nodeid, from_nodeid;
4313
4314         from_nodeid = ms->m_header.h_nodeid;
4315
4316         len = receive_extralen(ms);
4317
4318         if (len > DLM_RESNAME_MAXLEN) {
4319                 log_error(ls, "receive_remove from %d bad len %d",
4320                           from_nodeid, len);
4321                 return;
4322         }
4323
4324         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
4325         if (dir_nodeid != dlm_our_nodeid()) {
4326                 log_error(ls, "receive_remove from %d bad nodeid %d",
4327                           from_nodeid, dir_nodeid);
4328                 return;
4329         }
4330
4331         /* Look for name on rsbtbl.toss, if it's there, kill it.
4332            If it's on rsbtbl.keep, it's being used, and we should ignore this
4333            message.  This is an expected race between the dir node sending a
4334            request to the master node at the same time as the master node sends
4335            a remove to the dir node.  The resolution to that race is for the
4336            dir node to ignore the remove message, and the master node to
4337            recreate the master rsb when it gets a request from the dir node for
4338            an rsb it doesn't have. */
4339
4340         memset(name, 0, sizeof(name));
4341         memcpy(name, ms->m_extra, len);
4342
4343         hash = jhash(name, len, 0);
4344         b = hash & (ls->ls_rsbtbl_size - 1);
4345
4346         spin_lock(&ls->ls_rsbtbl[b].lock);
4347
4348         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4349         if (rv) {
4350                 /* verify the rsb is on keep list per comment above */
4351                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4352                 if (rv) {
4353                         /* should not happen */
4354                         log_error(ls, "receive_remove from %d not found %s",
4355                                   from_nodeid, name);
4356                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4357                         return;
4358                 }
4359                 if (r->res_master_nodeid != from_nodeid) {
4360                         /* should not happen */
4361                         log_error(ls, "receive_remove keep from %d master %d",
4362                                   from_nodeid, r->res_master_nodeid);
4363                         dlm_print_rsb(r);
4364                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4365                         return;
4366                 }
4367
4368                 log_debug(ls, "receive_remove from %d master %d first %x %s",
4369                           from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4370                           name);
4371                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4372                 return;
4373         }
4374
4375         if (r->res_master_nodeid != from_nodeid) {
4376                 log_error(ls, "receive_remove toss from %d master %d",
4377                           from_nodeid, r->res_master_nodeid);
4378                 dlm_print_rsb(r);
4379                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4380                 return;
4381         }
4382
4383         if (kref_put(&r->res_ref, kill_rsb)) {
4384                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4385                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4386                 dlm_free_rsb(r);
4387         } else {
4388                 log_error(ls, "receive_remove from %d rsb ref error",
4389                           from_nodeid);
4390                 dlm_print_rsb(r);
4391                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4392         }
4393 }
4394
4395 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
4396 {
4397         do_purge(ls, ms->m_nodeid, ms->m_pid);
4398 }
4399
4400 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
4401 {
4402         struct dlm_lkb *lkb;
4403         struct dlm_rsb *r;
4404         int error, mstype, result;
4405         int from_nodeid = ms->m_header.h_nodeid;
4406
4407         error = find_lkb(ls, ms->m_remid, &lkb);
4408         if (error)
4409                 return error;
4410
4411         r = lkb->lkb_resource;
4412         hold_rsb(r);
4413         lock_rsb(r);
4414
4415         error = validate_message(lkb, ms);
4416         if (error)
4417                 goto out;
4418
4419         mstype = lkb->lkb_wait_type;
4420         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4421         if (error) {
4422                 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4423                           lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result);
4424                 dlm_dump_rsb(r);
4425                 goto out;
4426         }
4427
4428         /* Optimization: the dir node was also the master, so it took our
4429            lookup as a request and sent request reply instead of lookup reply */
4430         if (mstype == DLM_MSG_LOOKUP) {
4431                 r->res_master_nodeid = from_nodeid;
4432                 r->res_nodeid = from_nodeid;
4433                 lkb->lkb_nodeid = from_nodeid;
4434         }
4435
4436         /* this is the value returned from do_request() on the master */
4437         result = ms->m_result;
4438
4439         switch (result) {
4440         case -EAGAIN:
4441                 /* request would block (be queued) on remote master */
4442                 queue_cast(r, lkb, -EAGAIN);
4443                 confirm_master(r, -EAGAIN);
4444                 unhold_lkb(lkb); /* undoes create_lkb() */
4445                 break;
4446
4447         case -EINPROGRESS:
4448         case 0:
4449                 /* request was queued or granted on remote master */
4450                 receive_flags_reply(lkb, ms);
4451                 lkb->lkb_remid = ms->m_lkid;
4452                 if (is_altmode(lkb))
4453                         munge_altmode(lkb, ms);
4454                 if (result) {
4455                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
4456                         add_timeout(lkb);
4457                 } else {
4458                         grant_lock_pc(r, lkb, ms);
4459                         queue_cast(r, lkb, 0);
4460                 }
4461                 confirm_master(r, result);
4462                 break;
4463
4464         case -EBADR:
4465         case -ENOTBLK:
4466                 /* find_rsb failed to find rsb or rsb wasn't master */
4467                 log_limit(ls, "receive_request_reply %x from %d %d "
4468                           "master %d dir %d first %x %s", lkb->lkb_id,
4469                           from_nodeid, result, r->res_master_nodeid,
4470                           r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4471
4472                 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4473                     r->res_master_nodeid != dlm_our_nodeid()) {
4474                         /* cause _request_lock->set_master->send_lookup */
4475                         r->res_master_nodeid = 0;
4476                         r->res_nodeid = -1;
4477                         lkb->lkb_nodeid = -1;
4478                 }
4479
4480                 if (is_overlap(lkb)) {
4481                         /* we'll ignore error in cancel/unlock reply */
4482                         queue_cast_overlap(r, lkb);
4483                         confirm_master(r, result);
4484                         unhold_lkb(lkb); /* undoes create_lkb() */
4485                 } else {
4486                         _request_lock(r, lkb);
4487
4488                         if (r->res_master_nodeid == dlm_our_nodeid())
4489                                 confirm_master(r, 0);
4490                 }
4491                 break;
4492
4493         default:
4494                 log_error(ls, "receive_request_reply %x error %d",
4495                           lkb->lkb_id, result);
4496         }
4497
4498         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
4499                 log_debug(ls, "receive_request_reply %x result %d unlock",
4500                           lkb->lkb_id, result);
4501                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4502                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4503                 send_unlock(r, lkb);
4504         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
4505                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4506                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4507                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4508                 send_cancel(r, lkb);
4509         } else {
4510                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4511                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4512         }
4513  out:
4514         unlock_rsb(r);
4515         put_rsb(r);
4516         dlm_put_lkb(lkb);
4517         return 0;
4518 }
4519
4520 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4521                                     struct dlm_message *ms)
4522 {
4523         /* this is the value returned from do_convert() on the master */
4524         switch (ms->m_result) {
4525         case -EAGAIN:
4526                 /* convert would block (be queued) on remote master */
4527                 queue_cast(r, lkb, -EAGAIN);
4528                 break;
4529
4530         case -EDEADLK:
4531                 receive_flags_reply(lkb, ms);
4532                 revert_lock_pc(r, lkb);
4533                 queue_cast(r, lkb, -EDEADLK);
4534                 break;
4535
4536         case -EINPROGRESS:
4537                 /* convert was queued on remote master */
4538                 receive_flags_reply(lkb, ms);
4539                 if (is_demoted(lkb))
4540                         munge_demoted(lkb);
4541                 del_lkb(r, lkb);
4542                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4543                 add_timeout(lkb);
4544                 break;
4545
4546         case 0:
4547                 /* convert was granted on remote master */
4548                 receive_flags_reply(lkb, ms);
4549                 if (is_demoted(lkb))
4550                         munge_demoted(lkb);
4551                 grant_lock_pc(r, lkb, ms);
4552                 queue_cast(r, lkb, 0);
4553                 break;
4554
4555         default:
4556                 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4557                           lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
4558                           ms->m_result);
4559                 dlm_print_rsb(r);
4560                 dlm_print_lkb(lkb);
4561         }
4562 }
4563
4564 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4565 {
4566         struct dlm_rsb *r = lkb->lkb_resource;
4567         int error;
4568
4569         hold_rsb(r);
4570         lock_rsb(r);
4571
4572         error = validate_message(lkb, ms);
4573         if (error)
4574                 goto out;
4575
4576         /* stub reply can happen with waiters_mutex held */
4577         error = remove_from_waiters_ms(lkb, ms);
4578         if (error)
4579                 goto out;
4580
4581         __receive_convert_reply(r, lkb, ms);
4582  out:
4583         unlock_rsb(r);
4584         put_rsb(r);
4585 }
4586
4587 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
4588 {
4589         struct dlm_lkb *lkb;
4590         int error;
4591
4592         error = find_lkb(ls, ms->m_remid, &lkb);
4593         if (error)
4594                 return error;
4595
4596         _receive_convert_reply(lkb, ms);
4597         dlm_put_lkb(lkb);
4598         return 0;
4599 }
4600
4601 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4602 {
4603         struct dlm_rsb *r = lkb->lkb_resource;
4604         int error;
4605
4606         hold_rsb(r);
4607         lock_rsb(r);
4608
4609         error = validate_message(lkb, ms);
4610         if (error)
4611                 goto out;
4612
4613         /* stub reply can happen with waiters_mutex held */
4614         error = remove_from_waiters_ms(lkb, ms);
4615         if (error)
4616                 goto out;
4617
4618         /* this is the value returned from do_unlock() on the master */
4619
4620         switch (ms->m_result) {
4621         case -DLM_EUNLOCK:
4622                 receive_flags_reply(lkb, ms);
4623                 remove_lock_pc(r, lkb);
4624                 queue_cast(r, lkb, -DLM_EUNLOCK);
4625                 break;
4626         case -ENOENT:
4627                 break;
4628         default:
4629                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4630                           lkb->lkb_id, ms->m_result);
4631         }
4632  out:
4633         unlock_rsb(r);
4634         put_rsb(r);
4635 }
4636
4637 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
4638 {
4639         struct dlm_lkb *lkb;
4640         int error;
4641
4642         error = find_lkb(ls, ms->m_remid, &lkb);
4643         if (error)
4644                 return error;
4645
4646         _receive_unlock_reply(lkb, ms);
4647         dlm_put_lkb(lkb);
4648         return 0;
4649 }
4650
4651 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4652 {
4653         struct dlm_rsb *r = lkb->lkb_resource;
4654         int error;
4655
4656         hold_rsb(r);
4657         lock_rsb(r);
4658
4659         error = validate_message(lkb, ms);
4660         if (error)
4661                 goto out;
4662
4663         /* stub reply can happen with waiters_mutex held */
4664         error = remove_from_waiters_ms(lkb, ms);
4665         if (error)
4666                 goto out;
4667
4668         /* this is the value returned from do_cancel() on the master */
4669
4670         switch (ms->m_result) {
4671         case -DLM_ECANCEL:
4672                 receive_flags_reply(lkb, ms);
4673                 revert_lock_pc(r, lkb);
4674                 queue_cast(r, lkb, -DLM_ECANCEL);
4675                 break;
4676         case 0:
4677                 break;
4678         default:
4679                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4680                           lkb->lkb_id, ms->m_result);
4681         }
4682  out:
4683         unlock_rsb(r);
4684         put_rsb(r);
4685 }
4686
4687 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
4688 {
4689         struct dlm_lkb *lkb;
4690         int error;
4691
4692         error = find_lkb(ls, ms->m_remid, &lkb);
4693         if (error)
4694                 return error;
4695
4696         _receive_cancel_reply(lkb, ms);
4697         dlm_put_lkb(lkb);
4698         return 0;
4699 }
4700
4701 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4702 {
4703         struct dlm_lkb *lkb;
4704         struct dlm_rsb *r;
4705         int error, ret_nodeid;
4706         int do_lookup_list = 0;
4707
4708         error = find_lkb(ls, ms->m_lkid, &lkb);
4709         if (error) {
4710                 log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
4711                 return;
4712         }
4713
4714         /* ms->m_result is the value returned by dlm_master_lookup on dir node
4715            FIXME: will a non-zero error ever be returned? */
4716
4717         r = lkb->lkb_resource;
4718         hold_rsb(r);
4719         lock_rsb(r);
4720
4721         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4722         if (error)
4723                 goto out;
4724
4725         ret_nodeid = ms->m_nodeid;
4726
4727         /* We sometimes receive a request from the dir node for this
4728            rsb before we've received the dir node's loookup_reply for it.
4729            The request from the dir node implies we're the master, so we set
4730            ourself as master in receive_request_reply, and verify here that
4731            we are indeed the master. */
4732
4733         if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4734                 /* This should never happen */
4735                 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4736                           "master %d dir %d our %d first %x %s",
4737                           lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid,
4738                           r->res_master_nodeid, r->res_dir_nodeid,
4739                           dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4740         }
4741
4742         if (ret_nodeid == dlm_our_nodeid()) {
4743                 r->res_master_nodeid = ret_nodeid;
4744                 r->res_nodeid = 0;
4745                 do_lookup_list = 1;
4746                 r->res_first_lkid = 0;
4747         } else if (ret_nodeid == -1) {
4748                 /* the remote node doesn't believe it's the dir node */
4749                 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4750                           lkb->lkb_id, ms->m_header.h_nodeid);
4751                 r->res_master_nodeid = 0;
4752                 r->res_nodeid = -1;
4753                 lkb->lkb_nodeid = -1;
4754         } else {
4755                 /* set_master() will set lkb_nodeid from r */
4756                 r->res_master_nodeid = ret_nodeid;
4757                 r->res_nodeid = ret_nodeid;
4758         }
4759
4760         if (is_overlap(lkb)) {
4761                 log_debug(ls, "receive_lookup_reply %x unlock %x",
4762                           lkb->lkb_id, lkb->lkb_flags);
4763                 queue_cast_overlap(r, lkb);
4764                 unhold_lkb(lkb); /* undoes create_lkb() */
4765                 goto out_list;
4766         }
4767
4768         _request_lock(r, lkb);
4769
4770  out_list:
4771         if (do_lookup_list)
4772                 process_lookup_list(r);
4773  out:
4774         unlock_rsb(r);
4775         put_rsb(r);
4776         dlm_put_lkb(lkb);
4777 }
4778
4779 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4780                              uint32_t saved_seq)
4781 {
4782         int error = 0, noent = 0;
4783
4784         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
4785                 log_limit(ls, "receive %d from non-member %d %x %x %d",
4786                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4787                           ms->m_remid, ms->m_result);
4788                 return;
4789         }
4790
4791         switch (ms->m_type) {
4792
4793         /* messages sent to a master node */
4794
4795         case DLM_MSG_REQUEST:
4796                 error = receive_request(ls, ms);
4797                 break;
4798
4799         case DLM_MSG_CONVERT:
4800                 error = receive_convert(ls, ms);
4801                 break;
4802
4803         case DLM_MSG_UNLOCK:
4804                 error = receive_unlock(ls, ms);
4805                 break;
4806
4807         case DLM_MSG_CANCEL:
4808                 noent = 1;
4809                 error = receive_cancel(ls, ms);
4810                 break;
4811
4812         /* messages sent from a master node (replies to above) */
4813
4814         case DLM_MSG_REQUEST_REPLY:
4815                 error = receive_request_reply(ls, ms);
4816                 break;
4817
4818         case DLM_MSG_CONVERT_REPLY:
4819                 error = receive_convert_reply(ls, ms);
4820                 break;
4821
4822         case DLM_MSG_UNLOCK_REPLY:
4823                 error = receive_unlock_reply(ls, ms);
4824                 break;
4825
4826         case DLM_MSG_CANCEL_REPLY:
4827                 error = receive_cancel_reply(ls, ms);
4828                 break;
4829
4830         /* messages sent from a master node (only two types of async msg) */
4831
4832         case DLM_MSG_GRANT:
4833                 noent = 1;
4834                 error = receive_grant(ls, ms);
4835                 break;
4836
4837         case DLM_MSG_BAST:
4838                 noent = 1;
4839                 error = receive_bast(ls, ms);
4840                 break;
4841
4842         /* messages sent to a dir node */
4843
4844         case DLM_MSG_LOOKUP:
4845                 receive_lookup(ls, ms);
4846                 break;
4847
4848         case DLM_MSG_REMOVE:
4849                 receive_remove(ls, ms);
4850                 break;
4851
4852         /* messages sent from a dir node (remove has no reply) */
4853
4854         case DLM_MSG_LOOKUP_REPLY:
4855                 receive_lookup_reply(ls, ms);
4856                 break;
4857
4858         /* other messages */
4859
4860         case DLM_MSG_PURGE:
4861                 receive_purge(ls, ms);
4862                 break;
4863
4864         default:
4865                 log_error(ls, "unknown message type %d", ms->m_type);
4866         }
4867
4868         /*
4869          * When checking for ENOENT, we're checking the result of
4870          * find_lkb(m_remid):
4871          *
4872          * The lock id referenced in the message wasn't found.  This may
4873          * happen in normal usage for the async messages and cancel, so
4874          * only use log_debug for them.
4875          *
4876          * Some errors are expected and normal.
4877          */
4878
4879         if (error == -ENOENT && noent) {
4880                 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4881                           ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4882                           ms->m_lkid, saved_seq);
4883         } else if (error == -ENOENT) {
4884                 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4885                           ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4886                           ms->m_lkid, saved_seq);
4887
4888                 if (ms->m_type == DLM_MSG_CONVERT)
4889                         dlm_dump_rsb_hash(ls, ms->m_hash);
4890         }
4891
4892         if (error == -EINVAL) {
4893                 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4894                           "saved_seq %u",
4895                           ms->m_type, ms->m_header.h_nodeid,
4896                           ms->m_lkid, ms->m_remid, saved_seq);
4897         }
4898 }
4899
4900 /* If the lockspace is in recovery mode (locking stopped), then normal
4901    messages are saved on the requestqueue for processing after recovery is
4902    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4903    messages off the requestqueue before we process new ones. This occurs right
4904    after recovery completes when we transition from saving all messages on
4905    requestqueue, to processing all the saved messages, to processing new
4906    messages as they arrive. */
4907
4908 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4909                                 int nodeid)
4910 {
4911         if (dlm_locking_stopped(ls)) {
4912                 /* If we were a member of this lockspace, left, and rejoined,
4913                    other nodes may still be sending us messages from the
4914                    lockspace generation before we left. */
4915                 if (!ls->ls_generation) {
4916                         log_limit(ls, "receive %d from %d ignore old gen",
4917                                   ms->m_type, nodeid);
4918                         return;
4919                 }
4920
4921                 dlm_add_requestqueue(ls, nodeid, ms);
4922         } else {
4923                 dlm_wait_requestqueue(ls);
4924                 _receive_message(ls, ms, 0);
4925         }
4926 }
4927
4928 /* This is called by dlm_recoverd to process messages that were saved on
4929    the requestqueue. */
4930
4931 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
4932                                uint32_t saved_seq)
4933 {
4934         _receive_message(ls, ms, saved_seq);
4935 }
4936
4937 /* This is called by the midcomms layer when something is received for
4938    the lockspace.  It could be either a MSG (normal message sent as part of
4939    standard locking activity) or an RCOM (recovery message sent as part of
4940    lockspace recovery). */
4941
4942 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4943 {
4944         struct dlm_header *hd = &p->header;
4945         struct dlm_ls *ls;
4946         int type = 0;
4947
4948         switch (hd->h_cmd) {
4949         case DLM_MSG:
4950                 dlm_message_in(&p->message);
4951                 type = p->message.m_type;
4952                 break;
4953         case DLM_RCOM:
4954                 dlm_rcom_in(&p->rcom);
4955                 type = p->rcom.rc_type;
4956                 break;
4957         default:
4958                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4959                 return;
4960         }
4961
4962         if (hd->h_nodeid != nodeid) {
4963                 log_print("invalid h_nodeid %d from %d lockspace %x",
4964                           hd->h_nodeid, nodeid, hd->h_lockspace);
4965                 return;
4966         }
4967
4968         ls = dlm_find_lockspace_global(hd->h_lockspace);
4969         if (!ls) {
4970                 if (dlm_config.ci_log_debug) {
4971                         printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4972                                 "%u from %d cmd %d type %d\n",
4973                                 hd->h_lockspace, nodeid, hd->h_cmd, type);
4974                 }
4975
4976                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4977                         dlm_send_ls_not_ready(nodeid, &p->rcom);
4978                 return;
4979         }
4980
4981         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4982            be inactive (in this ls) before transitioning to recovery mode */
4983
4984         down_read(&ls->ls_recv_active);
4985         if (hd->h_cmd == DLM_MSG)
4986                 dlm_receive_message(ls, &p->message, nodeid);
4987         else
4988                 dlm_receive_rcom(ls, &p->rcom, nodeid);
4989         up_read(&ls->ls_recv_active);
4990
4991         dlm_put_lockspace(ls);
4992 }
4993
4994 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4995                                    struct dlm_message *ms_stub)
4996 {
4997         if (middle_conversion(lkb)) {
4998                 hold_lkb(lkb);
4999                 memset(ms_stub, 0, sizeof(struct dlm_message));
5000                 ms_stub->m_flags = DLM_IFL_STUB_MS;
5001                 ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
5002                 ms_stub->m_result = -EINPROGRESS;
5003                 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5004                 _receive_convert_reply(lkb, ms_stub);
5005
5006                 /* Same special case as in receive_rcom_lock_args() */
5007                 lkb->lkb_grmode = DLM_LOCK_IV;
5008                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5009                 unhold_lkb(lkb);
5010
5011         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5012                 lkb->lkb_flags |= DLM_IFL_RESEND;
5013         }
5014
5015         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5016            conversions are async; there's no reply from the remote master */
5017 }
5018
5019 /* A waiting lkb needs recovery if the master node has failed, or
5020    the master node is changing (only when no directory is used) */
5021
5022 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5023                                  int dir_nodeid)
5024 {
5025         if (dlm_no_directory(ls))
5026                 return 1;
5027
5028         if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5029                 return 1;
5030
5031         return 0;
5032 }
5033
5034 /* Recovery for locks that are waiting for replies from nodes that are now
5035    gone.  We can just complete unlocks and cancels by faking a reply from the
5036    dead node.  Requests and up-conversions we flag to be resent after
5037    recovery.  Down-conversions can just be completed with a fake reply like
5038    unlocks.  Conversions between PR and CW need special attention. */
5039
5040 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5041 {
5042         struct dlm_lkb *lkb, *safe;
5043         struct dlm_message *ms_stub;
5044         int wait_type, stub_unlock_result, stub_cancel_result;
5045         int dir_nodeid;
5046
5047         ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
5048         if (!ms_stub) {
5049                 log_error(ls, "dlm_recover_waiters_pre no mem");
5050                 return;
5051         }
5052
5053         mutex_lock(&ls->ls_waiters_mutex);
5054
5055         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5056
5057                 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5058
5059                 /* exclude debug messages about unlocks because there can be so
5060                    many and they aren't very interesting */
5061
5062                 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5063                         log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5064                                   "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5065                                   lkb->lkb_id,
5066                                   lkb->lkb_remid,
5067                                   lkb->lkb_wait_type,
5068                                   lkb->lkb_resource->res_nodeid,
5069                                   lkb->lkb_nodeid,
5070                                   lkb->lkb_wait_nodeid,
5071                                   dir_nodeid);
5072                 }
5073
5074                 /* all outstanding lookups, regardless of destination  will be
5075                    resent after recovery is done */
5076
5077                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5078                         lkb->lkb_flags |= DLM_IFL_RESEND;
5079                         continue;
5080                 }
5081
5082                 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5083                         continue;
5084
5085                 wait_type = lkb->lkb_wait_type;
5086                 stub_unlock_result = -DLM_EUNLOCK;
5087                 stub_cancel_result = -DLM_ECANCEL;
5088
5089                 /* Main reply may have been received leaving a zero wait_type,
5090                    but a reply for the overlapping op may not have been
5091                    received.  In that case we need to fake the appropriate
5092                    reply for the overlap op. */
5093
5094                 if (!wait_type) {
5095                         if (is_overlap_cancel(lkb)) {
5096                                 wait_type = DLM_MSG_CANCEL;
5097                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
5098                                         stub_cancel_result = 0;
5099                         }
5100                         if (is_overlap_unlock(lkb)) {
5101                                 wait_type = DLM_MSG_UNLOCK;
5102                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
5103                                         stub_unlock_result = -ENOENT;
5104                         }
5105
5106                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
5107                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
5108                                   stub_cancel_result, stub_unlock_result);
5109                 }
5110
5111                 switch (wait_type) {
5112
5113                 case DLM_MSG_REQUEST:
5114                         lkb->lkb_flags |= DLM_IFL_RESEND;
5115                         break;
5116
5117                 case DLM_MSG_CONVERT:
5118                         recover_convert_waiter(ls, lkb, ms_stub);
5119                         break;
5120
5121                 case DLM_MSG_UNLOCK:
5122                         hold_lkb(lkb);
5123                         memset(ms_stub, 0, sizeof(struct dlm_message));
5124                         ms_stub->m_flags = DLM_IFL_STUB_MS;
5125                         ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
5126                         ms_stub->m_result = stub_unlock_result;
5127                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5128                         _receive_unlock_reply(lkb, ms_stub);
5129                         dlm_put_lkb(lkb);
5130                         break;
5131
5132                 case DLM_MSG_CANCEL:
5133                         hold_lkb(lkb);
5134                         memset(ms_stub, 0, sizeof(struct dlm_message));
5135                         ms_stub->m_flags = DLM_IFL_STUB_MS;
5136                         ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
5137                         ms_stub->m_result = stub_cancel_result;
5138                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5139                         _receive_cancel_reply(lkb, ms_stub);
5140                         dlm_put_lkb(lkb);
5141                         break;
5142
5143                 default:
5144                         log_error(ls, "invalid lkb wait_type %d %d",
5145                                   lkb->lkb_wait_type, wait_type);
5146                 }
5147                 schedule();
5148         }
5149         mutex_unlock(&ls->ls_waiters_mutex);
5150         kfree(ms_stub);
5151 }
5152
5153 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5154 {
5155         struct dlm_lkb *lkb;
5156         int found = 0;
5157
5158         mutex_lock(&ls->ls_waiters_mutex);
5159         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
5160                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
5161                         hold_lkb(lkb);
5162                         found = 1;
5163                         break;
5164                 }
5165         }
5166         mutex_unlock(&ls->ls_waiters_mutex);
5167
5168         if (!found)
5169                 lkb = NULL;
5170         return lkb;
5171 }
5172
5173 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
5174    master or dir-node for r.  Processing the lkb may result in it being placed
5175    back on waiters. */
5176
5177 /* We do this after normal locking has been enabled and any saved messages
5178    (in requestqueue) have been processed.  We should be confident that at
5179    this point we won't get or process a reply to any of these waiting
5180    operations.  But, new ops may be coming in on the rsbs/locks here from
5181    userspace or remotely. */
5182
5183 /* there may have been an overlap unlock/cancel prior to recovery or after
5184    recovery.  if before, the lkb may still have a pos wait_count; if after, the
5185    overlap flag would just have been set and nothing new sent.  we can be
5186    confident here than any replies to either the initial op or overlap ops
5187    prior to recovery have been received. */
5188
5189 int dlm_recover_waiters_post(struct dlm_ls *ls)
5190 {
5191         struct dlm_lkb *lkb;
5192         struct dlm_rsb *r;
5193         int error = 0, mstype, err, oc, ou;
5194
5195         while (1) {
5196                 if (dlm_locking_stopped(ls)) {
5197                         log_debug(ls, "recover_waiters_post aborted");
5198                         error = -EINTR;
5199                         break;
5200                 }
5201
5202                 lkb = find_resend_waiter(ls);
5203                 if (!lkb)
5204                         break;
5205
5206                 r = lkb->lkb_resource;
5207                 hold_rsb(r);
5208                 lock_rsb(r);
5209
5210                 mstype = lkb->lkb_wait_type;
5211                 oc = is_overlap_cancel(lkb);
5212                 ou = is_overlap_unlock(lkb);
5213                 err = 0;
5214
5215                 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5216                           "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5217                           "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5218                           r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5219                           dlm_dir_nodeid(r), oc, ou);
5220
5221                 /* At this point we assume that we won't get a reply to any
5222                    previous op or overlap op on this lock.  First, do a big
5223                    remove_from_waiters() for all previous ops. */
5224
5225                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
5226                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
5227                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
5228                 lkb->lkb_wait_type = 0;
5229                 lkb->lkb_wait_count = 0;
5230                 mutex_lock(&ls->ls_waiters_mutex);
5231                 list_del_init(&lkb->lkb_wait_reply);
5232                 mutex_unlock(&ls->ls_waiters_mutex);
5233                 unhold_lkb(lkb); /* for waiters list */
5234
5235                 if (oc || ou) {
5236                         /* do an unlock or cancel instead of resending */
5237                         switch (mstype) {
5238                         case DLM_MSG_LOOKUP:
5239                         case DLM_MSG_REQUEST:
5240                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5241                                                         -DLM_ECANCEL);
5242                                 unhold_lkb(lkb); /* undoes create_lkb() */
5243                                 break;
5244                         case DLM_MSG_CONVERT:
5245                                 if (oc) {
5246                                         queue_cast(r, lkb, -DLM_ECANCEL);
5247                                 } else {
5248                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5249                                         _unlock_lock(r, lkb);
5250                                 }
5251                                 break;
5252                         default:
5253                                 err = 1;
5254                         }
5255                 } else {
5256                         switch (mstype) {
5257                         case DLM_MSG_LOOKUP:
5258                         case DLM_MSG_REQUEST:
5259                                 _request_lock(r, lkb);
5260                                 if (is_master(r))
5261                                         confirm_master(r, 0);
5262                                 break;
5263                         case DLM_MSG_CONVERT:
5264                                 _convert_lock(r, lkb);
5265                                 break;
5266                         default:
5267                                 err = 1;
5268                         }
5269                 }
5270
5271                 if (err) {
5272                         log_error(ls, "waiter %x msg %d r_nodeid %d "
5273                                   "dir_nodeid %d overlap %d %d",
5274                                   lkb->lkb_id, mstype, r->res_nodeid,
5275                                   dlm_dir_nodeid(r), oc, ou);
5276                 }
5277                 unlock_rsb(r);
5278                 put_rsb(r);
5279                 dlm_put_lkb(lkb);
5280         }
5281
5282         return error;
5283 }
5284
5285 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5286                               struct list_head *list)
5287 {
5288         struct dlm_lkb *lkb, *safe;
5289
5290         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5291                 if (!is_master_copy(lkb))
5292                         continue;
5293
5294                 /* don't purge lkbs we've added in recover_master_copy for
5295                    the current recovery seq */
5296
5297                 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5298                         continue;
5299
5300                 del_lkb(r, lkb);
5301
5302                 /* this put should free the lkb */
5303                 if (!dlm_put_lkb(lkb))
5304                         log_error(ls, "purged mstcpy lkb not released");
5305         }
5306 }
5307
5308 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5309 {
5310         struct dlm_ls *ls = r->res_ls;
5311
5312         purge_mstcpy_list(ls, r, &r->res_grantqueue);
5313         purge_mstcpy_list(ls, r, &r->res_convertqueue);
5314         purge_mstcpy_list(ls, r, &r->res_waitqueue);
5315 }
5316
5317 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5318                             struct list_head *list,
5319                             int nodeid_gone, unsigned int *count)
5320 {
5321         struct dlm_lkb *lkb, *safe;
5322
5323         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5324                 if (!is_master_copy(lkb))
5325                         continue;
5326
5327                 if ((lkb->lkb_nodeid == nodeid_gone) ||
5328                     dlm_is_removed(ls, lkb->lkb_nodeid)) {
5329
5330                         del_lkb(r, lkb);
5331
5332                         /* this put should free the lkb */
5333                         if (!dlm_put_lkb(lkb))
5334                                 log_error(ls, "purged dead lkb not released");
5335
5336                         rsb_set_flag(r, RSB_RECOVER_GRANT);
5337
5338                         (*count)++;
5339                 }
5340         }
5341 }
5342
5343 /* Get rid of locks held by nodes that are gone. */
5344
5345 void dlm_recover_purge(struct dlm_ls *ls)
5346 {
5347         struct dlm_rsb *r;
5348         struct dlm_member *memb;
5349         int nodes_count = 0;
5350         int nodeid_gone = 0;
5351         unsigned int lkb_count = 0;
5352
5353         /* cache one removed nodeid to optimize the common
5354            case of a single node removed */
5355
5356         list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5357                 nodes_count++;
5358                 nodeid_gone = memb->nodeid;
5359         }
5360
5361         if (!nodes_count)
5362                 return;
5363
5364         down_write(&ls->ls_root_sem);
5365         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5366                 hold_rsb(r);
5367                 lock_rsb(r);
5368                 if (is_master(r)) {
5369                         purge_dead_list(ls, r, &r->res_grantqueue,
5370                                         nodeid_gone, &lkb_count);
5371                         purge_dead_list(ls, r, &r->res_convertqueue,
5372                                         nodeid_gone, &lkb_count);
5373                         purge_dead_list(ls, r, &r->res_waitqueue,
5374                                         nodeid_gone, &lkb_count);
5375                 }
5376                 unlock_rsb(r);
5377                 unhold_rsb(r);
5378                 cond_resched();
5379         }
5380         up_write(&ls->ls_root_sem);
5381
5382         if (lkb_count)
5383                 log_debug(ls, "dlm_recover_purge %u locks for %u nodes",
5384                           lkb_count, nodes_count);
5385 }
5386
5387 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5388 {
5389         struct rb_node *n;
5390         struct dlm_rsb *r;
5391
5392         spin_lock(&ls->ls_rsbtbl[bucket].lock);
5393         for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5394                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
5395
5396                 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5397                         continue;
5398                 if (!is_master(r)) {
5399                         rsb_clear_flag(r, RSB_RECOVER_GRANT);
5400                         continue;
5401                 }
5402                 hold_rsb(r);
5403                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5404                 return r;
5405         }
5406         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5407         return NULL;
5408 }
5409
5410 /*
5411  * Attempt to grant locks on resources that we are the master of.
5412  * Locks may have become grantable during recovery because locks
5413  * from departed nodes have been purged (or not rebuilt), allowing
5414  * previously blocked locks to now be granted.  The subset of rsb's
5415  * we are interested in are those with lkb's on either the convert or
5416  * waiting queues.
5417  *
5418  * Simplest would be to go through each master rsb and check for non-empty
5419  * convert or waiting queues, and attempt to grant on those rsbs.
5420  * Checking the queues requires lock_rsb, though, for which we'd need
5421  * to release the rsbtbl lock.  This would make iterating through all
5422  * rsb's very inefficient.  So, we rely on earlier recovery routines
5423  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5424  * locks for.
5425  */
5426
5427 void dlm_recover_grant(struct dlm_ls *ls)
5428 {
5429         struct dlm_rsb *r;
5430         int bucket = 0;
5431         unsigned int count = 0;
5432         unsigned int rsb_count = 0;
5433         unsigned int lkb_count = 0;
5434
5435         while (1) {
5436                 r = find_grant_rsb(ls, bucket);
5437                 if (!r) {
5438                         if (bucket == ls->ls_rsbtbl_size - 1)
5439                                 break;
5440                         bucket++;
5441                         continue;
5442                 }
5443                 rsb_count++;
5444                 count = 0;
5445                 lock_rsb(r);
5446                 /* the RECOVER_GRANT flag is checked in the grant path */
5447                 grant_pending_locks(r, &count);
5448                 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5449                 lkb_count += count;
5450                 confirm_master(r, 0);
5451                 unlock_rsb(r);
5452                 put_rsb(r);
5453                 cond_resched();
5454         }
5455
5456         if (lkb_count)
5457                 log_debug(ls, "dlm_recover_grant %u locks on %u resources",
5458                           lkb_count, rsb_count);
5459 }
5460
5461 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5462                                          uint32_t remid)
5463 {
5464         struct dlm_lkb *lkb;
5465
5466         list_for_each_entry(lkb, head, lkb_statequeue) {
5467                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5468                         return lkb;
5469         }
5470         return NULL;
5471 }
5472
5473 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5474                                     uint32_t remid)
5475 {
5476         struct dlm_lkb *lkb;
5477
5478         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5479         if (lkb)
5480                 return lkb;
5481         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5482         if (lkb)
5483                 return lkb;
5484         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5485         if (lkb)
5486                 return lkb;
5487         return NULL;
5488 }
5489
5490 /* needs at least dlm_rcom + rcom_lock */
5491 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5492                                   struct dlm_rsb *r, struct dlm_rcom *rc)
5493 {
5494         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5495
5496         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
5497         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5498         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5499         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5500         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
5501         lkb->lkb_flags |= DLM_IFL_MSTCPY;
5502         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5503         lkb->lkb_rqmode = rl->rl_rqmode;
5504         lkb->lkb_grmode = rl->rl_grmode;
5505         /* don't set lkb_status because add_lkb wants to itself */
5506
5507         lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5508         lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5509
5510         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5511                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
5512                          sizeof(struct rcom_lock);
5513                 if (lvblen > ls->ls_lvblen)
5514                         return -EINVAL;
5515                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5516                 if (!lkb->lkb_lvbptr)
5517                         return -ENOMEM;
5518                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5519         }
5520
5521         /* Conversions between PR and CW (middle modes) need special handling.
5522            The real granted mode of these converting locks cannot be determined
5523            until all locks have been rebuilt on the rsb (recover_conversion) */
5524
5525         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5526             middle_conversion(lkb)) {
5527                 rl->rl_status = DLM_LKSTS_CONVERT;
5528                 lkb->lkb_grmode = DLM_LOCK_IV;
5529                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5530         }
5531
5532         return 0;
5533 }
5534
5535 /* This lkb may have been recovered in a previous aborted recovery so we need
5536    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5537    If so we just send back a standard reply.  If not, we create a new lkb with
5538    the given values and send back our lkid.  We send back our lkid by sending
5539    back the rcom_lock struct we got but with the remid field filled in. */
5540
5541 /* needs at least dlm_rcom + rcom_lock */
5542 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5543 {
5544         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5545         struct dlm_rsb *r;
5546         struct dlm_lkb *lkb;
5547         uint32_t remid = 0;
5548         int from_nodeid = rc->rc_header.h_nodeid;
5549         int error;
5550
5551         if (rl->rl_parent_lkid) {
5552                 error = -EOPNOTSUPP;
5553                 goto out;
5554         }
5555
5556         remid = le32_to_cpu(rl->rl_lkid);
5557
5558         /* In general we expect the rsb returned to be R_MASTER, but we don't
5559            have to require it.  Recovery of masters on one node can overlap
5560            recovery of locks on another node, so one node can send us MSTCPY
5561            locks before we've made ourselves master of this rsb.  We can still
5562            add new MSTCPY locks that we receive here without any harm; when
5563            we make ourselves master, dlm_recover_masters() won't touch the
5564            MSTCPY locks we've received early. */
5565
5566         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5567                          from_nodeid, R_RECEIVE_RECOVER, &r);
5568         if (error)
5569                 goto out;
5570
5571         lock_rsb(r);
5572
5573         if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5574                 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5575                           from_nodeid, remid);
5576                 error = -EBADR;
5577                 goto out_unlock;
5578         }
5579
5580         lkb = search_remid(r, from_nodeid, remid);
5581         if (lkb) {
5582                 error = -EEXIST;
5583                 goto out_remid;
5584         }
5585
5586         error = create_lkb(ls, &lkb);
5587         if (error)
5588                 goto out_unlock;
5589
5590         error = receive_rcom_lock_args(ls, lkb, r, rc);
5591         if (error) {
5592                 __put_lkb(ls, lkb);
5593                 goto out_unlock;
5594         }
5595
5596         attach_lkb(r, lkb);
5597         add_lkb(r, lkb, rl->rl_status);
5598         error = 0;
5599         ls->ls_recover_locks_in++;
5600
5601         if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5602                 rsb_set_flag(r, RSB_RECOVER_GRANT);
5603
5604  out_remid:
5605         /* this is the new value returned to the lock holder for
5606            saving in its process-copy lkb */
5607         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
5608
5609         lkb->lkb_recover_seq = ls->ls_recover_seq;
5610
5611  out_unlock:
5612         unlock_rsb(r);
5613         put_rsb(r);
5614  out:
5615         if (error && error != -EEXIST)
5616                 log_debug(ls, "dlm_recover_master_copy remote %d %x error %d",
5617                           from_nodeid, remid, error);
5618         rl->rl_result = cpu_to_le32(error);
5619         return error;
5620 }
5621
5622 /* needs at least dlm_rcom + rcom_lock */
5623 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5624 {
5625         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5626         struct dlm_rsb *r;
5627         struct dlm_lkb *lkb;
5628         uint32_t lkid, remid;
5629         int error, result;
5630
5631         lkid = le32_to_cpu(rl->rl_lkid);
5632         remid = le32_to_cpu(rl->rl_remid);
5633         result = le32_to_cpu(rl->rl_result);
5634
5635         error = find_lkb(ls, lkid, &lkb);
5636         if (error) {
5637                 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5638                           lkid, rc->rc_header.h_nodeid, remid, result);
5639                 return error;
5640         }
5641
5642         r = lkb->lkb_resource;
5643         hold_rsb(r);
5644         lock_rsb(r);
5645
5646         if (!is_process_copy(lkb)) {
5647                 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5648                           lkid, rc->rc_header.h_nodeid, remid, result);
5649                 dlm_dump_rsb(r);
5650                 unlock_rsb(r);
5651                 put_rsb(r);
5652                 dlm_put_lkb(lkb);
5653                 return -EINVAL;
5654         }
5655
5656         switch (result) {
5657         case -EBADR:
5658                 /* There's a chance the new master received our lock before
5659                    dlm_recover_master_reply(), this wouldn't happen if we did
5660                    a barrier between recover_masters and recover_locks. */
5661
5662                 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5663                           lkid, rc->rc_header.h_nodeid, remid, result);
5664         
5665                 dlm_send_rcom_lock(r, lkb);
5666                 goto out;
5667         case -EEXIST:
5668         case 0:
5669                 lkb->lkb_remid = remid;
5670                 break;
5671         default:
5672                 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5673                           lkid, rc->rc_header.h_nodeid, remid, result);
5674         }
5675
5676         /* an ack for dlm_recover_locks() which waits for replies from
5677            all the locks it sends to new masters */
5678         dlm_recovered_lock(r);
5679  out:
5680         unlock_rsb(r);
5681         put_rsb(r);
5682         dlm_put_lkb(lkb);
5683
5684         return 0;
5685 }
5686
5687 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5688                      int mode, uint32_t flags, void *name, unsigned int namelen,
5689                      unsigned long timeout_cs)
5690 {
5691         struct dlm_lkb *lkb;
5692         struct dlm_args args;
5693         int error;
5694
5695         dlm_lock_recovery(ls);
5696
5697         error = create_lkb(ls, &lkb);
5698         if (error) {
5699                 kfree(ua);
5700                 goto out;
5701         }
5702
5703         if (flags & DLM_LKF_VALBLK) {
5704                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5705                 if (!ua->lksb.sb_lvbptr) {
5706                         kfree(ua);
5707                         __put_lkb(ls, lkb);
5708                         error = -ENOMEM;
5709                         goto out;
5710                 }
5711         }
5712
5713         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5714            When DLM_IFL_USER is set, the dlm knows that this is a userspace
5715            lock and that lkb_astparam is the dlm_user_args structure. */
5716
5717         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
5718                               fake_astfn, ua, fake_bastfn, &args);
5719         lkb->lkb_flags |= DLM_IFL_USER;
5720
5721         if (error) {
5722                 __put_lkb(ls, lkb);
5723                 goto out;
5724         }
5725
5726         error = request_lock(ls, lkb, name, namelen, &args);
5727
5728         switch (error) {
5729         case 0:
5730                 break;
5731         case -EINPROGRESS:
5732                 error = 0;
5733                 break;
5734         case -EAGAIN:
5735                 error = 0;
5736                 /* fall through */
5737         default:
5738                 __put_lkb(ls, lkb);
5739                 goto out;
5740         }
5741
5742         /* add this new lkb to the per-process list of locks */
5743         spin_lock(&ua->proc->locks_spin);
5744         hold_lkb(lkb);
5745         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5746         spin_unlock(&ua->proc->locks_spin);
5747  out:
5748         dlm_unlock_recovery(ls);
5749         return error;
5750 }
5751
5752 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5753                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
5754                      unsigned long timeout_cs)
5755 {
5756         struct dlm_lkb *lkb;
5757         struct dlm_args args;
5758         struct dlm_user_args *ua;
5759         int error;
5760
5761         dlm_lock_recovery(ls);
5762
5763         error = find_lkb(ls, lkid, &lkb);
5764         if (error)
5765                 goto out;
5766
5767         /* user can change the params on its lock when it converts it, or
5768            add an lvb that didn't exist before */
5769
5770         ua = lkb->lkb_ua;
5771
5772         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5773                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5774                 if (!ua->lksb.sb_lvbptr) {
5775                         error = -ENOMEM;
5776                         goto out_put;
5777                 }
5778         }
5779         if (lvb_in && ua->lksb.sb_lvbptr)
5780                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5781
5782         ua->xid = ua_tmp->xid;
5783         ua->castparam = ua_tmp->castparam;
5784         ua->castaddr = ua_tmp->castaddr;
5785         ua->bastparam = ua_tmp->bastparam;
5786         ua->bastaddr = ua_tmp->bastaddr;
5787         ua->user_lksb = ua_tmp->user_lksb;
5788
5789         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
5790                               fake_astfn, ua, fake_bastfn, &args);
5791         if (error)
5792                 goto out_put;
5793
5794         error = convert_lock(ls, lkb, &args);
5795
5796         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5797                 error = 0;
5798  out_put:
5799         dlm_put_lkb(lkb);
5800  out:
5801         dlm_unlock_recovery(ls);
5802         kfree(ua_tmp);
5803         return error;
5804 }
5805
5806 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5807                     uint32_t flags, uint32_t lkid, char *lvb_in)
5808 {
5809         struct dlm_lkb *lkb;
5810         struct dlm_args args;
5811         struct dlm_user_args *ua;
5812         int error;
5813
5814         dlm_lock_recovery(ls);
5815
5816         error = find_lkb(ls, lkid, &lkb);
5817         if (error)
5818                 goto out;
5819
5820         ua = lkb->lkb_ua;
5821
5822         if (lvb_in && ua->lksb.sb_lvbptr)
5823                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5824         if (ua_tmp->castparam)
5825                 ua->castparam = ua_tmp->castparam;
5826         ua->user_lksb = ua_tmp->user_lksb;
5827
5828         error = set_unlock_args(flags, ua, &args);
5829         if (error)
5830                 goto out_put;
5831
5832         error = unlock_lock(ls, lkb, &args);
5833
5834         if (error == -DLM_EUNLOCK)
5835                 error = 0;
5836         /* from validate_unlock_args() */
5837         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5838                 error = 0;
5839         if (error)
5840                 goto out_put;
5841
5842         spin_lock(&ua->proc->locks_spin);
5843         /* dlm_user_add_cb() may have already taken lkb off the proc list */
5844         if (!list_empty(&lkb->lkb_ownqueue))
5845                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5846         spin_unlock(&ua->proc->locks_spin);
5847  out_put:
5848         dlm_put_lkb(lkb);
5849  out:
5850         dlm_unlock_recovery(ls);
5851         kfree(ua_tmp);
5852         return error;
5853 }
5854
5855 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5856                     uint32_t flags, uint32_t lkid)
5857 {
5858         struct dlm_lkb *lkb;
5859         struct dlm_args args;
5860         struct dlm_user_args *ua;
5861         int error;
5862
5863         dlm_lock_recovery(ls);
5864
5865         error = find_lkb(ls, lkid, &lkb);
5866         if (error)
5867                 goto out;
5868
5869         ua = lkb->lkb_ua;
5870         if (ua_tmp->castparam)
5871                 ua->castparam = ua_tmp->castparam;
5872         ua->user_lksb = ua_tmp->user_lksb;
5873
5874         error = set_unlock_args(flags, ua, &args);
5875         if (error)
5876                 goto out_put;
5877
5878         error = cancel_lock(ls, lkb, &args);
5879
5880         if (error == -DLM_ECANCEL)
5881                 error = 0;
5882         /* from validate_unlock_args() */
5883         if (error == -EBUSY)
5884                 error = 0;
5885  out_put:
5886         dlm_put_lkb(lkb);
5887  out:
5888         dlm_unlock_recovery(ls);
5889         kfree(ua_tmp);
5890         return error;
5891 }
5892
5893 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
5894 {
5895         struct dlm_lkb *lkb;
5896         struct dlm_args args;
5897         struct dlm_user_args *ua;
5898         struct dlm_rsb *r;
5899         int error;
5900
5901         dlm_lock_recovery(ls);
5902
5903         error = find_lkb(ls, lkid, &lkb);
5904         if (error)
5905                 goto out;
5906
5907         ua = lkb->lkb_ua;
5908
5909         error = set_unlock_args(flags, ua, &args);
5910         if (error)
5911                 goto out_put;
5912
5913         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
5914
5915         r = lkb->lkb_resource;
5916         hold_rsb(r);
5917         lock_rsb(r);
5918
5919         error = validate_unlock_args(lkb, &args);
5920         if (error)
5921                 goto out_r;
5922         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
5923
5924         error = _cancel_lock(r, lkb);
5925  out_r:
5926         unlock_rsb(r);
5927         put_rsb(r);
5928
5929         if (error == -DLM_ECANCEL)
5930                 error = 0;
5931         /* from validate_unlock_args() */
5932         if (error == -EBUSY)
5933                 error = 0;
5934  out_put:
5935         dlm_put_lkb(lkb);
5936  out:
5937         dlm_unlock_recovery(ls);
5938         return error;
5939 }
5940
5941 /* lkb's that are removed from the waiters list by revert are just left on the
5942    orphans list with the granted orphan locks, to be freed by purge */
5943
5944 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5945 {
5946         struct dlm_args args;
5947         int error;
5948
5949         hold_lkb(lkb);
5950         mutex_lock(&ls->ls_orphans_mutex);
5951         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
5952         mutex_unlock(&ls->ls_orphans_mutex);
5953
5954         set_unlock_args(0, lkb->lkb_ua, &args);
5955
5956         error = cancel_lock(ls, lkb, &args);
5957         if (error == -DLM_ECANCEL)
5958                 error = 0;
5959         return error;
5960 }
5961
5962 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
5963    Regardless of what rsb queue the lock is on, it's removed and freed. */
5964
5965 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5966 {
5967         struct dlm_args args;
5968         int error;
5969
5970         set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
5971
5972         error = unlock_lock(ls, lkb, &args);
5973         if (error == -DLM_EUNLOCK)
5974                 error = 0;
5975         return error;
5976 }
5977
5978 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
5979    (which does lock_rsb) due to deadlock with receiving a message that does
5980    lock_rsb followed by dlm_user_add_cb() */
5981
5982 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
5983                                      struct dlm_user_proc *proc)
5984 {
5985         struct dlm_lkb *lkb = NULL;
5986
5987         mutex_lock(&ls->ls_clear_proc_locks);
5988         if (list_empty(&proc->locks))
5989                 goto out;
5990
5991         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
5992         list_del_init(&lkb->lkb_ownqueue);
5993
5994         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5995                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
5996         else
5997                 lkb->lkb_flags |= DLM_IFL_DEAD;
5998  out:
5999         mutex_unlock(&ls->ls_clear_proc_locks);
6000         return lkb;
6001 }
6002
6003 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6004    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6005    which we clear here. */
6006
6007 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6008    list, and no more device_writes should add lkb's to proc->locks list; so we
6009    shouldn't need to take asts_spin or locks_spin here.  this assumes that
6010    device reads/writes/closes are serialized -- FIXME: we may need to serialize
6011    them ourself. */
6012
6013 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6014 {
6015         struct dlm_lkb *lkb, *safe;
6016
6017         dlm_lock_recovery(ls);
6018
6019         while (1) {
6020                 lkb = del_proc_lock(ls, proc);
6021                 if (!lkb)
6022                         break;
6023                 del_timeout(lkb);
6024                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6025                         orphan_proc_lock(ls, lkb);
6026                 else
6027                         unlock_proc_lock(ls, lkb);
6028
6029                 /* this removes the reference for the proc->locks list
6030                    added by dlm_user_request, it may result in the lkb
6031                    being freed */
6032
6033                 dlm_put_lkb(lkb);
6034         }
6035
6036         mutex_lock(&ls->ls_clear_proc_locks);
6037
6038         /* in-progress unlocks */
6039         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6040                 list_del_init(&lkb->lkb_ownqueue);
6041                 lkb->lkb_flags |= DLM_IFL_DEAD;
6042                 dlm_put_lkb(lkb);
6043         }
6044
6045         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6046                 memset(&lkb->lkb_callbacks, 0,
6047                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6048                 list_del_init(&lkb->lkb_cb_list);
6049                 dlm_put_lkb(lkb);
6050         }
6051
6052         mutex_unlock(&ls->ls_clear_proc_locks);
6053         dlm_unlock_recovery(ls);
6054 }
6055
6056 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6057 {
6058         struct dlm_lkb *lkb, *safe;
6059
6060         while (1) {
6061                 lkb = NULL;
6062                 spin_lock(&proc->locks_spin);
6063                 if (!list_empty(&proc->locks)) {
6064                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
6065                                          lkb_ownqueue);
6066                         list_del_init(&lkb->lkb_ownqueue);
6067                 }
6068                 spin_unlock(&proc->locks_spin);
6069
6070                 if (!lkb)
6071                         break;
6072
6073                 lkb->lkb_flags |= DLM_IFL_DEAD;
6074                 unlock_proc_lock(ls, lkb);
6075                 dlm_put_lkb(lkb); /* ref from proc->locks list */
6076         }
6077
6078         spin_lock(&proc->locks_spin);
6079         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6080                 list_del_init(&lkb->lkb_ownqueue);
6081                 lkb->lkb_flags |= DLM_IFL_DEAD;
6082                 dlm_put_lkb(lkb);
6083         }
6084         spin_unlock(&proc->locks_spin);
6085
6086         spin_lock(&proc->asts_spin);
6087         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6088                 memset(&lkb->lkb_callbacks, 0,
6089                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6090                 list_del_init(&lkb->lkb_cb_list);
6091                 dlm_put_lkb(lkb);
6092         }
6093         spin_unlock(&proc->asts_spin);
6094 }
6095
6096 /* pid of 0 means purge all orphans */
6097
6098 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6099 {
6100         struct dlm_lkb *lkb, *safe;
6101
6102         mutex_lock(&ls->ls_orphans_mutex);
6103         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6104                 if (pid && lkb->lkb_ownpid != pid)
6105                         continue;
6106                 unlock_proc_lock(ls, lkb);
6107                 list_del_init(&lkb->lkb_ownqueue);
6108                 dlm_put_lkb(lkb);
6109         }
6110         mutex_unlock(&ls->ls_orphans_mutex);
6111 }
6112
6113 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6114 {
6115         struct dlm_message *ms;
6116         struct dlm_mhandle *mh;
6117         int error;
6118
6119         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6120                                 DLM_MSG_PURGE, &ms, &mh);
6121         if (error)
6122                 return error;
6123         ms->m_nodeid = nodeid;
6124         ms->m_pid = pid;
6125
6126         return send_message(mh, ms);
6127 }
6128
6129 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6130                    int nodeid, int pid)
6131 {
6132         int error = 0;
6133
6134         if (nodeid != dlm_our_nodeid()) {
6135                 error = send_purge(ls, nodeid, pid);
6136         } else {
6137                 dlm_lock_recovery(ls);
6138                 if (pid == current->pid)
6139                         purge_proc_locks(ls, proc);
6140                 else
6141                         do_purge(ls, nodeid, pid);
6142                 dlm_unlock_recovery(ls);
6143         }
6144         return error;
6145 }
6146