staging/lustre: Remove ns_is_client()
drivers/staging/lustre/lustre/ldlm/ldlm_flock.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

/**
 * This file implements the POSIX lock type for Lustre.
 * Its policy properties are the start and end of the extent and the PID.
 *
 * These locks are only handled through the MDS because POSIX semantics
 * require, e.g., that a lock may be only partially released and as such
 * split into two parts, and that two adjacent locks from the same process
 * may be merged into a single wider lock.
 *
 * Lock modes are mapped like this:
 * PR and PW for READ and WRITE locks
 * NL to request the release of a portion of the lock
 *
 * These flock locks never time out.
 */
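
/*
 * Illustrative sketch of the mode mapping described above. The helper and
 * its placement are hypothetical; the LCK_* constants and fcntl() lock
 * types are real:
 *
 *	static ldlm_mode_t flock_type_to_ldlm_mode(unsigned char fl_type)
 *	{
 *		switch (fl_type) {
 *		case F_RDLCK:		// READ lock
 *			return LCK_PR;
 *		case F_WRLCK:		// WRITE lock
 *			return LCK_PW;
 *		default:		// F_UNLCK: release (part of) a lock
 *			return LCK_NL;
 *		}
 *	}
 */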

#define DEBUG_SUBSYSTEM S_LDLM

#include "../include/lustre_dlm.h"
#include "../include/obd_support.h"
#include "../include/obd_class.h"
#include "../include/lustre_lib.h"
#include <linux/list.h>
#include "ldlm_internal.h"

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *            and safeguard against removal of a list entry.
 * \param pos   the &struct list_head to use as a loop counter. pos MUST
 *            have been initialized prior to using it in this macro.
 * \param n     another &struct list_head to use as temporary storage
 * \param head  the head for your list.
 */
#define list_for_remaining_safe(pos, n, head) \
        for (n = pos->next; pos != (head); pos = n, n = pos->next)
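
/*
 * Illustrative use of list_for_remaining_safe() (a sketch mirroring how
 * ldlm_process_flock_lock() below resumes a scan from a saved position;
 * "pos" must point into the list before the loop starts):
 *
 *	struct list_head *pos = res->lr_granted.next;
 *	struct list_head *n;
 *
 *	list_for_remaining_safe(pos, n, &res->lr_granted) {
 *		struct ldlm_lock *lck =
 *			list_entry(pos, struct ldlm_lock, l_res_link);
 *		// "pos" may be unlinked here; "n" keeps the scan safe
 *	}
 */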

static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return (new->l_policy_data.l_flock.owner ==
                lock->l_policy_data.l_flock.owner) &&
               (new->l_export == lock->l_export);
}

static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return (new->l_policy_data.l_flock.start <=
                lock->l_policy_data.l_flock.end) &&
               (new->l_policy_data.l_flock.end >=
                lock->l_policy_data.l_flock.start);
}
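
/*
 * Worked example for ldlm_flocks_overlap() (illustrative extents): two
 * extents overlap iff each starts at or before the other ends.
 *
 *	lock [0, 99],   new [100, 199] -> 0  (100 <= 99 fails)
 *	lock [0, 100],  new [100, 199] -> 1  (100 <= 100 && 199 >= 0)
 *	lock [50, 150], new [100, 199] -> 1  (100 <= 150 && 199 >= 50)
 */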

static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
                                            struct ldlm_lock *lock)
{
        /* For server only */
        if (req->l_export == NULL)
                return;

        LASSERT(hlist_unhashed(&req->l_exp_flock_hash));

        req->l_policy_data.l_flock.blocking_owner =
                lock->l_policy_data.l_flock.owner;
        req->l_policy_data.l_flock.blocking_export =
                lock->l_export;
        req->l_policy_data.l_flock.blocking_refs = 0;

        cfs_hash_add(req->l_export->exp_flock_hash,
                     &req->l_policy_data.l_flock.owner,
                     &req->l_exp_flock_hash);
}
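
/*
 * Illustrative effect of ldlm_flock_blocking_link() (hypothetical state):
 * once a waiting request is linked, it records which owner blocks it and
 * becomes findable by its own owner in the per-export hash, e.g.
 *
 *	req->l_policy_data.l_flock.blocking_owner == owner of "lock"
 *	cfs_hash_lookup(req->l_export->exp_flock_hash, &owner) -> req
 *
 * which is exactly the chain that ldlm_flock_deadlock() below follows.
 */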

static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
{
        /* For server only */
        if (req->l_export == NULL)
                return;

        check_res_locked(req->l_resource);
        if (req->l_export->exp_flock_hash != NULL &&
            !hlist_unhashed(&req->l_exp_flock_hash))
                cfs_hash_del(req->l_export->exp_flock_hash,
                             &req->l_policy_data.l_flock.owner,
                             &req->l_exp_flock_hash);
}

static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
{
        LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
                   mode, flags);

        /* Safe to not lock here, since it should be empty anyway */
        LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));

        list_del_init(&lock->l_res_link);
        if (flags == LDLM_FL_WAIT_NOREPROC &&
            !(lock->l_flags & LDLM_FL_FAILED)) {
                /* client side - set a flag to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

                /* When we reach here it is under lock_res_and_lock(), so we
                 * need to call the nolock version of
                 * ldlm_lock_decref_internal() */
                ldlm_lock_decref_internal_nolock(lock, mode);
        }

        ldlm_lock_destroy_nolock(lock);
}

/**
 * POSIX locks deadlock detection code.
 *
 * Given a new lock \a req and an existing lock \a bl_lock it conflicts
 * with, we need to iterate through all blocked POSIX locks for this
 * export and see if a deadlock condition arises (i.e. when one client
 * holds a lock on something and wants a lock on something else, while at
 * the same time another client is in the opposite situation).
 */
static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
{
        struct obd_export *req_exp = req->l_export;
        struct obd_export *bl_exp = bl_lock->l_export;
        __u64 req_owner = req->l_policy_data.l_flock.owner;
        __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;

        /* For server only */
        if (req_exp == NULL)
                return 0;

        class_export_get(bl_exp);
        while (1) {
                struct obd_export *bl_exp_new;
                struct ldlm_lock *lock = NULL;
                struct ldlm_flock *flock;

                if (bl_exp->exp_flock_hash != NULL)
                        lock = cfs_hash_lookup(bl_exp->exp_flock_hash,
                                               &bl_owner);
                if (lock == NULL)
                        break;

                LASSERT(req != lock);
                flock = &lock->l_policy_data.l_flock;
                LASSERT(flock->owner == bl_owner);
                bl_owner = flock->blocking_owner;
                bl_exp_new = class_export_get(flock->blocking_export);
                class_export_put(bl_exp);

                cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
                bl_exp = bl_exp_new;

                if (bl_owner == req_owner && bl_exp == req_exp) {
                        class_export_put(bl_exp);
                        return 1;
                }
        }
        class_export_put(bl_exp);

        return 0;
}
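
/*
 * Illustrative deadlock scenario for ldlm_flock_deadlock() (hypothetical
 * owners and locks): owner A holds lock L1 and blocks waiting on lock L2;
 * owner B holds L2 and now requests a lock conflicting with L1. Walking
 * the blocking_owner chain from the conflicting lock leads back to the
 * requester, so a cycle is detected:
 *
 *	req(B) conflicts with L1(A) -> A blocks on L2(B) -> B == req owner
 *
 * and the function returns 1.
 */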

static void ldlm_flock_cancel_on_deadlock(struct ldlm_lock *lock,
                                          struct list_head *work_list)
{
        CDEBUG(D_INFO, "reprocess deadlock req=%p\n", lock);

        if ((exp_connect_flags(lock->l_export) &
             OBD_CONNECT_FLOCK_DEAD) == 0) {
                CERROR("deadlock found, but client doesn't support flock cancellation\n");
        } else {
                LASSERT(lock->l_completion_ast);
                LASSERT((lock->l_flags & LDLM_FL_AST_SENT) == 0);
                lock->l_flags |= LDLM_FL_AST_SENT | LDLM_FL_CANCEL_ON_BLOCK |
                        LDLM_FL_FLOCK_DEADLOCK;
                ldlm_flock_blocking_unlink(lock);
                ldlm_resource_unlink_lock(lock);
                ldlm_add_ast_work_item(lock, NULL, work_list);
        }
}

/**
 * Process a granting attempt for flock lock.
 * Must be called with the ns lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.
 *
 * It is also responsible for splitting a lock if a portion of the lock
 * is released.
 *
 * If \a first_enq is 0 (i.e. called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *
 * If \a first_enq is 1 (i.e. called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent yet, so the list of conflicting
 *     locks would be collected and ASTs sent.
 */
int
ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
                        ldlm_error_t *err, struct list_head *work_list)
{
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = ldlm_res_to_ns(res);
        struct list_head *tmp;
        struct list_head *ownlocks = NULL;
        struct ldlm_lock *lock = NULL;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2 = NULL;
        ldlm_mode_t mode = req->l_req_mode;
        int added = (mode == LCK_NL);
        int overlaps = 0;
        int splitted = 0;
        const struct ldlm_callback_suite null_cbs = { NULL };

        CDEBUG(D_DLMTRACE,
               "flags %#llx owner %llu pid %u mode %u start %llu end %llu\n",
               *flags, new->l_policy_data.l_flock.owner,
               new->l_policy_data.l_flock.pid, mode,
               req->l_policy_data.l_flock.start,
               req->l_policy_data.l_flock.end);

        *err = ELDLM_OK;

        /* No blocking ASTs are sent to the clients for
         * POSIX file & record locks */
        req->l_blocking_ast = NULL;

reprocess:
        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop determines where this process's locks start
                 * in the resource lr_granted list. */
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock,
                                          l_res_link);
                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = tmp;
                                break;
                        }
                }
        } else {
                int reprocess_failed = 0;

                lockmode_verify(mode);

                /* This loop determines if there are existing locks
                 * that conflict with the new lock request. */
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock,
                                          l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = tmp;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        if (!first_enq) {
                                reprocess_failed = 1;
                                if (ldlm_flock_deadlock(req, lock)) {
                                        ldlm_flock_cancel_on_deadlock(req,
                                                                      work_list);
                                        return LDLM_ITER_CONTINUE;
                                }
                                continue;
                        }

                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EAGAIN;
                                return LDLM_ITER_STOP;
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                ldlm_flock_destroy(req, mode, *flags);
                                req->l_req_mode = lock->l_granted_mode;
                                req->l_policy_data.l_flock.pid =
                                        lock->l_policy_data.l_flock.pid;
                                req->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                                req->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                                *flags |= LDLM_FL_LOCK_CHANGED;
                                return LDLM_ITER_STOP;
                        }

                        /* add lock to blocking list before deadlock
                         * check to prevent race */
                        ldlm_flock_blocking_link(req, lock);

                        if (ldlm_flock_deadlock(req, lock)) {
                                ldlm_flock_blocking_unlink(req);
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EDEADLK;
                                return LDLM_ITER_STOP;
                        }

                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        return LDLM_ITER_STOP;
                }
                if (reprocess_failed)
                        return LDLM_ITER_CONTINUE;
        }

        if (*flags & LDLM_FL_TEST_LOCK) {
                ldlm_flock_destroy(req, mode, *flags);
                req->l_req_mode = LCK_NL;
                *flags |= LDLM_FL_LOCK_CHANGED;
                return LDLM_ITER_STOP;
        }

        /* In case we had slept on this lock request take it off the
         * deadlock detection hash list. */
        ldlm_flock_blocking_unlink(req);

        /* Scan the locks owned by this process that overlap this request.
         * We may have to merge or split existing locks. */

        if (!ownlocks)
                ownlocks = &res->lr_granted;

        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
                lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);

                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        /* If the modes are the same then we need to process
                         * locks that overlap OR adjoin the new lock. The extra
                         * logic condition is necessary to deal with arithmetic
                         * overflow and underflow. */
                        if ((new->l_policy_data.l_flock.start >
                             (lock->l_policy_data.l_flock.end + 1))
                            && (lock->l_policy_data.l_flock.end !=
                                OBD_OBJECT_EOF))
                                continue;

                        if ((new->l_policy_data.l_flock.end <
                             (lock->l_policy_data.l_flock.start - 1))
                            && (lock->l_policy_data.l_flock.start != 0))
                                break;

                        if (new->l_policy_data.l_flock.start <
                            lock->l_policy_data.l_flock.start) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.start;
                        } else {
                                new->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                        }

                        if (new->l_policy_data.l_flock.end >
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.end =
                                        new->l_policy_data.l_flock.end;
                        } else {
                                new->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                        }

                        if (added) {
                                ldlm_flock_destroy(lock, mode, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }

                if (new->l_policy_data.l_flock.start >
                    lock->l_policy_data.l_flock.end)
                        continue;

                if (new->l_policy_data.l_flock.end <
                    lock->l_policy_data.l_flock.start)
                        break;

                ++overlaps;

                if (new->l_policy_data.l_flock.start <=
                    lock->l_policy_data.l_flock.start) {
                        if (new->l_policy_data.l_flock.end <
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.end + 1;
                                break;
                        }
                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                        continue;
                }
                if (new->l_policy_data.l_flock.end >=
                    lock->l_policy_data.l_flock.end) {
                        lock->l_policy_data.l_flock.end =
                                new->l_policy_data.l_flock.start - 1;
                        continue;
                }

                /* split the existing lock into two locks */

                /* if this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request, but this would complicate the reply
                 * processing since updates to req get reflected in the
                 * reply. The client side replays the lock request, so
                 * it must see the original lock data in the reply. */

                /* XXX - if ldlm_lock_create() can sleep we should
                 * release the lr_lock, allocate the new lock,
                 * and restart processing this lock. */
                if (!new2) {
                        unlock_res_and_lock(req);
                        new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
                                                lock->l_granted_mode, &null_cbs,
                                                NULL, 0, LVB_T_NONE);
                        lock_res_and_lock(req);
                        if (!new2) {
                                ldlm_flock_destroy(req, lock->l_granted_mode,
                                                   *flags);
                                *err = -ENOLCK;
                                return LDLM_ITER_STOP;
                        }
                        goto reprocess;
                }

                splitted = 1;

                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_policy_data.l_flock.pid =
                        new->l_policy_data.l_flock.pid;
                new2->l_policy_data.l_flock.owner =
                        new->l_policy_data.l_flock.owner;
                new2->l_policy_data.l_flock.start =
                        lock->l_policy_data.l_flock.start;
                new2->l_policy_data.l_flock.end =
                        new->l_policy_data.l_flock.start - 1;
                lock->l_policy_data.l_flock.start =
                        new->l_policy_data.l_flock.end + 1;
                new2->l_conn_export = lock->l_conn_export;
                if (lock->l_export != NULL) {
                        new2->l_export = class_export_lock_get(lock->l_export,
                                                               new2);
                        if (new2->l_export->exp_lock_hash &&
                            hlist_unhashed(&new2->l_exp_hash))
                                cfs_hash_add(new2->l_export->exp_lock_hash,
                                             &new2->l_remote_handle,
                                             &new2->l_exp_hash);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC)
                        ldlm_lock_addref_internal_nolock(new2,
                                                         lock->l_granted_mode);

                /* insert new2 at lock */
                ldlm_resource_add_lock(res, ownlocks, new2);
                LDLM_LOCK_RELEASE(new2);
                break;
        }

        /* if new2 is created but never used, destroy it */
        if (splitted == 0 && new2 != NULL)
                ldlm_lock_destroy_nolock(new2);

        /* At this point we're granting the lock request. */
        req->l_granted_mode = req->l_req_mode;

        if (!added) {
                list_del_init(&req->l_res_link);
                /* insert the new lock before ownlocks in the list. */
                ldlm_resource_add_lock(res, ownlocks, req);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
                /* The only possible case of a client-side call into the
                 * flock policy function is ldlm_flock_completion_ast(),
                 * which always carries the LDLM_FL_WAIT_NOREPROC flag. */
                CERROR("Illegal parameter for client-side-only module.\n");
                LBUG();
        }

        /* In case we're reprocessing the requested lock we can't destroy
         * it until after calling ldlm_add_ast_work_item() above so that
         * it can bump the reference count on \a req. Otherwise \a req
         * could be freed before the completion AST can be sent. */
        if (added)
                ldlm_flock_destroy(req, mode, *flags);

        ldlm_resource_dump(D_INFO, res);
        return LDLM_ITER_CONTINUE;
}
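
/*
 * Worked example for the merge/split pass above (hypothetical byte
 * ranges): suppose a process holds a granted PW lock on [100, 299].
 *
 *	- A new PW request on [250, 400] overlaps it, so the two are
 *	  merged into a single PW lock on [100, 400].
 *	- An unlock (LCK_NL) request on [150, 199] falls strictly inside
 *	  [100, 299], so the granted lock is split: "lock" keeps
 *	  [200, 299] and the newly allocated "new2" covers [100, 149].
 */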

struct ldlm_flock_wait_data {
        struct ldlm_lock *fwd_lock;
        int               fwd_generation;
};

static void
ldlm_flock_interrupted_wait(void *data)
{
        struct ldlm_lock *lock;

        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

        /* take lock off the deadlock detection hash list. */
        lock_res_and_lock(lock);
        ldlm_flock_blocking_unlink(lock);

        /* client side - set flag to prevent lock from being put on LRU list */
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);
}

/**
 * Flock completion callback function.
 *
 * \param lock [in,out]: a lock to be handled
 * \param flags    [in]: flags
 * \param *data    [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
 *
 * \retval 0    : success
 * \retval <0   : failure
 */
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
        struct file_lock           *getlk = lock->l_ast_data;
        struct obd_device          *obd;
        struct obd_import          *imp = NULL;
        struct ldlm_flock_wait_data fwd;
        struct l_wait_info          lwi;
        ldlm_error_t                err;
        int                         rc = 0;

        CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
               flags, data, getlk);

        /* Import invalidation. We need to actually release the lock
         * references being held, so that it can go away. There is no point
         * in holding the lock even if the app still believes it has it,
         * since the server already dropped it anyway. This applies only to
         * granted locks. */
        if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
            (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
                if (lock->l_req_mode == lock->l_granted_mode &&
                    lock->l_granted_mode != LCK_NL &&
                    data == NULL)
                        ldlm_lock_decref_internal(lock, lock->l_req_mode);

                /* Need to wake up the waiter if we were evicted */
                wake_up(&lock->l_waitq);
                return 0;
        }

        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV))) {
                if (data == NULL)
                        /* mds granted the lock in the reply */
                        goto granted;
                /* CP AST RPC: lock got granted, wake it up */
                wake_up(&lock->l_waitq);
                return 0;
        }

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, sleeping");
        fwd.fwd_lock = lock;
        obd = class_exp2obd(lock->l_conn_export);

        /* if this is a local lock, there is no import */
        if (obd != NULL)
                imp = obd->u.cli.cl_import;

        if (imp != NULL) {
                spin_lock(&imp->imp_lock);
                fwd.fwd_generation = imp->imp_generation;
                spin_unlock(&imp->imp_lock);
        }

        lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);

        /* Go to sleep until the lock is granted. */
        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                return rc;
        }

granted:
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

        if (lock->l_flags & LDLM_FL_DESTROYED) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
                return 0;
        }

        if (lock->l_flags & LDLM_FL_FAILED) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
                return -EIO;
        }

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                return rc;
        }

        LDLM_DEBUG(lock, "client-side enqueue granted");

        lock_res_and_lock(lock);

        /* take lock off the deadlock detection hash list. */
        ldlm_flock_blocking_unlink(lock);

        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
        list_del_init(&lock->l_res_link);

        if (lock->l_flags & LDLM_FL_FLOCK_DEADLOCK) {
                LDLM_DEBUG(lock, "client-side enqueue deadlock received");
                rc = -EDEADLK;
        } else if (flags & LDLM_FL_TEST_LOCK) {
                /* fcntl(F_GETLK) request */
                /* The old mode was saved in getlk->fl_type so that if the
                 * mode in the lock changes we can decref the appropriate
                 * refcount. */
                ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
                switch (lock->l_granted_mode) {
                case LCK_PR:
                        getlk->fl_type = F_RDLCK;
                        break;
                case LCK_PW:
                        getlk->fl_type = F_WRLCK;
                        break;
                default:
                        getlk->fl_type = F_UNLCK;
                }
                getlk->fl_pid = (pid_t)lock->l_policy_data.l_flock.pid;
                getlk->fl_start = (loff_t)lock->l_policy_data.l_flock.start;
                getlk->fl_end = (loff_t)lock->l_policy_data.l_flock.end;
        } else {
                __u64 noreproc = LDLM_FL_WAIT_NOREPROC;

                /* We need to reprocess the lock to do merges or splits
                 * with existing locks owned by this process. */
                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
        }
        unlock_res_and_lock(lock);
        return rc;
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);
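
/*
 * Illustrative F_GETLK outcome for the LDLM_FL_TEST_LOCK branch above
 * (hypothetical values): if the test request conflicts with a granted PW
 * lock owned by pid 1234 on [0, 4095], the caller's struct file_lock is
 * filled in as
 *
 *	getlk->fl_type  = F_WRLCK;	// LCK_PW maps back to a write lock
 *	getlk->fl_pid   = 1234;
 *	getlk->fl_start = 0;
 *	getlk->fl_end   = 4095;
 */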

void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        /* Compat code: old clients had no idea about the owner field and
         * relied solely on pid for ownership. Introduced in LU-104, 2.1,
         * April 2011 */
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
}

void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
}

void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
                                     ldlm_wire_policy_data_t *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
        wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
        wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
        wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
}
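
/*
 * Illustrative round trip through the converters above (hypothetical
 * values): a local policy with start = 0, end = 4095, pid = 1234 and
 * owner = 0xdeadbeef survives local_to_wire() followed by
 * wire21_to_local() unchanged, while wire18_to_local() on the same wire
 * data would instead set owner = pid = 1234, since pre-2.1 clients never
 * sent an owner field.
 */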