/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

/**
 * This file implements the POSIX lock type for Lustre.
 * Its policy properties are the start and end of the extent and the PID.
 *
 * These locks are handled only through the MDS because POSIX semantics
 * require, e.g., that a lock may be only partially released and thereby
 * split into two locks, and that two adjacent locks from the same process
 * may be merged into a single wider lock.
 *
 * Lock modes are mapped as follows:
 *   PR and PW for READ and WRITE locks
 *   NL to request the release of a portion of a lock
 *
 * These flock locks never time out.
 */
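
/*
 * Illustrative example of the split/merge semantics described above
 * (byte ranges are [start, end], inclusive):
 *
 *   - Process P holds a PW lock on [0, 199] and unlocks [50, 99]:
 *     the lock must be split into two locks, [0, 49] and [100, 199].
 *   - Process P holds PR locks on [0, 99] and [100, 199]:
 *     a new PR request for [50, 150] merges all three into a single
 *     PR lock on [0, 199].
 */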

#define DEBUG_SUBSYSTEM S_LDLM

#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>
#include <linux/list.h>

#include "ldlm_internal.h"

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag);

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *            and safeguard against removal of a list entry.
 * \param pos   the &struct list_head to use as a loop counter. pos MUST
 *            have been initialized prior to using it in this macro.
 * \param n     another &struct list_head to use as temporary storage
 * \param head  the head for your list.
 */
#define list_for_remaining_safe(pos, n, head) \
        for (n = pos->next; pos != (head); pos = n, n = pos->next)
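
/*
 * Minimal usage sketch (names are illustrative only): unlike
 * list_for_each_safe(), iteration starts at a caller-chosen entry rather
 * than at the head, which is how the grant loop below resumes scanning
 * from this process's first lock:
 *
 *     struct list_head *pos = start_entry;    /- already initialized -/
 *     struct list_head *n;
 *
 *     list_for_remaining_safe(pos, n, &res->lr_granted) {
 *             /- pos may be unlinked here; n keeps the loop safe -/
 *     }
 */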

static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.owner ==
                lock->l_policy_data.l_flock.owner) &&
               (new->l_export == lock->l_export));
}

static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.start <=
                lock->l_policy_data.l_flock.end) &&
               (new->l_policy_data.l_flock.end >=
                lock->l_policy_data.l_flock.start));
}
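
/*
 * Worked example for the overlap test above (extents are inclusive):
 *
 *   lock = [100, 199], new = [150, 249]:
 *     150 <= 199 && 249 >= 100  -> overlap
 *   lock = [100, 199], new = [200, 299]:
 *     200 <= 199 is false       -> no overlap (adjacent, not overlapping)
 */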

static inline int ldlm_flock_blocking_link(struct ldlm_lock *req,
                                           struct ldlm_lock *lock)
{
        int rc = 0;

        /* For server only */
        if (req->l_export == NULL)
                return 0;

        if (unlikely(req->l_export->exp_flock_hash == NULL)) {
                rc = ldlm_init_flock_export(req->l_export);
                if (rc)
                        goto error;
        }

        LASSERT(hlist_unhashed(&req->l_exp_flock_hash));

        req->l_policy_data.l_flock.blocking_owner =
                lock->l_policy_data.l_flock.owner;
        req->l_policy_data.l_flock.blocking_export =
                lock->l_export;
        req->l_policy_data.l_flock.blocking_refs = 0;

        cfs_hash_add(req->l_export->exp_flock_hash,
                     &req->l_policy_data.l_flock.owner,
                     &req->l_exp_flock_hash);
error:
        return rc;
}
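
/*
 * In effect the export's exp_flock_hash maps "owner of a blocked request"
 * to "the lock that blocks it"; after ldlm_flock_blocking_link():
 *
 *   key:   req->l_policy_data.l_flock.owner
 *   value: req, which records the (blocking_owner, blocking_export)
 *          of the conflicting lock
 *
 * ldlm_flock_deadlock() below follows these records as the edges of a
 * wait-for graph.
 */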

static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
{
        /* For server only */
        if (req->l_export == NULL)
                return;

        check_res_locked(req->l_resource);
        if (req->l_export->exp_flock_hash != NULL &&
            !hlist_unhashed(&req->l_exp_flock_hash))
                cfs_hash_del(req->l_export->exp_flock_hash,
                             &req->l_policy_data.l_flock.owner,
                             &req->l_exp_flock_hash);
}

static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
{
        ENTRY;

        LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
                   mode, flags);

        /* Safe to not lock here, since it should be empty anyway */
        LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));

        list_del_init(&lock->l_res_link);
        if (flags == LDLM_FL_WAIT_NOREPROC &&
            !(lock->l_flags & LDLM_FL_FAILED)) {
                /* client side - set a flag to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

                /* When we reach here we are under lock_res_and_lock(), so
                 * we need to call the nolock version of
                 * ldlm_lock_decref_internal(). */
                ldlm_lock_decref_internal_nolock(lock, mode);
        }

        ldlm_lock_destroy_nolock(lock);
        EXIT;
}

/**
 * POSIX locks deadlock detection code.
 *
 * Given a new lock \a req and an existing lock \a bl_lock that it
 * conflicts with, iterate through all blocked POSIX locks for this
 * export and check whether a deadlock would arise, i.e. whether one
 * client holds a lock on something and wants a lock on something else
 * while another client is in the opposite situation.
 */
static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
{
        struct obd_export *req_exp = req->l_export;
        struct obd_export *bl_exp = bl_lock->l_export;
        __u64 req_owner = req->l_policy_data.l_flock.owner;
        __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;

        /* For server only */
        if (req_exp == NULL)
                return 0;

        class_export_get(bl_exp);
        while (1) {
                struct obd_export *bl_exp_new;
                struct ldlm_lock *lock = NULL;
                struct ldlm_flock *flock;

                if (bl_exp->exp_flock_hash != NULL)
                        lock = cfs_hash_lookup(bl_exp->exp_flock_hash,
                                               &bl_owner);
                if (lock == NULL)
                        break;

                flock = &lock->l_policy_data.l_flock;
                LASSERT(flock->owner == bl_owner);
                bl_owner = flock->blocking_owner;
                bl_exp_new = class_export_get(flock->blocking_export);
                class_export_put(bl_exp);

                cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
                bl_exp = bl_exp_new;

                if (bl_owner == req_owner && bl_exp == req_exp) {
                        class_export_put(bl_exp);
                        return 1;
                }
        }
        class_export_put(bl_exp);

        return 0;
}
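
/*
 * Illustrative deadlock that the walk above detects: owner A (on export
 * EA) holds lock L1 and is blocked waiting for lock L2; owner B (on
 * export EB) holds L2 and is blocked waiting for L1. Starting from the
 * lock that blocks the new request, each exp_flock_hash lookup yields
 * the next (blocking_owner, blocking_export) edge:
 *
 *   (A, EA) -> blocked by B -> (B, EB) -> blocked by A -> (A, EA)
 *
 * Arriving back at (req_owner, req_exp) closes the cycle, and the
 * caller fails the request with -EDEADLK instead of waiting forever.
 */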

/**
 * Process a granting attempt for flock lock.
 * Must be called with the ns lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.
 *
 * It is also responsible for splitting a lock if a portion of the lock
 * is released.
 *
 * If \a first_enq is 0 (i.e., called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *
 * If \a first_enq is 1 (i.e., called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent yet, so the list of conflicting
 *     locks would be collected and ASTs sent.
 */
int
ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
                        ldlm_error_t *err, struct list_head *work_list)
{
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = ldlm_res_to_ns(res);
        struct list_head *tmp;
        struct list_head *ownlocks = NULL;
        struct ldlm_lock *lock = NULL;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2 = NULL;
        ldlm_mode_t mode = req->l_req_mode;
        int local = ns_is_client(ns);
        int added = (mode == LCK_NL);
        int overlaps = 0;
        int splitted = 0;
        const struct ldlm_callback_suite null_cbs = { NULL };
        int rc;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags %#llx owner "LPU64" pid %u mode %u start "
               LPU64" end "LPU64"\n", *flags,
               new->l_policy_data.l_flock.owner,
               new->l_policy_data.l_flock.pid, mode,
               req->l_policy_data.l_flock.start,
               req->l_policy_data.l_flock.end);

        *err = ELDLM_OK;

        if (local) {
                /* No blocking ASTs are sent to the clients for
                 * Posix file & record locks */
                req->l_blocking_ast = NULL;
        } else {
                /* Called on the server for lock cancels. */
                req->l_blocking_ast = ldlm_flock_blocking_ast;
        }

reprocess:
        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop determines where this process's locks start
                 * in the resource lr_granted list. */
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock,
                                          l_res_link);
                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = tmp;
                                break;
                        }
                }
        } else {
                lockmode_verify(mode);

                /* This loop determines if there are existing locks
                 * that conflict with the new lock request. */
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock,
                                          l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = tmp;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        if (!first_enq)
                                RETURN(LDLM_ITER_CONTINUE);

                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EAGAIN;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                ldlm_flock_destroy(req, mode, *flags);
                                req->l_req_mode = lock->l_granted_mode;
                                req->l_policy_data.l_flock.pid =
                                        lock->l_policy_data.l_flock.pid;
                                req->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                                req->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                                *flags |= LDLM_FL_LOCK_CHANGED;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (ldlm_flock_deadlock(req, lock)) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EDEADLK;
                                RETURN(LDLM_ITER_STOP);
                        }

                        rc = ldlm_flock_blocking_link(req, lock);
                        if (rc) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = rc;
                                RETURN(LDLM_ITER_STOP);
                        }
                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        RETURN(LDLM_ITER_STOP);
                }
        }

        if (*flags & LDLM_FL_TEST_LOCK) {
                ldlm_flock_destroy(req, mode, *flags);
                req->l_req_mode = LCK_NL;
                *flags |= LDLM_FL_LOCK_CHANGED;
                RETURN(LDLM_ITER_STOP);
        }

        /* In case we had slept on this lock request, take it off the
         * deadlock detection hash list. */
        ldlm_flock_blocking_unlink(req);

        /* Scan the locks owned by this process that overlap this request.
         * We may have to merge or split existing locks. */

        if (!ownlocks)
                ownlocks = &res->lr_granted;

        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
                lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);

                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        /* If the modes are the same then we need to process
                         * locks that overlap OR adjoin the new lock. The extra
                         * logic condition is necessary to deal with arithmetic
                         * overflow and underflow. */
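                        /* Concrete illustration: with existing [0, 99] and
                         * a new request [100, 200], lock->end + 1 == 100 is
                         * not less than new->start, so the two are treated
                         * as adjoining and merged into [0, 200] below. The
                         * OBD_OBJECT_EOF check keeps lock->end + 1 from
                         * wrapping when the lock extends to the end of the
                         * file, and the start != 0 check likewise guards
                         * lock->start - 1 against underflow. */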
                        if ((new->l_policy_data.l_flock.start >
                             (lock->l_policy_data.l_flock.end + 1))
                            && (lock->l_policy_data.l_flock.end !=
                                OBD_OBJECT_EOF))
                                continue;

                        if ((new->l_policy_data.l_flock.end <
                             (lock->l_policy_data.l_flock.start - 1))
                            && (lock->l_policy_data.l_flock.start != 0))
                                break;

                        if (new->l_policy_data.l_flock.start <
                            lock->l_policy_data.l_flock.start) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.start;
                        } else {
                                new->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                        }

                        if (new->l_policy_data.l_flock.end >
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.end =
                                        new->l_policy_data.l_flock.end;
                        } else {
                                new->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                        }

                        if (added) {
                                ldlm_flock_destroy(lock, mode, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }

                if (new->l_policy_data.l_flock.start >
                    lock->l_policy_data.l_flock.end)
                        continue;

                if (new->l_policy_data.l_flock.end <
                    lock->l_policy_data.l_flock.start)
                        break;

                ++overlaps;

                if (new->l_policy_data.l_flock.start <=
                    lock->l_policy_data.l_flock.start) {
                        if (new->l_policy_data.l_flock.end <
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.end + 1;
                                break;
                        }
                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                        continue;
                }
                if (new->l_policy_data.l_flock.end >=
                    lock->l_policy_data.l_flock.end) {
                        lock->l_policy_data.l_flock.end =
                                new->l_policy_data.l_flock.start - 1;
                        continue;
                }

                /* split the existing lock into two locks */

                /* if this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request but this would complicate the reply
                 * processing since updates to req get reflected in the
                 * reply. The client side replays the lock request so
                 * it must see the original lock data in the reply. */

                /* XXX - if ldlm_lock_new() can sleep we should
                 * release the lr_lock, allocate the new lock,
                 * and restart processing this lock. */
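                /* Worked example of the split: the process holds a PW
                 * lock on [0, 299] (a different mode from the request)
                 * and the new lock covers [100, 199]. Neither trim case
                 * above applies, so the existing lock is split: new2
                 * keeps mode PW on [0, 99] and the original lock is
                 * shortened to [200, 299], with the new lock granted in
                 * between. */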
                if (!new2) {
                        unlock_res_and_lock(req);
                        new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
                                                lock->l_granted_mode, &null_cbs,
                                                NULL, 0, LVB_T_NONE);
                        lock_res_and_lock(req);
                        if (!new2) {
                                ldlm_flock_destroy(req, lock->l_granted_mode,
                                                   *flags);
                                *err = -ENOLCK;
                                RETURN(LDLM_ITER_STOP);
                        }
                        goto reprocess;
                }

                splitted = 1;

                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_policy_data.l_flock.pid =
                        new->l_policy_data.l_flock.pid;
                new2->l_policy_data.l_flock.owner =
                        new->l_policy_data.l_flock.owner;
                new2->l_policy_data.l_flock.start =
                        lock->l_policy_data.l_flock.start;
                new2->l_policy_data.l_flock.end =
                        new->l_policy_data.l_flock.start - 1;
                lock->l_policy_data.l_flock.start =
                        new->l_policy_data.l_flock.end + 1;
                new2->l_conn_export = lock->l_conn_export;
                if (lock->l_export != NULL) {
                        new2->l_export = class_export_lock_get(lock->l_export, new2);
                        if (new2->l_export->exp_lock_hash &&
                            hlist_unhashed(&new2->l_exp_hash))
                                cfs_hash_add(new2->l_export->exp_lock_hash,
                                             &new2->l_remote_handle,
                                             &new2->l_exp_hash);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC)
                        ldlm_lock_addref_internal_nolock(new2,
                                                         lock->l_granted_mode);

                /* insert new2 at lock */
                ldlm_resource_add_lock(res, ownlocks, new2);
                LDLM_LOCK_RELEASE(new2);
                break;
        }

        /* if new2 is created but never used, destroy it */
        if (splitted == 0 && new2 != NULL)
                ldlm_lock_destroy_nolock(new2);

        /* At this point we're granting the lock request. */
        req->l_granted_mode = req->l_req_mode;

        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
        if (!added) {
                list_del_init(&req->l_res_link);
                /* insert new lock before ownlocks in list. */
                ldlm_resource_add_lock(res, ownlocks, req);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
                /* The only possible case for a client-side call into the
                 * flock policy function is ldlm_flock_completion_ast(),
                 * which always carries the LDLM_FL_WAIT_NOREPROC flag. */
                CERROR("Illegal parameter for client-side-only module.\n");
                LBUG();
        }

        /* In case we're reprocessing the requested lock we can't destroy
         * it until after calling ldlm_add_ast_work_item() above, so that
         * ldlm_add_ast_work_item() can bump the reference count on \a req.
         * Otherwise \a req could be freed before the completion AST can
         * be sent. */
        if (added)
                ldlm_flock_destroy(req, mode, *flags);

        ldlm_resource_dump(D_INFO, res);
        RETURN(LDLM_ITER_CONTINUE);
}

struct ldlm_flock_wait_data {
        struct ldlm_lock *fwd_lock;
        int               fwd_generation;
};

static void
ldlm_flock_interrupted_wait(void *data)
{
        struct ldlm_lock *lock;
        ENTRY;

        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

        /* take lock off the deadlock detection hash list. */
        lock_res_and_lock(lock);
        ldlm_flock_blocking_unlink(lock);

        /* client side - set flag to prevent lock from being put on LRU list */
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);

        EXIT;
}

/**
 * Flock completion callback function.
 *
 * \param lock [in,out]: A lock to be handled
 * \param flags    [in]: flags
 * \param data     [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
 *
 * \retval 0    : success
 * \retval <0   : failure
 */
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
        struct file_lock                *getlk = lock->l_ast_data;
        struct obd_device               *obd;
        struct obd_import               *imp = NULL;
        struct ldlm_flock_wait_data     fwd;
        struct l_wait_info              lwi;
        ldlm_error_t                    err;
        int                             rc = 0;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
               flags, data, getlk);

        /* Import invalidation. We need to actually release the lock
         * references being held, so that it can go away. There is no
         * point in holding the lock even if the app still believes it
         * has it, since the server already dropped it anyway. This
         * applies to granted locks only. */
        if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
            (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
                if (lock->l_req_mode == lock->l_granted_mode &&
                    lock->l_granted_mode != LCK_NL &&
                    NULL == data)
                        ldlm_lock_decref_internal(lock, lock->l_req_mode);

                /* Need to wake up the waiter if we were evicted */
                wake_up(&lock->l_waitq);
                RETURN(0);
        }

        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV))) {
                if (NULL == data)
                        /* mds granted the lock in the reply */
                        goto granted;
                /* CP AST RPC: the lock got granted, wake it up */
                wake_up(&lock->l_waitq);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                   "sleeping");
        fwd.fwd_lock = lock;
        obd = class_exp2obd(lock->l_conn_export);

        /* if this is a local lock, there is no import */
        if (NULL != obd)
                imp = obd->u.cli.cl_import;

        if (NULL != imp) {
                spin_lock(&imp->imp_lock);
                fwd.fwd_generation = imp->imp_generation;
                spin_unlock(&imp->imp_lock);
        }

        lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);

        /* Go to sleep until the lock is granted. */
        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

granted:
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

        if (lock->l_destroyed) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
                RETURN(0);
        }

        if (lock->l_flags & LDLM_FL_FAILED) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
                RETURN(-EIO);
        }

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

        LDLM_DEBUG(lock, "client-side enqueue granted");

        lock_res_and_lock(lock);

        /* take lock off the deadlock detection hash list. */
        ldlm_flock_blocking_unlink(lock);

        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
        list_del_init(&lock->l_res_link);

        if (flags & LDLM_FL_TEST_LOCK) {
                /* fcntl(F_GETLK) request */
                /* The old mode was saved in getlk->fl_type so that if the
                 * mode in the lock changes we can decref the appropriate
                 * refcount. */
                ldlm_flock_destroy(lock, flock_type(getlk),
                                   LDLM_FL_WAIT_NOREPROC);
                switch (lock->l_granted_mode) {
                case LCK_PR:
                        flock_set_type(getlk, F_RDLCK);
                        break;
                case LCK_PW:
                        flock_set_type(getlk, F_WRLCK);
                        break;
                default:
                        flock_set_type(getlk, F_UNLCK);
                }
                flock_set_pid(getlk, (pid_t)lock->l_policy_data.l_flock.pid);
                flock_set_start(getlk,
                                (loff_t)lock->l_policy_data.l_flock.start);
                flock_set_end(getlk,
                              (loff_t)lock->l_policy_data.l_flock.end);
        } else {
                __u64 noreproc = LDLM_FL_WAIT_NOREPROC;

                /* We need to reprocess the lock to do merges or splits
                 * with existing locks owned by this process. */
                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
        }
        unlock_res_and_lock(lock);
        RETURN(0);
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);
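
/*
 * Sketch of how this callback is typically wired up on the client side
 * (the enqueue code lives elsewhere, e.g. in llite; the field names
 * follow struct ldlm_enqueue_info and are shown for illustration only):
 *
 *     struct ldlm_enqueue_info einfo = {
 *             .ei_type   = LDLM_FLOCK,
 *             .ei_cb_cp  = ldlm_flock_completion_ast,
 *             .ei_cbdata = NULL,
 *     };
 *
 * The completion AST then either returns immediately for a granted lock
 * or sleeps in l_wait_event() above until the lock is granted or
 * cancelled.
 */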

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
{
        ENTRY;

        LASSERT(lock);
        LASSERT(flag == LDLM_CB_CANCELING);

        /* take lock off the deadlock detection hash list. */
        lock_res_and_lock(lock);
        ldlm_flock_blocking_unlink(lock);
        unlock_res_and_lock(lock);
        RETURN(0);
}

void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        /* Compat code: old clients had no idea about the owner field and
         * relied solely on the pid for ownership. Introduced in LU-104,
         * 2.1, April 2011. */
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
}
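
/*
 * Example of the compat behaviour above: a pre-2.1 client sending
 * (lfw_start = 0, lfw_end = 99, lfw_pid = 1234) and no owner ends up
 * with both lpolicy->l_flock.pid and lpolicy->l_flock.owner set to
 * 1234, so ownership checks keep working even though that wire format
 * never carried a separate owner.
 */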

void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
}

void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
                                     ldlm_wire_policy_data_t *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
        wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
        wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
        wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
}

/*
 * Export handle<->flock hash operations.
 */
static unsigned
ldlm_export_flock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
{
        return cfs_hash_u64_hash(*(__u64 *)key, mask);
}

static void *
ldlm_export_flock_key(struct hlist_node *hnode)
{
        struct ldlm_lock *lock;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        return &lock->l_policy_data.l_flock.owner;
}

static int
ldlm_export_flock_keycmp(const void *key, struct hlist_node *hnode)
{
        return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
}

static void *
ldlm_export_flock_object(struct hlist_node *hnode)
{
        return hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
}

static void
ldlm_export_flock_get(cfs_hash_t *hs, struct hlist_node *hnode)
{
        struct ldlm_lock *lock;
        struct ldlm_flock *flock;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        LDLM_LOCK_GET(lock);

        flock = &lock->l_policy_data.l_flock;
        LASSERT(flock->blocking_export != NULL);
        class_export_get(flock->blocking_export);
        flock->blocking_refs++;
}

static void
ldlm_export_flock_put(cfs_hash_t *hs, struct hlist_node *hnode)
{
        struct ldlm_lock *lock;
        struct ldlm_flock *flock;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        LDLM_LOCK_RELEASE(lock);

        flock = &lock->l_policy_data.l_flock;
        LASSERT(flock->blocking_export != NULL);
        class_export_put(flock->blocking_export);
        if (--flock->blocking_refs == 0) {
                flock->blocking_owner = 0;
                flock->blocking_export = NULL;
        }
}

static cfs_hash_ops_t ldlm_export_flock_ops = {
        .hs_hash        = ldlm_export_flock_hash,
        .hs_key         = ldlm_export_flock_key,
        .hs_keycmp      = ldlm_export_flock_keycmp,
        .hs_object      = ldlm_export_flock_object,
        .hs_get         = ldlm_export_flock_get,
        .hs_put         = ldlm_export_flock_put,
        .hs_put_locked  = ldlm_export_flock_put,
};
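
/*
 * Note on reference pairing: a cfs_hash_lookup() on exp_flock_hash takes
 * a reference via ldlm_export_flock_get() (a ref on the lock plus a ref
 * on its blocking export) and must be balanced by a cfs_hash_put(), as
 * in the loop in ldlm_flock_deadlock() above. When blocking_refs drops
 * to zero, blocking_owner/blocking_export are cleared so a stale
 * wait-for edge can never be followed.
 */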

int ldlm_init_flock_export(struct obd_export *exp)
{
        exp->exp_flock_hash =
                cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
                                HASH_EXP_LOCK_CUR_BITS,
                                HASH_EXP_LOCK_MAX_BITS,
                                HASH_EXP_LOCK_BKT_BITS, 0,
                                CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
                                &ldlm_export_flock_ops,
                                CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
        if (!exp->exp_flock_hash)
                RETURN(-ENOMEM);

        RETURN(0);
}
EXPORT_SYMBOL(ldlm_init_flock_export);

void ldlm_destroy_flock_export(struct obd_export *exp)
{
        ENTRY;
        if (exp->exp_flock_hash) {
                cfs_hash_putref(exp->exp_flock_hash);
                exp->exp_flock_hash = NULL;
        }
        EXIT;
}
EXPORT_SYMBOL(ldlm_destroy_flock_export);