4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
39 # include <linux/module.h>
41 #include "../include/linux/lustre_intent.h"
42 #include "../include/obd.h"
43 #include "../include/obd_class.h"
44 #include "../include/lustre_dlm.h"
45 #include "../include/lustre_fid.h" /* fid_res_name_eq() */
46 #include "../include/lustre_mdc.h"
47 #include "../include/lustre_net.h"
48 #include "../include/lustre_req_layout.h"
49 #include "mdc_internal.h"
/*
 * Context carried across an asynchronous getattr intent enqueue; filled
 * in by mdc_intent_getattr_async() and consumed by the RPC interpret
 * callback (mdc_intent_getattr_async_interpret).
 * NOTE(review): closing "};" is not visible in this extract.
 */
51 struct mdc_getattr_args {
52 struct obd_export *ga_exp;	/* export the getattr RPC was issued on */
53 struct md_enqueue_info *ga_minfo;	/* caller's enqueue info (lock handle, completion cb) */
54 struct ldlm_enqueue_info *ga_einfo;	/* DLM enqueue parameters; freed by the interpret cb */
57 int it_disposition(struct lookup_intent *it, int flag)
59 return it->d.lustre.it_disposition & flag;
61 EXPORT_SYMBOL(it_disposition);
63 void it_set_disposition(struct lookup_intent *it, int flag)
65 it->d.lustre.it_disposition |= flag;
67 EXPORT_SYMBOL(it_set_disposition);
69 void it_clear_disposition(struct lookup_intent *it, int flag)
71 it->d.lustre.it_disposition &= ~flag;
73 EXPORT_SYMBOL(it_clear_disposition);
/*
 * it_open_error - return the intent status if the server reached the
 * given open-path phase.  Dispositions are tested from the deepest
 * phase (lease) outward; the first phase the server reported executing
 * that is <= @phase yields it_status.
 * NOTE(review): closing braces and the final return after the CERROR
 * are not visible in this extract — body left byte-identical.
 */
75 int it_open_error(int phase, struct lookup_intent *it)
77 if (it_disposition(it, DISP_OPEN_LEASE)) {
78 if (phase >= DISP_OPEN_LEASE)
79 return it->d.lustre.it_status;
83 if (it_disposition(it, DISP_OPEN_OPEN)) {
84 if (phase >= DISP_OPEN_OPEN)
85 return it->d.lustre.it_status;
90 if (it_disposition(it, DISP_OPEN_CREATE)) {
91 if (phase >= DISP_OPEN_CREATE)
92 return it->d.lustre.it_status;
97 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
98 if (phase >= DISP_LOOKUP_EXECD)
99 return it->d.lustre.it_status;
104 if (it_disposition(it, DISP_IT_EXECD)) {
105 if (phase >= DISP_IT_EXECD)
106 return it->d.lustre.it_status;
/* none of the expected dispositions were set: log the raw mask */
110 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
111 it->d.lustre.it_status);
115 EXPORT_SYMBOL(it_open_error);
117 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * mdc_set_lock_data - attach an inode to a held DLM lock and optionally
 * report the lock's inodebits.  The existing lvb_inode may only differ
 * from @data if it is being freed (I_FREEING), which the LASSERTF
 * enforces.
 * NOTE(review): the full parameter list (e.g. the *bits argument used
 * below) is truncated in this extract.
 */
118 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
121 struct ldlm_lock *lock;
122 struct inode *new_inode = data;
130 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
132 LASSERT(lock != NULL);
133 lock_res_and_lock(lock);
134 if (lock->l_resource->lr_lvb_inode &&
135 lock->l_resource->lr_lvb_inode != data) {
136 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
/* a different inode may only be cached here if it is on its way out */
137 LASSERTF(old_inode->i_state & I_FREEING,
138 "Found existing inode %p/%lu/%u state %lu in lock: "
139 "setting data to %p/%lu/%u\n", old_inode,
140 old_inode->i_ino, old_inode->i_generation,
142 new_inode, new_inode->i_ino, new_inode->i_generation);
144 lock->l_resource->lr_lvb_inode = new_inode;
146 *bits = lock->l_policy_data.l_inodebits.bits;
148 unlock_res_and_lock(lock);
/*
 * mdc_lock_match - look for a compatible granted lock on the fid's
 * resource in the MDC namespace.  Bits unsupported by the server are
 * masked out of the requested policy first (LU-4405).
 * NOTE(review): rc declaration / return statement are not visible in
 * this extract.
 */
154 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
155 const struct lu_fid *fid, ldlm_type_t type,
156 ldlm_policy_data_t *policy, ldlm_mode_t mode,
157 struct lustre_handle *lockh)
159 struct ldlm_res_id res_id;
162 fid_build_reg_res_name(fid, &res_id);
163 /* LU-4405: Clear bits not supported by server */
164 policy->l_inodebits.bits &= exp_connect_ibits(exp);
165 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
166 &res_id, type, policy, mode, lockh, 0);
/*
 * mdc_cancel_unused - cancel all unused locks matching @policy on the
 * fid's resource (client-side lock cleanup, e.g. before unlink/open).
 * NOTE(review): the mode/opaque parameters and the return statement
 * are truncated in this extract.
 */
170 int mdc_cancel_unused(struct obd_export *exp,
171 const struct lu_fid *fid,
172 ldlm_policy_data_t *policy,
174 ldlm_cancel_flags_t flags,
177 struct ldlm_res_id res_id;
178 struct obd_device *obd = class_exp2obd(exp);
181 fid_build_reg_res_name(fid, &res_id);
182 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
183 policy, mode, flags, opaque);
/*
 * mdc_null_inode - detach the cached lvb inode from the fid's ldlm
 * resource, if the resource exists.  Called when the inode goes away
 * so no lock keeps a stale inode pointer.
 */
187 int mdc_null_inode(struct obd_export *exp,
188 const struct lu_fid *fid)
190 struct ldlm_res_id res_id;
191 struct ldlm_resource *res;
192 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
194 LASSERTF(ns != NULL, "no namespace passed\n");
196 fid_build_reg_res_name(fid, &res_id);
/* lookup only (create flag 0): absent resource means nothing to clear */
198 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
203 res->lr_lvb_inode = NULL;
206 ldlm_resource_putref(res);
210 /* find any ldlm lock of the inode in mdc
/*
 * mdc_find_cbdata - iterate locks on the fid's resource with @it;
 * maps LDLM_ITER_STOP/LDLM_ITER_CONTINUE to the caller's convention.
 * NOTE(review): the iterator-argument line and return values are
 * truncated in this extract.
 */
214 int mdc_find_cbdata(struct obd_export *exp,
215 const struct lu_fid *fid,
216 ldlm_iterator_t it, void *data)
218 struct ldlm_res_id res_id;
221 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
222 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
224 if (rc == LDLM_ITER_STOP)
226 else if (rc == LDLM_ITER_CONTINUE)
/*
 * mdc_clear_replay_flag - drop the replay flag from a request so a
 * failed operation is not replayed after recovery; warns if a transno
 * was already assigned on an error path.
 */
231 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
233 /* Don't hold error requests for replay. */
234 if (req->rq_replay) {
235 spin_lock(&req->rq_lock);
/* rq_replay is cleared here under rq_lock (line elided in extract) */
237 spin_unlock(&req->rq_lock);
239 if (rc && req->rq_transno != 0) {
240 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
245 /* Save a large LOV EA into the request buffer so that it is available
246 * for replay. We don't do this in the initial request because the
247 * original request doesn't need this buffer (at most it sends just the
248 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
249 * buffer and may also be difficult to allocate and save a very large
250 * request buffer for each open. (bug 5707)
252 * OOM here may cause recovery failure if lmm is needed (only for the
253 * original open if the MDS crashed just when this client also OOM'd)
254 * but this is incredibly unlikely, and questionable whether the client
255 * could do MDS recovery under OOM anyways... */
/*
 * mdc_realloc_openmsg - enlarge the open request's EA buffer so the
 * reply LOV EA can be stashed for replay (see block comment above).
 * On allocation failure the EA is simply dropped from the saved body
 * rather than failing the open.
 */
256 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
257 struct mdt_body *body)
261 /* FIXME: remove this explicit offset. */
262 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
265 CERROR("Can't enlarge segment %d size to %d\n",
266 DLM_INTENT_REC_OFF + 4, body->eadatasize);
/* best-effort: give up on saving the EA, don't fail the request */
267 body->valid &= ~OBD_MD_FLEASIZE;
268 body->eadatasize = 0;
/*
 * mdc_intent_open_pack - allocate and pack an LDLM_INTENT_OPEN request:
 * cancel conflicting child OPEN locks (and the parent UPDATE lock for
 * CREATE), reserve reply buffers, and mark the request replayable.
 * Returns the prepared request or ERR_PTR on allocation failure.
 * NOTE(review): several lines (cancel-mode selection, error paths,
 * final return) are truncated in this extract.
 */
272 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
273 struct lookup_intent *it,
274 struct md_op_data *op_data,
275 void *lmm, int lmmsize,
278 struct ptlrpc_request *req;
279 struct obd_device *obddev = class_exp2obd(exp);
280 struct ldlm_intent *lit;
/* open through the MDC is always on a regular file */
286 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
288 /* XXX: openlock is not cancelled for cross-refs. */
289 /* If inode is known, cancel conflicting OPEN locks. */
290 if (fid_is_sane(&op_data->op_fid2)) {
291 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
292 if (it->it_flags & FMODE_WRITE)
297 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
300 else if (it->it_flags & FMODE_EXEC)
306 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
311 /* If CREATE, cancel parent's UPDATE lock. */
312 if (it->it_op & IT_CREAT)
316 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
318 MDS_INODELOCK_UPDATE);
320 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
321 &RQF_LDLM_INTENT_OPEN);
/* allocation failed: release the locks collected for cancellation */
323 ldlm_lock_list_put(&cancels, l_bl_ast, count);
324 return ERR_PTR(-ENOMEM);
327 /* parent capability */
328 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
329 /* child capability, reserve the size according to parent capa, it will
330 * be filled after we get the reply */
331 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
333 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
334 op_data->op_namelen + 1);
335 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
336 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
338 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
340 ptlrpc_request_free(req);
/* opens are replayed unconditionally if the import supports replay */
344 spin_lock(&req->rq_lock);
345 req->rq_replay = req->rq_import->imp_replayable;
346 spin_unlock(&req->rq_lock);
348 /* pack the intent */
349 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
350 lit->opc = (__u64)it->it_op;
352 /* pack the intended request */
353 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
356 /* for remote client, fetch remote perm for current user */
357 if (client_is_remote(exp))
358 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
359 sizeof(struct mdt_remote_perm));
360 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_getxattr_pack - allocate and pack an LDLM_INTENT_GETXATTR
 * request, sizing all three xattr reply buffers (names, values, value
 * lengths) to the server's maximum EA size.  Returns the request or
 * ERR_PTR on failure.
 */
364 static struct ptlrpc_request *
365 mdc_intent_getxattr_pack(struct obd_export *exp,
366 struct lookup_intent *it,
367 struct md_op_data *op_data)
369 struct ptlrpc_request *req;
370 struct ldlm_intent *lit;
371 int rc, count = 0, maxdata;
376 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
377 &RQF_LDLM_INTENT_GETXATTR);
379 return ERR_PTR(-ENOMEM);
381 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
383 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
385 ptlrpc_request_free(req);
389 /* pack the intent */
390 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
391 lit->opc = IT_GETXATTR;
/* server-advertised max EA size bounds every xattr reply buffer */
393 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
395 /* pack the intended request */
396 mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
397 op_data->op_valid, maxdata, -1, 0);
399 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
400 RCL_SERVER, maxdata);
402 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
403 RCL_SERVER, maxdata);
405 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
406 RCL_SERVER, maxdata);
408 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_unlink_pack - allocate and pack an LDLM_INTENT_UNLINK
 * request, reserving reply space for the victim's MD and unlink
 * cookies.  Returns the request or ERR_PTR on failure.
 */
413 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
414 struct lookup_intent *it,
415 struct md_op_data *op_data)
417 struct ptlrpc_request *req;
418 struct obd_device *obddev = class_exp2obd(exp);
419 struct ldlm_intent *lit;
422 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
423 &RQF_LDLM_INTENT_UNLINK);
425 return ERR_PTR(-ENOMEM);
427 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
428 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
429 op_data->op_namelen + 1);
431 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
433 ptlrpc_request_free(req);
437 /* pack the intent */
438 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
439 lit->opc = (__u64)it->it_op;
441 /* pack the intended request */
442 mdc_unlink_pack(req, op_data);
444 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
445 obddev->u.cli.cl_default_mds_easize);
446 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
447 obddev->u.cli.cl_default_mds_cookiesize);
448 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_getattr_pack - allocate and pack an LDLM_INTENT_GETATTR
 * request.  Asks for attrs, EA, capabilities and either remote perms
 * (remote client) or ACLs (local client); EA reply buffer uses the
 * default MDS easize when set, else the maximum.
 */
452 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
453 struct lookup_intent *it,
454 struct md_op_data *op_data)
456 struct ptlrpc_request *req;
457 struct obd_device *obddev = class_exp2obd(exp);
458 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
459 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
460 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
461 (client_is_remote(exp) ?
462 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
463 struct ldlm_intent *lit;
467 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
468 &RQF_LDLM_INTENT_GETATTR);
470 return ERR_PTR(-ENOMEM);
472 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
473 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
474 op_data->op_namelen + 1);
476 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
478 ptlrpc_request_free(req);
482 /* pack the intent */
483 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
484 lit->opc = (__u64)it->it_op;
/* prefer the tuned default easize; fall back to the hard maximum */
486 if (obddev->u.cli.cl_default_mds_easize > 0)
487 easize = obddev->u.cli.cl_default_mds_easize;
489 easize = obddev->u.cli.cl_max_mds_easize;
491 /* pack the intended request */
492 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
494 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
495 if (client_is_remote(exp))
496 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
497 sizeof(struct mdt_remote_perm));
498 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_layout_pack - allocate and pack an LDLM_INTENT_LAYOUT
 * request; the layout itself comes back in the lock's LVB, sized to
 * the default MDS easize.  @unused keeps the signature parallel with
 * the other *_pack helpers.
 */
502 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
503 struct lookup_intent *it,
504 struct md_op_data *unused)
506 struct obd_device *obd = class_exp2obd(exp);
507 struct ptlrpc_request *req;
508 struct ldlm_intent *lit;
509 struct layout_intent *layout;
512 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
513 &RQF_LDLM_INTENT_LAYOUT);
515 return ERR_PTR(-ENOMEM);
517 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
518 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
520 ptlrpc_request_free(req);
524 /* pack the intent */
525 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
526 lit->opc = (__u64)it->it_op;
528 /* pack the layout intent request */
529 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
530 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
531 * set for replication */
532 layout->li_opc = LAYOUT_INTENT_ACCESS;
534 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
535 obd->u.cli.cl_default_mds_easize);
536 ptlrpc_request_set_replen(req);
/*
 * mdc_enqueue_pack - allocate a plain (intent-less) LDLM_ENQUEUE
 * request with an LVB reply buffer of @lvb_len bytes.  Returns the
 * request or ERR_PTR on failure.
 */
540 static struct ptlrpc_request *
541 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
543 struct ptlrpc_request *req;
546 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
548 return ERR_PTR(-ENOMEM);
550 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
552 ptlrpc_request_free(req);
556 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
557 ptlrpc_request_set_replen(req);
/*
 * mdc_finish_enqueue - post-process an intent enqueue reply:
 *  - mark the request INTENT_ONLY when it will be replayed;
 *  - copy disposition/status/lock info from the DLM reply into the
 *    intent, fixing up ei_mode if the server granted a different mode;
 *  - clear the replay flag on failed transactions/opens;
 *  - swab and stash the reply mdt_body, save the LOV EA for open
 *    replay (reallocating the request buffer if needed), and handle
 *    remote perms / capabilities;
 *  - for IT_LAYOUT, copy the returned LVB into the lock's l_lvb_data.
 * NOTE(review): numerous lines (error paths, closing braces, the rc
 * parameter line) are truncated in this extract; code left intact.
 */
561 static int mdc_finish_enqueue(struct obd_export *exp,
562 struct ptlrpc_request *req,
563 struct ldlm_enqueue_info *einfo,
564 struct lookup_intent *it,
565 struct lustre_handle *lockh,
568 struct req_capsule *pill = &req->rq_pill;
569 struct ldlm_request *lockreq;
570 struct ldlm_reply *lockrep;
571 struct lustre_intent_data *intent = &it->d.lustre;
572 struct ldlm_lock *lock;
573 void *lvb_data = NULL;
577 /* Similarly, if we're going to replay this request, we don't want to
578 * actually get a lock, just perform the intent. */
579 if (req->rq_transno || req->rq_replay) {
580 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
581 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
584 if (rc == ELDLM_LOCK_ABORTED) {
586 memset(lockh, 0, sizeof(*lockh));
588 } else { /* rc = 0 */
589 lock = ldlm_handle2lock(lockh);
590 LASSERT(lock != NULL);
592 /* If the server gave us back a different lock mode, we should
593 * fix up our variables. */
594 if (lock->l_req_mode != einfo->ei_mode) {
595 ldlm_lock_addref(lockh, lock->l_req_mode);
596 ldlm_lock_decref(lockh, einfo->ei_mode);
597 einfo->ei_mode = lock->l_req_mode;
602 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
603 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* mirror the server's verdict into the intent for the upper layers */
605 intent->it_disposition = (int)lockrep->lock_policy_res1;
606 intent->it_status = (int)lockrep->lock_policy_res2;
607 intent->it_lock_mode = einfo->ei_mode;
608 intent->it_lock_handle = lockh->cookie;
609 intent->it_data = req;
611 /* Technically speaking rq_transno must already be zero if
612 * it_status is in error, so the check is a bit redundant */
613 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
614 mdc_clear_replay_flag(req, intent->it_status);
616 /* If we're doing an IT_OPEN which did not result in an actual
617 * successful open, then we need to remove the bit which saves
618 * this request for unconditional replay.
620 * It's important that we do this first! Otherwise we might exit the
621 * function without doing so, and try to replay a failed create
623 if (it->it_op & IT_OPEN && req->rq_replay &&
624 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
625 mdc_clear_replay_flag(req, intent->it_status);
627 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
628 it->it_op, intent->it_disposition, intent->it_status);
630 /* We know what to expect, so we do any byte flipping required here */
631 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
632 struct mdt_body *body;
634 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
636 CERROR ("Can't swab mdt_body\n");
640 if (it_disposition(it, DISP_OPEN_OPEN) &&
641 !it_open_error(DISP_OPEN_OPEN, it)) {
643 * If this is a successful OPEN request, we need to set
644 * replay handler and data early, so that if replay
645 * happens immediately after swabbing below, new reply
646 * is swabbed by that handler correctly.
648 mdc_set_open_replay_data(NULL, NULL, it);
651 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
654 mdc_update_max_ea_from_body(exp, body);
657 * The eadata is opaque; just check that it is there.
658 * Eventually, obd_unpackmd() will check the contents.
660 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
665 /* save lvb data and length in case this is for layout
668 lvb_len = body->eadatasize;
671 * We save the reply LOV EA in case we have to replay a
672 * create for recovery. If we didn't allocate a large
673 * enough request buffer above we need to reallocate it
674 * here to hold the actual LOV EA.
676 * To not save LOV EA if request is not going to replay
677 * (for example error one).
679 if ((it->it_op & IT_OPEN) && req->rq_replay) {
681 if (req_capsule_get_size(pill, &RMF_EADATA,
684 mdc_realloc_openmsg(req, body);
686 req_capsule_shrink(pill, &RMF_EADATA,
690 req_capsule_set_size(pill, &RMF_EADATA,
694 lmm = req_capsule_client_get(pill, &RMF_EADATA);
696 memcpy(lmm, eadata, body->eadatasize);
700 if (body->valid & OBD_MD_FLRMTPERM) {
701 struct mdt_remote_perm *perm;
703 LASSERT(client_is_remote(exp));
704 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
705 lustre_swab_mdt_remote_perm);
709 if (body->valid & OBD_MD_FLMDSCAPA) {
710 struct lustre_capa *capa, *p;
712 capa = req_capsule_server_get(pill, &RMF_CAPA1);
716 if (it->it_op & IT_OPEN) {
717 /* client fid capa will be checked in replay */
718 p = req_capsule_client_get(pill, &RMF_CAPA2);
723 if (body->valid & OBD_MD_FLOSSCAPA) {
724 struct lustre_capa *capa;
726 capa = req_capsule_server_get(pill, &RMF_CAPA2);
730 } else if (it->it_op & IT_LAYOUT) {
731 /* maybe the lock was granted right away and layout
732 * is packed into RMF_DLM_LVB of req */
733 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
735 lvb_data = req_capsule_server_sized_get(pill,
736 &RMF_DLM_LVB, lvb_len);
737 if (lvb_data == NULL)
742 /* fill in stripe data for layout lock */
743 lock = ldlm_handle2lock(lockh);
744 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
747 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
748 ldlm_it2str(it->it_op), lvb_len);
750 OBD_ALLOC_LARGE(lmm, lvb_len);
755 memcpy(lmm, lvb_data, lvb_len);
757 /* install lvb_data */
758 lock_res_and_lock(lock);
759 if (lock->l_lvb_data == NULL) {
760 lock->l_lvb_type = LVB_T_LAYOUT;
761 lock->l_lvb_data = lmm;
762 lock->l_lvb_len = lvb_len;
765 unlock_res_and_lock(lock);
/* someone else installed an LVB first: free our private copy */
767 OBD_FREE_LARGE(lmm, lvb_len);
775 /* We always reserve enough space in the reply packet for a stripe MD, because
776 * we don't know in advance the file type. */
/*
 * mdc_enqueue - send an (optionally intent-bearing) DLM enqueue to the
 * MDS.  Chooses the inodebits policy from the intent op, packs the
 * matching intent request (open/unlink/getattr/readdir/layout/
 * getxattr or raw FLOCK), throttles via the MDC rpc lock and
 * modrpcs-in-flight counter, performs the enqueue, retries forever on
 * -EINPROGRESS for creates (quota design), and finishes via
 * mdc_finish_enqueue().  On post-processing failure the acquired lock
 * is dropped.
 * NOTE(review): many lines (returns, brace closures, resend loop
 * label) are truncated in this extract; code left intact.
 */
777 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
778 struct lookup_intent *it, struct md_op_data *op_data,
779 struct lustre_handle *lockh, void *lmm, int lmmsize,
780 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
782 struct obd_device *obddev = class_exp2obd(exp);
783 struct ptlrpc_request *req = NULL;
784 __u64 flags, saved_flags = extra_lock_flags;
786 struct ldlm_res_id res_id;
787 static const ldlm_policy_data_t lookup_policy =
788 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
789 static const ldlm_policy_data_t update_policy =
790 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
791 static const ldlm_policy_data_t layout_policy =
792 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
793 static const ldlm_policy_data_t getxattr_policy = {
794 .l_inodebits = { MDS_INODELOCK_XATTR } };
795 ldlm_policy_data_t const *policy = &lookup_policy;
796 int generation, resends = 0;
797 struct ldlm_reply *lockrep;
798 enum lvb_type lvb_type = 0;
800 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
803 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* pick the inodebits policy that matches the intent operation */
806 saved_flags |= LDLM_FL_HAS_INTENT;
807 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
808 policy = &update_policy;
809 else if (it->it_op & IT_LAYOUT)
810 policy = &layout_policy;
811 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
812 policy = &getxattr_policy;
815 LASSERT(reqp == NULL);
/* remember the import generation so resends can detect eviction */
817 generation = obddev->u.cli.cl_import->imp_generation;
821 /* The only way right now is FLOCK, in this case we hide flock
822 policy as lmm, but lmmsize is 0 */
823 LASSERT(lmm && lmmsize == 0);
824 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
826 policy = (ldlm_policy_data_t *)lmm;
827 res_id.name[3] = LDLM_FLOCK;
828 } else if (it->it_op & IT_OPEN) {
829 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
831 policy = &update_policy;
832 einfo->ei_cbdata = NULL;
834 } else if (it->it_op & IT_UNLINK) {
835 req = mdc_intent_unlink_pack(exp, it, op_data);
836 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
837 req = mdc_intent_getattr_pack(exp, it, op_data);
838 } else if (it->it_op & IT_READDIR) {
839 req = mdc_enqueue_pack(exp, 0);
840 } else if (it->it_op & IT_LAYOUT) {
841 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
843 req = mdc_intent_layout_pack(exp, it, op_data);
844 lvb_type = LVB_T_LAYOUT;
845 } else if (it->it_op & IT_GETXATTR) {
846 req = mdc_intent_getxattr_pack(exp, it, op_data);
855 if (req != NULL && it && it->it_op & IT_CREAT)
856 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
858 req->rq_no_retry_einprogress = 1;
861 req->rq_generation_set = 1;
862 req->rq_import_generation = generation;
863 req->rq_sent = get_seconds() + resends;
866 /* It is important to obtain rpc_lock first (if applicable), so that
867 * threads that are serialised with rpc_lock are not polluting our
868 * rpcs in flight counter. We do not do flock request limiting, though*/
870 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
871 rc = mdc_enter_request(&obddev->u.cli);
873 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
874 mdc_clear_replay_flag(req, 0);
875 ptlrpc_req_finished(req);
880 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
881 0, lvb_type, lockh, 0);
883 /* For flock requests we immediately return without further
884 delay and let caller deal with the rest, since rest of
885 this function metadata processing makes no sense for flock
886 requests anyway. But in case of problem during comms with
887 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
888 can not rely on caller and this mainly for F_UNLCKs
889 (explicits or automatically generated by Kernel to clean
890 current FLocks upon exit) that can't be trashed */
891 if ((rc == -EINTR) || (rc == -ETIMEDOUT))
896 mdc_exit_request(&obddev->u.cli);
897 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
900 CDEBUG_LIMIT((rc == -EACCES || rc == -EIDRM) ? D_INFO : D_ERROR,
901 "%s: ldlm_cli_enqueue failed: rc = %d\n",
902 obddev->obd_name, rc);
904 mdc_clear_replay_flag(req, rc);
905 ptlrpc_req_finished(req);
909 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
910 LASSERT(lockrep != NULL);
/* server status travels in lock_policy_res2; convert to host errno */
912 lockrep->lock_policy_res2 =
913 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
915 /* Retry the create infinitely when we get -EINPROGRESS from
916 * server. This is required by the new quota design. */
917 if (it && it->it_op & IT_CREAT &&
918 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
919 mdc_clear_replay_flag(req, rc);
920 ptlrpc_req_finished(req);
923 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
924 obddev->obd_name, resends, it->it_op,
925 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
927 if (generation == obddev->u.cli.cl_import->imp_generation) {
930 CDEBUG(D_HA, "resend cross eviction\n");
935 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* finish failed: drop the lock we were granted, if any */
937 if (lustre_handle_is_used(lockh)) {
938 ldlm_lock_decref(lockh, einfo->ei_mode);
939 memset(lockh, 0, sizeof(*lockh));
941 ptlrpc_req_finished(req);
/*
 * mdc_finish_intent_lock - interpret the server's intent reply for the
 * upper layers: propagate intent-execution errors, detect stale
 * fid/name revalidations, take extra request references for
 * ll_create_node/ll_file_open, and replace the freshly granted lock
 * with an already-held matching one when possible.
 * NOTE(review): several error-return lines and brace closures are
 * truncated in this extract; code left intact.
 */
946 static int mdc_finish_intent_lock(struct obd_export *exp,
947 struct ptlrpc_request *request,
948 struct md_op_data *op_data,
949 struct lookup_intent *it,
950 struct lustre_handle *lockh)
952 struct lustre_handle old_lock;
953 struct mdt_body *mdt_body;
954 struct ldlm_lock *lock;
957 LASSERT(request != NULL);
958 LASSERT(request != LP_POISON);
959 LASSERT(request->rq_repmsg != LP_POISON);
961 if (!it_disposition(it, DISP_IT_EXECD)) {
962 /* The server failed before it even started executing the
963 * intent, i.e. because it couldn't unpack the request. */
964 LASSERT(it->d.lustre.it_status != 0);
965 return it->d.lustre.it_status;
967 rc = it_open_error(DISP_IT_EXECD, it);
971 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
972 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
974 /* If we were revalidating a fid/name pair, mark the intent in
975 * case we fail and get called again from lookup */
976 if (fid_is_sane(&op_data->op_fid2) &&
977 it->it_create_mode & M_CHECK_STALE &&
978 it->it_op != IT_GETATTR) {
980 /* Also: did we find the same inode? */
981 /* sever can return one of two fids:
982 * op_fid2 - new allocated fid - if file is created.
983 * op_fid3 - existent fid - if file only open.
984 * op_fid3 is saved in lmv_intent_open */
985 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
986 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
987 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
988 "\n", PFID(&op_data->op_fid2),
989 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
994 rc = it_open_error(DISP_LOOKUP_EXECD, it);
998 /* keep requests around for the multiple phases of the call
999 * this shows the DISP_XX must guarantee we make it into the call
1001 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1002 it_disposition(it, DISP_OPEN_CREATE) &&
1003 !it_open_error(DISP_OPEN_CREATE, it)) {
1004 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1005 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1007 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1008 it_disposition(it, DISP_OPEN_OPEN) &&
1009 !it_open_error(DISP_OPEN_OPEN, it)) {
1010 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1011 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1012 /* BUG 11546 - eviction in the middle of open rpc processing */
1013 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1016 if (it->it_op & IT_CREAT) {
1017 /* XXX this belongs in ll_create_it */
1018 } else if (it->it_op == IT_OPEN) {
1019 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1021 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1024 /* If we already have a matching lock, then cancel the new
1025 * one. We have to set the data here instead of in
1026 * mdc_enqueue, because we need to use the child's inode as
1027 * the l_ast_data to match, and that's not available until
1028 * intent_finish has performed the iget().) */
1029 lock = ldlm_handle2lock(lockh);
1031 ldlm_policy_data_t policy = lock->l_policy_data;
1032 LDLM_DEBUG(lock, "matching against this");
/* the reply fid must name the resource the lock was granted on */
1034 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1035 &lock->l_resource->lr_name),
1036 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1037 PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1038 LDLM_LOCK_PUT(lock);
1040 memcpy(&old_lock, lockh, sizeof(*lockh));
1041 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1042 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1043 ldlm_lock_decref_and_cancel(lockh,
1044 it->d.lustre.it_lock_mode);
1045 memcpy(lockh, &old_lock, sizeof(old_lock));
1046 it->d.lustre.it_lock_handle = lockh->cookie;
1049 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1050 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1051 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/*
 * mdc_revalidate_lock - check whether the client still holds a usable
 * lock for the intent: first try the handle cached in the intent,
 * then match against the namespace with inodebits chosen per intent
 * op.  On success the intent's lock handle/mode are refreshed;
 * otherwise they are zeroed.
 * NOTE(review): switch-case labels and return statements are
 * truncated in this extract; code left intact.
 */
1055 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1056 struct lu_fid *fid, __u64 *bits)
1058 /* We could just return 1 immediately, but since we should only
1059 * be called in revalidate_it if we already have a lock, let's
1061 struct ldlm_res_id res_id;
1062 struct lustre_handle lockh;
1063 ldlm_policy_data_t policy;
1066 if (it->d.lustre.it_lock_handle) {
1067 lockh.cookie = it->d.lustre.it_lock_handle;
1068 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1070 fid_build_reg_res_name(fid, &res_id);
1071 switch (it->it_op) {
1073 /* File attributes are held under multiple bits:
1074 * nlink is under lookup lock, size and times are
1075 * under UPDATE lock and recently we've also got
1076 * a separate permissions lock for owner/group/acl that
1077 * were protected by lookup lock before.
1078 * Getattr must provide all of that information,
1079 * so we need to ensure we have all of those locks.
1080 * Unfortunately, if the bits are split across multiple
1081 * locks, there's no easy way to match all of them here,
1082 * so an extra RPC would be performed to fetch all
1083 * of those bits at once for now. */
1084 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1085 * but for old MDTs (< 2.4), permission is covered
1086 * by LOOKUP lock, so it needs to match all bits here.*/
1087 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1088 MDS_INODELOCK_LOOKUP |
1092 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1095 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1099 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1100 LDLM_IBITS, &policy,
1101 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
/* matched (or revalidated): publish the lock to the intent */
1106 it->d.lustre.it_lock_handle = lockh.cookie;
1107 it->d.lustre.it_lock_mode = mode;
1109 it->d.lustre.it_lock_handle = 0;
1110 it->d.lustre.it_lock_mode = 0;
1117 * This long block is all about fixing up the lock and request state
1118 * so that it is correct as of the moment _before_ the operation was
1119 * applied; that way, the VFS will think that everything is normal and
1120 * call Lustre's regular VFS methods.
1122 * If we're performing a creation, that means that unless the creation
1123 * failed with EEXIST, we should fake up a negative dentry.
1125 * For everything else, we want to lookup to succeed.
1127 * One additional note: if CREATE or OPEN succeeded, we add an extra
1128 * reference to the request because we need to keep it around until
1129 * ll_create/ll_open gets called.
1131 * The server will return to us, in it_disposition, an indication of
1132 * exactly what d.lustre.it_status refers to.
1134 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1135 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1136 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1137 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1140 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/*
 * mdc_intent_lock - main entry for intent-based metadata operations
 * (see the long block comment above for the reply-state contract).
 * Revalidates an existing lock for LOOKUP/GETATTR on a sane fid,
 * allocates the child fid for CREATE when the caller didn't, then
 * enqueues and finishes via mdc_enqueue()/mdc_finish_intent_lock().
 * NOTE(review): error-return lines and brace closures are truncated
 * in this extract; code left intact.
 */
1143 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1144 void *lmm, int lmmsize, struct lookup_intent *it,
1145 int lookup_flags, struct ptlrpc_request **reqp,
1146 ldlm_blocking_callback cb_blocking,
1147 __u64 extra_lock_flags)
1149 struct ldlm_enqueue_info einfo = {
1150 .ei_type = LDLM_IBITS,
1151 .ei_mode = it_to_lock_mode(it),
1152 .ei_cb_bl = cb_blocking,
1153 .ei_cb_cp = ldlm_completion_ast,
1155 struct lustre_handle lockh;
1160 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1161 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1162 op_data->op_name, PFID(&op_data->op_fid2),
1163 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1167 if (fid_is_sane(&op_data->op_fid2) &&
1168 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1169 /* We could just return 1 immediately, but since we should only
1170 * be called in revalidate_it if we already have a lock, let's
1172 it->d.lustre.it_lock_handle = 0;
1173 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1174 /* Only return failure if it was not GETATTR by cfid
1175 (from inode_revalidate) */
1176 if (rc || op_data->op_namelen != 0)
1180 /* For case if upper layer did not alloc fid, do it now. */
1181 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1182 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1184 CERROR("Can't alloc new fid, rc %d\n", rc);
1188 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
/* hand the reply request back to the caller, then finish the intent */
1193 *reqp = it->d.lustre.it_data;
1194 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * mdc_intent_getattr_async_interpret - ptlrpc interpret callback for
 * async getattr (statahead): release the rpcs-in-flight slot, finish
 * the DLM enqueue, run mdc_finish_enqueue()/mdc_finish_intent_lock(),
 * then free einfo and invoke the caller's completion callback.
 * NOTE(review): the args parameter line, some error paths and the
 * final return are truncated in this extract; code left intact.
 */
1198 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1199 struct ptlrpc_request *req,
1202 struct mdc_getattr_args *ga = args;
1203 struct obd_export *exp = ga->ga_exp;
1204 struct md_enqueue_info *minfo = ga->ga_minfo;
1205 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1206 struct lookup_intent *it;
1207 struct lustre_handle *lockh;
1208 struct obd_device *obddev;
1209 struct ldlm_reply *lockrep;
1210 __u64 flags = LDLM_FL_HAS_INTENT;
1213 lockh = &minfo->mi_lockh;
1215 obddev = class_exp2obd(exp);
/* release the modify-rpcs-in-flight slot taken at submit time */
1217 mdc_exit_request(&obddev->u.cli);
1218 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1221 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1222 &flags, NULL, 0, lockh, rc);
1224 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1225 mdc_clear_replay_flag(req, rc);
1229 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1230 LASSERT(lockrep != NULL);
1232 lockrep->lock_policy_res2 =
1233 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1235 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1239 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1242 OBD_FREE_PTR(einfo);
1243 minfo->mi_cb(req, minfo, rc);
/*
 * mdc_intent_getattr_async - fire a getattr intent enqueue without
 * waiting: pack the request, take an rpcs-in-flight slot, enqueue in
 * async mode, stash the context in rq_async_args and queue the
 * request on ptlrpcd; the interpret callback above completes it.
 * NOTE(review): the function tail (final return) lies past the end of
 * this extract; code left intact.
 */
1247 int mdc_intent_getattr_async(struct obd_export *exp,
1248 struct md_enqueue_info *minfo,
1249 struct ldlm_enqueue_info *einfo)
1251 struct md_op_data *op_data = &minfo->mi_data;
1252 struct lookup_intent *it = &minfo->mi_it;
1253 struct ptlrpc_request *req;
1254 struct mdc_getattr_args *ga;
1255 struct obd_device *obddev = class_exp2obd(exp);
1256 struct ldlm_res_id res_id;
1257 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1258 * for statahead currently. Consider CMD in future, such two bits
1259 * maybe managed by different MDS, should be adjusted then. */
1260 ldlm_policy_data_t policy = {
1261 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1262 MDS_INODELOCK_UPDATE }
1265 __u64 flags = LDLM_FL_HAS_INTENT;
1268 "name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1269 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1270 ldlm_it2str(it->it_op), it->it_flags);
1272 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1273 req = mdc_intent_getattr_pack(exp, it, op_data);
1275 return PTR_ERR(req);
1277 rc = mdc_enter_request(&obddev->u.cli);
1279 ptlrpc_req_finished(req);
/* async enqueue: last argument 1 requests no wait for the reply */
1283 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1284 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1286 mdc_exit_request(&obddev->u.cli);
1287 ptlrpc_req_finished(req);
1291 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1292 ga = ptlrpc_req_async_args(req);
1294 ga->ga_minfo = minfo;
1295 ga->ga_einfo = einfo;
1297 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1298 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);