staging: lustre: Cleanup variable declarations in mdc_enqueue()
[firefly-linux-kernel-4.4.55.git] / drivers / staging / lustre / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 # include <linux/module.h>
40
41 #include "../include/linux/lustre_intent.h"
42 #include "../include/obd.h"
43 #include "../include/obd_class.h"
44 #include "../include/lustre_dlm.h"
45 #include "../include/lustre_fid.h"      /* fid_res_name_eq() */
46 #include "../include/lustre_mdc.h"
47 #include "../include/lustre_net.h"
48 #include "../include/lustre_req_layout.h"
49 #include "mdc_internal.h"
50
/*
 * Bundle of three pointers: an export, an md enqueue descriptor and an
 * ldlm enqueue descriptor.  Nothing in the visible part of this file
 * reads or writes it.
 */
struct mdc_getattr_args {
	struct obd_export		*ga_exp;	/* export */
	struct md_enqueue_info		*ga_minfo;	/* md enqueue info */
	struct ldlm_enqueue_info	*ga_einfo;	/* ldlm enqueue info */
};
56
57 int it_disposition(struct lookup_intent *it, int flag)
58 {
59         return it->d.lustre.it_disposition & flag;
60 }
61 EXPORT_SYMBOL(it_disposition);
62
63 void it_set_disposition(struct lookup_intent *it, int flag)
64 {
65         it->d.lustre.it_disposition |= flag;
66 }
67 EXPORT_SYMBOL(it_set_disposition);
68
69 void it_clear_disposition(struct lookup_intent *it, int flag)
70 {
71         it->d.lustre.it_disposition &= ~flag;
72 }
73 EXPORT_SYMBOL(it_clear_disposition);
74
75 int it_open_error(int phase, struct lookup_intent *it)
76 {
77         if (it_disposition(it, DISP_OPEN_LEASE)) {
78                 if (phase >= DISP_OPEN_LEASE)
79                         return it->d.lustre.it_status;
80                 else
81                         return 0;
82         }
83         if (it_disposition(it, DISP_OPEN_OPEN)) {
84                 if (phase >= DISP_OPEN_OPEN)
85                         return it->d.lustre.it_status;
86                 else
87                         return 0;
88         }
89
90         if (it_disposition(it, DISP_OPEN_CREATE)) {
91                 if (phase >= DISP_OPEN_CREATE)
92                         return it->d.lustre.it_status;
93                 else
94                         return 0;
95         }
96
97         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
98                 if (phase >= DISP_LOOKUP_EXECD)
99                         return it->d.lustre.it_status;
100                 else
101                         return 0;
102         }
103
104         if (it_disposition(it, DISP_IT_EXECD)) {
105                 if (phase >= DISP_IT_EXECD)
106                         return it->d.lustre.it_status;
107                 else
108                         return 0;
109         }
110         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
111                it->d.lustre.it_status);
112         LBUG();
113         return 0;
114 }
115 EXPORT_SYMBOL(it_open_error);
116
117 /* this must be called on a lockh that is known to have a referenced lock */
118 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
119                       __u64 *bits)
120 {
121         struct ldlm_lock *lock;
122         struct inode *new_inode = data;
123
124         if (bits)
125                 *bits = 0;
126
127         if (!*lockh)
128                 return 0;
129
130         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
131
132         LASSERT(lock != NULL);
133         lock_res_and_lock(lock);
134         if (lock->l_resource->lr_lvb_inode &&
135             lock->l_resource->lr_lvb_inode != data) {
136                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
137
138                 LASSERTF(old_inode->i_state & I_FREEING,
139                          "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n",
140                          old_inode, old_inode->i_ino, old_inode->i_generation,
141                          old_inode->i_state, new_inode, new_inode->i_ino,
142                          new_inode->i_generation);
143         }
144         lock->l_resource->lr_lvb_inode = new_inode;
145         if (bits)
146                 *bits = lock->l_policy_data.l_inodebits.bits;
147
148         unlock_res_and_lock(lock);
149         LDLM_LOCK_PUT(lock);
150
151         return 0;
152 }
153
154 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
155                            const struct lu_fid *fid, ldlm_type_t type,
156                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
157                            struct lustre_handle *lockh)
158 {
159         struct ldlm_res_id res_id;
160         ldlm_mode_t rc;
161
162         fid_build_reg_res_name(fid, &res_id);
163         /* LU-4405: Clear bits not supported by server */
164         policy->l_inodebits.bits &= exp_connect_ibits(exp);
165         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
166                              &res_id, type, policy, mode, lockh, 0);
167         return rc;
168 }
169
170 int mdc_cancel_unused(struct obd_export *exp,
171                       const struct lu_fid *fid,
172                       ldlm_policy_data_t *policy,
173                       ldlm_mode_t mode,
174                       ldlm_cancel_flags_t flags,
175                       void *opaque)
176 {
177         struct ldlm_res_id res_id;
178         struct obd_device *obd = class_exp2obd(exp);
179         int rc;
180
181         fid_build_reg_res_name(fid, &res_id);
182         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
183                                              policy, mode, flags, opaque);
184         return rc;
185 }
186
187 int mdc_null_inode(struct obd_export *exp,
188                    const struct lu_fid *fid)
189 {
190         struct ldlm_res_id res_id;
191         struct ldlm_resource *res;
192         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
193
194         LASSERTF(ns != NULL, "no namespace passed\n");
195
196         fid_build_reg_res_name(fid, &res_id);
197
198         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
199         if (res == NULL)
200                 return 0;
201
202         lock_res(res);
203         res->lr_lvb_inode = NULL;
204         unlock_res(res);
205
206         ldlm_resource_putref(res);
207         return 0;
208 }
209
210 /* find any ldlm lock of the inode in mdc
211  * return 0    not find
212  *      1    find one
213  *      < 0    error */
214 int mdc_find_cbdata(struct obd_export *exp,
215                     const struct lu_fid *fid,
216                     ldlm_iterator_t it, void *data)
217 {
218         struct ldlm_res_id res_id;
219         int rc = 0;
220
221         fid_build_reg_res_name((struct lu_fid *)fid, &res_id);
222         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
223                                    it, data);
224         if (rc == LDLM_ITER_STOP)
225                 return 1;
226         else if (rc == LDLM_ITER_CONTINUE)
227                 return 0;
228         return rc;
229 }
230
231 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
232 {
233         /* Don't hold error requests for replay. */
234         if (req->rq_replay) {
235                 spin_lock(&req->rq_lock);
236                 req->rq_replay = 0;
237                 spin_unlock(&req->rq_lock);
238         }
239         if (rc && req->rq_transno != 0) {
240                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
241                 LBUG();
242         }
243 }
244
245 /* Save a large LOV EA into the request buffer so that it is available
246  * for replay.  We don't do this in the initial request because the
247  * original request doesn't need this buffer (at most it sends just the
248  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
249  * buffer and may also be difficult to allocate and save a very large
250  * request buffer for each open. (bug 5707)
251  *
252  * OOM here may cause recovery failure if lmm is needed (only for the
253  * original open if the MDS crashed just when this client also OOM'd)
254  * but this is incredibly unlikely, and questionable whether the client
255  * could do MDS recovery under OOM anyways... */
256 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
257                                 struct mdt_body *body)
258 {
259         int     rc;
260
261         /* FIXME: remove this explicit offset. */
262         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
263                                         body->eadatasize);
264         if (rc) {
265                 CERROR("Can't enlarge segment %d size to %d\n",
266                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
267                 body->valid &= ~OBD_MD_FLEASIZE;
268                 body->eadatasize = 0;
269         }
270 }
271
272 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
273                                                    struct lookup_intent *it,
274                                                    struct md_op_data *op_data,
275                                                    void *lmm, int lmmsize,
276                                                    void *cb_data)
277 {
278         struct ptlrpc_request *req;
279         struct obd_device     *obddev = class_exp2obd(exp);
280         struct ldlm_intent    *lit;
281         LIST_HEAD(cancels);
282         int                 count = 0;
283         int                 mode;
284         int                 rc;
285
286         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
287
288         /* XXX: openlock is not cancelled for cross-refs. */
289         /* If inode is known, cancel conflicting OPEN locks. */
290         if (fid_is_sane(&op_data->op_fid2)) {
291                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
292                         if (it->it_flags & FMODE_WRITE)
293                                 mode = LCK_EX;
294                         else
295                                 mode = LCK_PR;
296                 } else {
297                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
298                                 mode = LCK_CW;
299 #ifdef FMODE_EXEC
300                         else if (it->it_flags & FMODE_EXEC)
301                                 mode = LCK_PR;
302 #endif
303                         else
304                                 mode = LCK_CR;
305                 }
306                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
307                                                 &cancels, mode,
308                                                 MDS_INODELOCK_OPEN);
309         }
310
311         /* If CREATE, cancel parent's UPDATE lock. */
312         if (it->it_op & IT_CREAT)
313                 mode = LCK_EX;
314         else
315                 mode = LCK_CR;
316         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
317                                          &cancels, mode,
318                                          MDS_INODELOCK_UPDATE);
319
320         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
321                                    &RQF_LDLM_INTENT_OPEN);
322         if (req == NULL) {
323                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
324                 return ERR_PTR(-ENOMEM);
325         }
326
327         /* parent capability */
328         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
329         /* child capability, reserve the size according to parent capa, it will
330          * be filled after we get the reply */
331         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
332
333         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
334                              op_data->op_namelen + 1);
335         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
336                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
337
338         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
339         if (rc < 0) {
340                 ptlrpc_request_free(req);
341                 return ERR_PTR(rc);
342         }
343
344         spin_lock(&req->rq_lock);
345         req->rq_replay = req->rq_import->imp_replayable;
346         spin_unlock(&req->rq_lock);
347
348         /* pack the intent */
349         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
350         lit->opc = (__u64)it->it_op;
351
352         /* pack the intended request */
353         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
354                       lmmsize);
355
356         /* for remote client, fetch remote perm for current user */
357         if (client_is_remote(exp))
358                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
359                                      sizeof(struct mdt_remote_perm));
360         ptlrpc_request_set_replen(req);
361         return req;
362 }
363
364 static struct ptlrpc_request *
365 mdc_intent_getxattr_pack(struct obd_export *exp,
366                          struct lookup_intent *it,
367                          struct md_op_data *op_data)
368 {
369         struct ptlrpc_request   *req;
370         struct ldlm_intent      *lit;
371         int                     rc, count = 0, maxdata;
372         LIST_HEAD(cancels);
373
374
375
376         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
377                                         &RQF_LDLM_INTENT_GETXATTR);
378         if (req == NULL)
379                 return ERR_PTR(-ENOMEM);
380
381         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
382
383         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
384         if (rc) {
385                 ptlrpc_request_free(req);
386                 return ERR_PTR(rc);
387         }
388
389         /* pack the intent */
390         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
391         lit->opc = IT_GETXATTR;
392
393         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
394
395         /* pack the intended request */
396         mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
397                         op_data->op_valid, maxdata, -1, 0);
398
399         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
400                                 RCL_SERVER, maxdata);
401
402         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
403                                 RCL_SERVER, maxdata);
404
405         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
406                                 RCL_SERVER, maxdata);
407
408         ptlrpc_request_set_replen(req);
409
410         return req;
411 }
412
413 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
414                                                      struct lookup_intent *it,
415                                                      struct md_op_data *op_data)
416 {
417         struct ptlrpc_request *req;
418         struct obd_device     *obddev = class_exp2obd(exp);
419         struct ldlm_intent    *lit;
420         int                 rc;
421
422         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
423                                    &RQF_LDLM_INTENT_UNLINK);
424         if (req == NULL)
425                 return ERR_PTR(-ENOMEM);
426
427         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
428         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
429                              op_data->op_namelen + 1);
430
431         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
432         if (rc) {
433                 ptlrpc_request_free(req);
434                 return ERR_PTR(rc);
435         }
436
437         /* pack the intent */
438         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
439         lit->opc = (__u64)it->it_op;
440
441         /* pack the intended request */
442         mdc_unlink_pack(req, op_data);
443
444         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
445                              obddev->u.cli.cl_default_mds_easize);
446         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
447                              obddev->u.cli.cl_default_mds_cookiesize);
448         ptlrpc_request_set_replen(req);
449         return req;
450 }
451
452 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
453                                                     struct lookup_intent *it,
454                                                     struct md_op_data *op_data)
455 {
456         struct ptlrpc_request *req;
457         struct obd_device     *obddev = class_exp2obd(exp);
458         obd_valid             valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
459                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
460                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
461                                        (client_is_remote(exp) ?
462                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
463         struct ldlm_intent    *lit;
464         int                 rc;
465         int                 easize;
466
467         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
468                                    &RQF_LDLM_INTENT_GETATTR);
469         if (req == NULL)
470                 return ERR_PTR(-ENOMEM);
471
472         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
473         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
474                              op_data->op_namelen + 1);
475
476         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
477         if (rc) {
478                 ptlrpc_request_free(req);
479                 return ERR_PTR(rc);
480         }
481
482         /* pack the intent */
483         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
484         lit->opc = (__u64)it->it_op;
485
486         if (obddev->u.cli.cl_default_mds_easize > 0)
487                 easize = obddev->u.cli.cl_default_mds_easize;
488         else
489                 easize = obddev->u.cli.cl_max_mds_easize;
490
491         /* pack the intended request */
492         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
493
494         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
495         if (client_is_remote(exp))
496                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
497                                      sizeof(struct mdt_remote_perm));
498         ptlrpc_request_set_replen(req);
499         return req;
500 }
501
502 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
503                                                      struct lookup_intent *it,
504                                                      struct md_op_data *unused)
505 {
506         struct obd_device     *obd = class_exp2obd(exp);
507         struct ptlrpc_request *req;
508         struct ldlm_intent    *lit;
509         struct layout_intent  *layout;
510         int rc;
511
512         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
513                                 &RQF_LDLM_INTENT_LAYOUT);
514         if (req == NULL)
515                 return ERR_PTR(-ENOMEM);
516
517         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
518         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
519         if (rc) {
520                 ptlrpc_request_free(req);
521                 return ERR_PTR(rc);
522         }
523
524         /* pack the intent */
525         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
526         lit->opc = (__u64)it->it_op;
527
528         /* pack the layout intent request */
529         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
530         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
531          * set for replication */
532         layout->li_opc = LAYOUT_INTENT_ACCESS;
533
534         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
535                              obd->u.cli.cl_default_mds_easize);
536         ptlrpc_request_set_replen(req);
537         return req;
538 }
539
540 static struct ptlrpc_request *
541 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
542 {
543         struct ptlrpc_request *req;
544         int rc;
545
546         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
547         if (req == NULL)
548                 return ERR_PTR(-ENOMEM);
549
550         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
551         if (rc) {
552                 ptlrpc_request_free(req);
553                 return ERR_PTR(rc);
554         }
555
556         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
557         ptlrpc_request_set_replen(req);
558         return req;
559 }
560
561 static int mdc_finish_enqueue(struct obd_export *exp,
562                               struct ptlrpc_request *req,
563                               struct ldlm_enqueue_info *einfo,
564                               struct lookup_intent *it,
565                               struct lustre_handle *lockh,
566                               int rc)
567 {
568         struct req_capsule  *pill = &req->rq_pill;
569         struct ldlm_request *lockreq;
570         struct ldlm_reply   *lockrep;
571         struct lustre_intent_data *intent = &it->d.lustre;
572         struct ldlm_lock    *lock;
573         void            *lvb_data = NULL;
574         int               lvb_len = 0;
575
576         LASSERT(rc >= 0);
577         /* Similarly, if we're going to replay this request, we don't want to
578          * actually get a lock, just perform the intent. */
579         if (req->rq_transno || req->rq_replay) {
580                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
581                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
582         }
583
584         if (rc == ELDLM_LOCK_ABORTED) {
585                 einfo->ei_mode = 0;
586                 memset(lockh, 0, sizeof(*lockh));
587                 rc = 0;
588         } else { /* rc = 0 */
589                 lock = ldlm_handle2lock(lockh);
590                 LASSERT(lock != NULL);
591
592                 /* If the server gave us back a different lock mode, we should
593                  * fix up our variables. */
594                 if (lock->l_req_mode != einfo->ei_mode) {
595                         ldlm_lock_addref(lockh, lock->l_req_mode);
596                         ldlm_lock_decref(lockh, einfo->ei_mode);
597                         einfo->ei_mode = lock->l_req_mode;
598                 }
599                 LDLM_LOCK_PUT(lock);
600         }
601
602         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
603         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
604
605         intent->it_disposition = (int)lockrep->lock_policy_res1;
606         intent->it_status = (int)lockrep->lock_policy_res2;
607         intent->it_lock_mode = einfo->ei_mode;
608         intent->it_lock_handle = lockh->cookie;
609         intent->it_data = req;
610
611         /* Technically speaking rq_transno must already be zero if
612          * it_status is in error, so the check is a bit redundant */
613         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
614                 mdc_clear_replay_flag(req, intent->it_status);
615
616         /* If we're doing an IT_OPEN which did not result in an actual
617          * successful open, then we need to remove the bit which saves
618          * this request for unconditional replay.
619          *
620          * It's important that we do this first!  Otherwise we might exit the
621          * function without doing so, and try to replay a failed create
622          * (bug 3440) */
623         if (it->it_op & IT_OPEN && req->rq_replay &&
624             (!it_disposition(it, DISP_OPEN_OPEN) || intent->it_status != 0))
625                 mdc_clear_replay_flag(req, intent->it_status);
626
627         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
628                   it->it_op, intent->it_disposition, intent->it_status);
629
630         /* We know what to expect, so we do any byte flipping required here */
631         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
632                 struct mdt_body *body;
633
634                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
635                 if (body == NULL) {
636                         CERROR("Can't swab mdt_body\n");
637                         return -EPROTO;
638                 }
639
640                 if (it_disposition(it, DISP_OPEN_OPEN) &&
641                     !it_open_error(DISP_OPEN_OPEN, it)) {
642                         /*
643                          * If this is a successful OPEN request, we need to set
644                          * replay handler and data early, so that if replay
645                          * happens immediately after swabbing below, new reply
646                          * is swabbed by that handler correctly.
647                          */
648                         mdc_set_open_replay_data(NULL, NULL, it);
649                 }
650
651                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
652                         void *eadata;
653
654                         mdc_update_max_ea_from_body(exp, body);
655
656                         /*
657                          * The eadata is opaque; just check that it is there.
658                          * Eventually, obd_unpackmd() will check the contents.
659                          */
660                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
661                                                               body->eadatasize);
662                         if (eadata == NULL)
663                                 return -EPROTO;
664
665                         /* save lvb data and length in case this is for layout
666                          * lock */
667                         lvb_data = eadata;
668                         lvb_len = body->eadatasize;
669
670                         /*
671                          * We save the reply LOV EA in case we have to replay a
672                          * create for recovery.  If we didn't allocate a large
673                          * enough request buffer above we need to reallocate it
674                          * here to hold the actual LOV EA.
675                          *
676                          * To not save LOV EA if request is not going to replay
677                          * (for example error one).
678                          */
679                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
680                                 void *lmm;
681
682                                 if (req_capsule_get_size(pill, &RMF_EADATA,
683                                                          RCL_CLIENT) <
684                                     body->eadatasize)
685                                         mdc_realloc_openmsg(req, body);
686                                 else
687                                         req_capsule_shrink(pill, &RMF_EADATA,
688                                                            body->eadatasize,
689                                                            RCL_CLIENT);
690
691                                 req_capsule_set_size(pill, &RMF_EADATA,
692                                                      RCL_CLIENT,
693                                                      body->eadatasize);
694
695                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
696                                 if (lmm)
697                                         memcpy(lmm, eadata, body->eadatasize);
698                         }
699                 }
700
701                 if (body->valid & OBD_MD_FLRMTPERM) {
702                         struct mdt_remote_perm *perm;
703
704                         LASSERT(client_is_remote(exp));
705                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
706                                                 lustre_swab_mdt_remote_perm);
707                         if (perm == NULL)
708                                 return -EPROTO;
709                 }
710                 if (body->valid & OBD_MD_FLMDSCAPA) {
711                         struct lustre_capa *capa, *p;
712
713                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
714                         if (capa == NULL)
715                                 return -EPROTO;
716
717                         if (it->it_op & IT_OPEN) {
718                                 /* client fid capa will be checked in replay */
719                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
720                                 LASSERT(p);
721                                 *p = *capa;
722                         }
723                 }
724                 if (body->valid & OBD_MD_FLOSSCAPA) {
725                         struct lustre_capa *capa;
726
727                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
728                         if (capa == NULL)
729                                 return -EPROTO;
730                 }
731         } else if (it->it_op & IT_LAYOUT) {
732                 /* maybe the lock was granted right away and layout
733                  * is packed into RMF_DLM_LVB of req */
734                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
735                 if (lvb_len > 0) {
736                         lvb_data = req_capsule_server_sized_get(pill,
737                                                         &RMF_DLM_LVB, lvb_len);
738                         if (lvb_data == NULL)
739                                 return -EPROTO;
740                 }
741         }
742
743         /* fill in stripe data for layout lock */
744         lock = ldlm_handle2lock(lockh);
745         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
746                 void *lmm;
747
748                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
749                         ldlm_it2str(it->it_op), lvb_len);
750
751                 OBD_ALLOC_LARGE(lmm, lvb_len);
752                 if (lmm == NULL) {
753                         LDLM_LOCK_PUT(lock);
754                         return -ENOMEM;
755                 }
756                 memcpy(lmm, lvb_data, lvb_len);
757
758                 /* install lvb_data */
759                 lock_res_and_lock(lock);
760                 if (lock->l_lvb_data == NULL) {
761                         lock->l_lvb_type = LVB_T_LAYOUT;
762                         lock->l_lvb_data = lmm;
763                         lock->l_lvb_len = lvb_len;
764                         lmm = NULL;
765                 }
766                 unlock_res_and_lock(lock);
767                 if (lmm != NULL)
768                         OBD_FREE_LARGE(lmm, lvb_len);
769         }
770         if (lock != NULL)
771                 LDLM_LOCK_PUT(lock);
772
773         return rc;
774 }
775
776 /* We always reserve enough space in the reply packet for a stripe MD, because
777  * we don't know in advance the file type. */
778 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
779                 struct lookup_intent *it, struct md_op_data *op_data,
780                 struct lustre_handle *lockh, void *lmm, int lmmsize,
781                 struct ptlrpc_request **reqp, u64 extra_lock_flags)
782 {
783         static const ldlm_policy_data_t lookup_policy = {
784                 .l_inodebits = { MDS_INODELOCK_LOOKUP }
785         };
786         static const ldlm_policy_data_t update_policy = {
787                 .l_inodebits = { MDS_INODELOCK_UPDATE }
788         };
789         static const ldlm_policy_data_t layout_policy = {
790                 .l_inodebits = { MDS_INODELOCK_LAYOUT }
791         };
792         static const ldlm_policy_data_t getxattr_policy = {
793                 .l_inodebits = { MDS_INODELOCK_XATTR }
794         };
795         ldlm_policy_data_t const *policy = &lookup_policy;
796         struct obd_device *obddev = class_exp2obd(exp);
797         struct ptlrpc_request *req;
798         u64 flags, saved_flags = extra_lock_flags;
799         struct ldlm_res_id res_id;
800         int generation, resends = 0;
801         struct ldlm_reply *lockrep;
802         enum lvb_type lvb_type = LVB_T_NONE;
803         int rc;
804
805         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
806                  einfo->ei_type);
807
808         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
809
810         if (it) {
811                 saved_flags |= LDLM_FL_HAS_INTENT;
812                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
813                         policy = &update_policy;
814                 else if (it->it_op & IT_LAYOUT)
815                         policy = &layout_policy;
816                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
817                         policy = &getxattr_policy;
818         }
819
820         LASSERT(reqp == NULL);
821
822         generation = obddev->u.cli.cl_import->imp_generation;
823 resend:
824         flags = saved_flags;
825         if (!it) {
826                 /* The only way right now is FLOCK, in this case we hide flock
827                    policy as lmm, but lmmsize is 0 */
828                 LASSERT(lmm && lmmsize == 0);
829                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
830                          einfo->ei_type);
831                 policy = (ldlm_policy_data_t *)lmm;
832                 res_id.name[3] = LDLM_FLOCK;
833         } else if (it->it_op & IT_OPEN) {
834                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
835                                            einfo->ei_cbdata);
836                 policy = &update_policy;
837                 einfo->ei_cbdata = NULL;
838                 lmm = NULL;
839         } else if (it->it_op & IT_UNLINK) {
840                 req = mdc_intent_unlink_pack(exp, it, op_data);
841         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
842                 req = mdc_intent_getattr_pack(exp, it, op_data);
843         } else if (it->it_op & IT_READDIR) {
844                 req = mdc_enqueue_pack(exp, 0);
845         } else if (it->it_op & IT_LAYOUT) {
846                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
847                         return -EOPNOTSUPP;
848                 req = mdc_intent_layout_pack(exp, it, op_data);
849                 lvb_type = LVB_T_LAYOUT;
850         } else if (it->it_op & IT_GETXATTR) {
851                 req = mdc_intent_getxattr_pack(exp, it, op_data);
852         } else {
853                 LBUG();
854                 return -EINVAL;
855         }
856
857         if (IS_ERR(req))
858                 return PTR_ERR(req);
859
860         if (req != NULL && it && it->it_op & IT_CREAT)
861                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
862                  * retry logic */
863                 req->rq_no_retry_einprogress = 1;
864
865         if (resends) {
866                 req->rq_generation_set = 1;
867                 req->rq_import_generation = generation;
868                 req->rq_sent = get_seconds() + resends;
869         }
870
871         /* It is important to obtain rpc_lock first (if applicable), so that
872          * threads that are serialised with rpc_lock are not polluting our
873          * rpcs in flight counter. We do not do flock request limiting, though*/
874         if (it) {
875                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
876                 rc = mdc_enter_request(&obddev->u.cli);
877                 if (rc != 0) {
878                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
879                         mdc_clear_replay_flag(req, 0);
880                         ptlrpc_req_finished(req);
881                         return rc;
882                 }
883         }
884
885         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
886                               0, lvb_type, lockh, 0);
887         if (!it) {
888                 /* For flock requests we immediately return without further
889                    delay and let caller deal with the rest, since rest of
890                    this function metadata processing makes no sense for flock
891                    requests anyway. But in case of problem during comms with
892                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
893                    can not rely on caller and this mainly for F_UNLCKs
894                    (explicits or automatically generated by Kernel to clean
895                    current FLocks upon exit) that can't be trashed */
896                 if ((rc == -EINTR) || (rc == -ETIMEDOUT))
897                         goto resend;
898                 return rc;
899         }
900
901         mdc_exit_request(&obddev->u.cli);
902         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
903
904         if (rc < 0) {
905                 CDEBUG_LIMIT((rc == -EACCES || rc == -EIDRM) ? D_INFO : D_ERROR,
906                              "%s: ldlm_cli_enqueue failed: rc = %d\n",
907                              obddev->obd_name, rc);
908
909                 mdc_clear_replay_flag(req, rc);
910                 ptlrpc_req_finished(req);
911                 return rc;
912         }
913
914         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
915         LASSERT(lockrep != NULL);
916
917         lockrep->lock_policy_res2 =
918                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
919
920         /* Retry the create infinitely when we get -EINPROGRESS from
921          * server. This is required by the new quota design. */
922         if (it && it->it_op & IT_CREAT &&
923             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
924                 mdc_clear_replay_flag(req, rc);
925                 ptlrpc_req_finished(req);
926                 resends++;
927
928                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
929                        obddev->obd_name, resends, it->it_op,
930                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
931
932                 if (generation == obddev->u.cli.cl_import->imp_generation) {
933                         goto resend;
934                 } else {
935                         CDEBUG(D_HA, "resend cross eviction\n");
936                         return -EIO;
937                 }
938         }
939
940         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
941         if (rc < 0) {
942                 if (lustre_handle_is_used(lockh)) {
943                         ldlm_lock_decref(lockh, einfo->ei_mode);
944                         memset(lockh, 0, sizeof(*lockh));
945                 }
946                 ptlrpc_req_finished(req);
947         }
948         return rc;
949 }
950
951 static int mdc_finish_intent_lock(struct obd_export *exp,
952                                   struct ptlrpc_request *request,
953                                   struct md_op_data *op_data,
954                                   struct lookup_intent *it,
955                                   struct lustre_handle *lockh)
956 {
957         struct lustre_handle old_lock;
958         struct mdt_body *mdt_body;
959         struct ldlm_lock *lock;
960         int rc;
961
962         LASSERT(request != NULL);
963         LASSERT(request != LP_POISON);
964         LASSERT(request->rq_repmsg != LP_POISON);
965
966         if (!it_disposition(it, DISP_IT_EXECD)) {
967                 /* The server failed before it even started executing the
968                  * intent, i.e. because it couldn't unpack the request. */
969                 LASSERT(it->d.lustre.it_status != 0);
970                 return it->d.lustre.it_status;
971         }
972         rc = it_open_error(DISP_IT_EXECD, it);
973         if (rc)
974                 return rc;
975
976         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
977         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
978
979         /* If we were revalidating a fid/name pair, mark the intent in
980          * case we fail and get called again from lookup */
981         if (fid_is_sane(&op_data->op_fid2) &&
982             it->it_create_mode & M_CHECK_STALE &&
983             it->it_op != IT_GETATTR) {
984
985                 /* Also: did we find the same inode? */
986                 /* sever can return one of two fids:
987                  * op_fid2 - new allocated fid - if file is created.
988                  * op_fid3 - existent fid - if file only open.
989                  * op_fid3 is saved in lmv_intent_open */
990                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
991                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
992                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
993                                "\n", PFID(&op_data->op_fid2),
994                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
995                         return -ESTALE;
996                 }
997         }
998
999         rc = it_open_error(DISP_LOOKUP_EXECD, it);
1000         if (rc)
1001                 return rc;
1002
1003         /* keep requests around for the multiple phases of the call
1004          * this shows the DISP_XX must guarantee we make it into the call
1005          */
1006         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1007             it_disposition(it, DISP_OPEN_CREATE) &&
1008             !it_open_error(DISP_OPEN_CREATE, it)) {
1009                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1010                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1011         }
1012         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1013             it_disposition(it, DISP_OPEN_OPEN) &&
1014             !it_open_error(DISP_OPEN_OPEN, it)) {
1015                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1016                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1017                 /* BUG 11546 - eviction in the middle of open rpc processing */
1018                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1019         }
1020
1021         if (it->it_op & IT_CREAT) {
1022                 /* XXX this belongs in ll_create_it */
1023         } else if (it->it_op == IT_OPEN) {
1024                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1025         } else {
1026                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1027         }
1028
1029         /* If we already have a matching lock, then cancel the new
1030          * one.  We have to set the data here instead of in
1031          * mdc_enqueue, because we need to use the child's inode as
1032          * the l_ast_data to match, and that's not available until
1033          * intent_finish has performed the iget().) */
1034         lock = ldlm_handle2lock(lockh);
1035         if (lock) {
1036                 ldlm_policy_data_t policy = lock->l_policy_data;
1037
1038                 LDLM_DEBUG(lock, "matching against this");
1039
1040                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1041                                          &lock->l_resource->lr_name),
1042                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1043                          PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1044                 LDLM_LOCK_PUT(lock);
1045
1046                 memcpy(&old_lock, lockh, sizeof(*lockh));
1047                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1048                                     LDLM_IBITS, &policy, LCK_NL,
1049                                     &old_lock, 0)) {
1050                         ldlm_lock_decref_and_cancel(lockh,
1051                                                     it->d.lustre.it_lock_mode);
1052                         memcpy(lockh, &old_lock, sizeof(old_lock));
1053                         it->d.lustre.it_lock_handle = lockh->cookie;
1054                 }
1055         }
1056         CDEBUG(D_DENTRY,
1057                "D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1058                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1059                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
1060         return rc;
1061 }
1062
1063 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1064                         struct lu_fid *fid, __u64 *bits)
1065 {
1066         /* We could just return 1 immediately, but since we should only
1067          * be called in revalidate_it if we already have a lock, let's
1068          * verify that. */
1069         struct ldlm_res_id res_id;
1070         struct lustre_handle lockh;
1071         ldlm_policy_data_t policy;
1072         ldlm_mode_t mode;
1073
1074         if (it->d.lustre.it_lock_handle) {
1075                 lockh.cookie = it->d.lustre.it_lock_handle;
1076                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1077         } else {
1078                 fid_build_reg_res_name(fid, &res_id);
1079                 switch (it->it_op) {
1080                 case IT_GETATTR:
1081                         /* File attributes are held under multiple bits:
1082                          * nlink is under lookup lock, size and times are
1083                          * under UPDATE lock and recently we've also got
1084                          * a separate permissions lock for owner/group/acl that
1085                          * were protected by lookup lock before.
1086                          * Getattr must provide all of that information,
1087                          * so we need to ensure we have all of those locks.
1088                          * Unfortunately, if the bits are split across multiple
1089                          * locks, there's no easy way to match all of them here,
1090                          * so an extra RPC would be performed to fetch all
1091                          * of those bits at once for now. */
1092                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1093                          * but for old MDTs (< 2.4), permission is covered
1094                          * by LOOKUP lock, so it needs to match all bits here.*/
1095                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1096                                                   MDS_INODELOCK_LOOKUP |
1097                                                   MDS_INODELOCK_PERM;
1098                         break;
1099                 case IT_LAYOUT:
1100                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1101                         break;
1102                 default:
1103                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1104                         break;
1105                 }
1106
1107                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1108                                        LDLM_IBITS, &policy,
1109                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1110                                       &lockh);
1111         }
1112
1113         if (mode) {
1114                 it->d.lustre.it_lock_handle = lockh.cookie;
1115                 it->d.lustre.it_lock_mode = mode;
1116         } else {
1117                 it->d.lustre.it_lock_handle = 0;
1118                 it->d.lustre.it_lock_mode = 0;
1119         }
1120
1121         return !!mode;
1122 }
1123
1124 /*
1125  * This long block is all about fixing up the lock and request state
1126  * so that it is correct as of the moment _before_ the operation was
1127  * applied; that way, the VFS will think that everything is normal and
1128  * call Lustre's regular VFS methods.
1129  *
1130  * If we're performing a creation, that means that unless the creation
1131  * failed with EEXIST, we should fake up a negative dentry.
1132  *
1133  * For everything else, we want to lookup to succeed.
1134  *
1135  * One additional note: if CREATE or OPEN succeeded, we add an extra
1136  * reference to the request because we need to keep it around until
1137  * ll_create/ll_open gets called.
1138  *
1139  * The server will return to us, in it_disposition, an indication of
1140  * exactly what d.lustre.it_status refers to.
1141  *
1142  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1143  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1144  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1145  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1146  * was successful.
1147  *
1148  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1149  * child lookup.
1150  */
1151 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1152                     void *lmm, int lmmsize, struct lookup_intent *it,
1153                     int lookup_flags, struct ptlrpc_request **reqp,
1154                     ldlm_blocking_callback cb_blocking,
1155                     __u64 extra_lock_flags)
1156 {
1157         struct ldlm_enqueue_info einfo = {
1158                 .ei_type        = LDLM_IBITS,
1159                 .ei_mode        = it_to_lock_mode(it),
1160                 .ei_cb_bl       = cb_blocking,
1161                 .ei_cb_cp       = ldlm_completion_ast,
1162         };
1163         struct lustre_handle lockh;
1164         int rc = 0;
1165
1166         LASSERT(it);
1167
1168         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1169                 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1170                 op_data->op_name, PFID(&op_data->op_fid2),
1171                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1172                 it->it_flags);
1173
1174         lockh.cookie = 0;
1175         if (fid_is_sane(&op_data->op_fid2) &&
1176             (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1177                 /* We could just return 1 immediately, but since we should only
1178                  * be called in revalidate_it if we already have a lock, let's
1179                  * verify that. */
1180                 it->d.lustre.it_lock_handle = 0;
1181                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1182                 /* Only return failure if it was not GETATTR by cfid
1183                    (from inode_revalidate) */
1184                 if (rc || op_data->op_namelen != 0)
1185                         return rc;
1186         }
1187
1188         /* For case if upper layer did not alloc fid, do it now. */
1189         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1190                 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1191                 if (rc < 0) {
1192                         CERROR("Can't alloc new fid, rc %d\n", rc);
1193                         return rc;
1194                 }
1195         }
1196         rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
1197                          extra_lock_flags);
1198         if (rc < 0)
1199                 return rc;
1200
1201         *reqp = it->d.lustre.it_data;
1202         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1203         return rc;
1204 }
1205
1206 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1207                                               struct ptlrpc_request *req,
1208                                               void *args, int rc)
1209 {
1210         struct mdc_getattr_args  *ga = args;
1211         struct obd_export       *exp = ga->ga_exp;
1212         struct md_enqueue_info   *minfo = ga->ga_minfo;
1213         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1214         struct lookup_intent     *it;
1215         struct lustre_handle     *lockh;
1216         struct obd_device       *obddev;
1217         struct ldlm_reply        *lockrep;
1218         __u64                flags = LDLM_FL_HAS_INTENT;
1219
1220         it    = &minfo->mi_it;
1221         lockh = &minfo->mi_lockh;
1222
1223         obddev = class_exp2obd(exp);
1224
1225         mdc_exit_request(&obddev->u.cli);
1226         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1227                 rc = -ETIMEDOUT;
1228
1229         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1230                                    &flags, NULL, 0, lockh, rc);
1231         if (rc < 0) {
1232                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1233                 mdc_clear_replay_flag(req, rc);
1234                 GOTO(out, rc);
1235         }
1236
1237         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1238         LASSERT(lockrep != NULL);
1239
1240         lockrep->lock_policy_res2 =
1241                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1242
1243         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1244         if (rc)
1245                 GOTO(out, rc);
1246
1247         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1248
1249 out:
1250         OBD_FREE_PTR(einfo);
1251         minfo->mi_cb(req, minfo, rc);
1252         return 0;
1253 }
1254
1255 int mdc_intent_getattr_async(struct obd_export *exp,
1256                              struct md_enqueue_info *minfo,
1257                              struct ldlm_enqueue_info *einfo)
1258 {
1259         struct md_op_data       *op_data = &minfo->mi_data;
1260         struct lookup_intent    *it = &minfo->mi_it;
1261         struct ptlrpc_request   *req;
1262         struct mdc_getattr_args *ga;
1263         struct obd_device       *obddev = class_exp2obd(exp);
1264         struct ldlm_res_id       res_id;
1265         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1266          *     for statahead currently. Consider CMD in future, such two bits
1267          *     maybe managed by different MDS, should be adjusted then. */
1268         ldlm_policy_data_t       policy = {
1269                                         .l_inodebits = { MDS_INODELOCK_LOOKUP |
1270                                                          MDS_INODELOCK_UPDATE }
1271                                  };
1272         int                   rc = 0;
1273         __u64               flags = LDLM_FL_HAS_INTENT;
1274
1275         CDEBUG(D_DLMTRACE,
1276                 "name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1277                 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1278                 ldlm_it2str(it->it_op), it->it_flags);
1279
1280         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1281         req = mdc_intent_getattr_pack(exp, it, op_data);
1282         if (IS_ERR(req))
1283                 return PTR_ERR(req);
1284
1285         rc = mdc_enter_request(&obddev->u.cli);
1286         if (rc != 0) {
1287                 ptlrpc_req_finished(req);
1288                 return rc;
1289         }
1290
1291         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1292                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1293         if (rc < 0) {
1294                 mdc_exit_request(&obddev->u.cli);
1295                 ptlrpc_req_finished(req);
1296                 return rc;
1297         }
1298
1299         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1300         ga = ptlrpc_req_async_args(req);
1301         ga->ga_exp = exp;
1302         ga->ga_minfo = minfo;
1303         ga->ga_einfo = einfo;
1304
1305         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1306         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1307
1308         return 0;
1309 }