1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_MDC
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
48 # include <liblustre.h>
51 #include <lustre_acl.h>
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 /* fid_res_name_eq() */
55 #include <lustre_fid.h>
56 #include <lprocfs_status.h>
57 #include "mdc_internal.h"
/* Context handed from mdc_intent_getattr_async() to
 * mdc_intent_getattr_async_interpret() through the request's async-args
 * area (obtained with ptlrpc_req_async_args()). */
59 struct mdc_getattr_args {
/* export read back by the interpret callback */
60 struct obd_export *ga_exp;
/* caller's enqueue info; the interpret callback uses its mi_lockh, mi_data
 * and mi_cb members */
61 struct md_enqueue_info *ga_minfo;
/* ldlm enqueue parameters; released with OBD_FREE_PTR() by the interpret
 * callback */
62 struct ldlm_enqueue_info *ga_einfo;
/* Return the bits of @flag that are set in the intent's disposition mask
 * (it->d.lustre.it_disposition); non-zero when at least one of them is set. */
65 int it_disposition(struct lookup_intent *it, int flag)
67 return it->d.lustre.it_disposition & flag;
69 EXPORT_SYMBOL(it_disposition);
/* OR the bits of @flag into the intent's disposition mask. */
71 void it_set_disposition(struct lookup_intent *it, int flag)
73 it->d.lustre.it_disposition |= flag;
75 EXPORT_SYMBOL(it_set_disposition);
/* Clear the bits of @flag from the intent's disposition mask. */
77 void it_clear_disposition(struct lookup_intent *it, int flag)
79 it->d.lustre.it_disposition &= ~flag;
81 EXPORT_SYMBOL(it_clear_disposition);
/* Report the intent status that applies to processing phase @phase.
 *
 * The dispositions are tested in the order DISP_OPEN_OPEN, DISP_OPEN_CREATE,
 * DISP_LOOKUP_EXECD, DISP_IT_EXECD.  For a disposition that is set in @it,
 * it->d.lustre.it_status is returned when @phase is >= that disposition.
 * What is returned when @phase is smaller, and what follows the CERROR at
 * the end, is on lines that are not part of this listing. */
83 int it_open_error(int phase, struct lookup_intent *it)
85 if (it_disposition(it, DISP_OPEN_OPEN)) {
86 if (phase >= DISP_OPEN_OPEN)
87 return it->d.lustre.it_status;
92 if (it_disposition(it, DISP_OPEN_CREATE)) {
93 if (phase >= DISP_OPEN_CREATE)
94 return it->d.lustre.it_status;
99 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
100 if (phase >= DISP_LOOKUP_EXECD)
101 return it->d.lustre.it_status;
106 if (it_disposition(it, DISP_IT_EXECD)) {
107 if (phase >= DISP_IT_EXECD)
108 return it->d.lustre.it_status;
112 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
113 it->d.lustre.it_status);
117 EXPORT_SYMBOL(it_open_error);
/* Attach @data to the lock behind @lockh as its l_ast_data and report the
 * lock's inodebits through *bits.
 *
 * The handle is resolved with ldlm_handle2lock() and must yield a lock
 * (LASSERT).  The update is done between lock_res_and_lock() and
 * unlock_res_and_lock().  The "bits" parameter, any early-exit checks and
 * the return statement are on lines not shown in this listing. */
119 /* this must be called on a lockh that is known to have a referenced lock */
120 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
123 struct ldlm_lock *lock;
134 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
136 LASSERT(lock != NULL);
137 lock_res_and_lock(lock);
/* An existing l_ast_data that differs from @data is only tolerated when the
 * inode it points to has I_FREEING set; otherwise the assertion fires. */
139 if (lock->l_ast_data && lock->l_ast_data != data) {
140 struct inode *new_inode = data;
141 struct inode *old_inode = lock->l_ast_data;
142 LASSERTF(old_inode->i_state & I_FREEING,
143 "Found existing inode %p/%lu/%u state %lu in lock: "
144 "setting data to %p/%lu/%u\n", old_inode,
145 old_inode->i_ino, old_inode->i_generation,
147 new_inode, new_inode->i_ino, new_inode->i_generation);
150 lock->l_ast_data = data;
152 *bits = lock->l_policy_data.l_inodebits.bits;
154 unlock_res_and_lock(lock);
/* Match a lock on the resource named after @fid.
 *
 * The resource name is built with fid_build_reg_res_name() and the work is
 * delegated to ldlm_lock_match() on the namespace of the export's obd
 * device.  @flags, @type, @policy, @mode and @lockh are passed through
 * unchanged and the last argument is 0.  The result is stored in rc; the
 * declaration of rc and the return statement are not part of this listing. */
160 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
161 const struct lu_fid *fid, ldlm_type_t type,
162 ldlm_policy_data_t *policy, ldlm_mode_t mode,
163 struct lustre_handle *lockh)
165 struct ldlm_res_id res_id;
169 fid_build_reg_res_name(fid, &res_id);
170 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
171 &res_id, type, policy, mode, lockh, 0);
/* Call ldlm_cli_cancel_unused_resource() for the resource named after @fid
 * in the namespace of the export's obd device, passing @policy, mode, @flags
 * and opaque through.
 *
 * "mode" and "opaque" are parameters declared on lines that are not part of
 * this listing, as are the declaration of rc and the return statement. */
175 int mdc_cancel_unused(struct obd_export *exp,
176 const struct lu_fid *fid,
177 ldlm_policy_data_t *policy,
179 ldlm_cancel_flags_t flags,
182 struct ldlm_res_id res_id;
183 struct obd_device *obd = class_exp2obd(exp);
188 fid_build_reg_res_name(fid, &res_id);
189 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
190 policy, mode, flags, opaque);
/* Run ldlm_resource_iterate() on the resource named after @fid in the
 * namespace of the export's obd device.
 *
 * The remaining arguments of that call (presumably &res_id, @it and @data --
 * confirm against the full source) and the return statement are on lines
 * not shown in this listing. */
194 int mdc_change_cbdata(struct obd_export *exp,
195 const struct lu_fid *fid,
196 ldlm_iterator_t it, void *data)
198 struct ldlm_res_id res_id;
201 fid_build_reg_res_name(fid, &res_id);
202 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
/* Iterate over the resource named after @fid with ldlm_resource_iterate()
 * and translate the iterator verdict (LDLM_ITER_STOP / LDLM_ITER_CONTINUE)
 * into this function's result.  The values returned for each case, the
 * trailing arguments of the iterate call and the declaration of rc are on
 * lines that are not part of this listing.
 *
 * @fid is const and is handed to fid_build_reg_res_name() without a cast,
 * as the other callers in this file do; no const qualifier is dropped. */
209 /* find any ldlm lock of the inode in mdc */
213 int mdc_find_cbdata(struct obd_export *exp,
214 const struct lu_fid *fid,
215 ldlm_iterator_t it, void *data)
217 struct ldlm_res_id res_id;
221 fid_build_reg_res_name(fid, &res_id);
222 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
224 if (rc == LDLM_ITER_STOP)
226 else if (rc == LDLM_ITER_CONTINUE)
/* Stop keeping @req around for replay after an error @rc.
 *
 * When rq_replay is set, rq_lock is taken around an update whose statement
 * is not shown in this listing (presumably it clears rq_replay -- confirm).
 * A non-zero @rc together with a non-zero rq_transno is reported through
 * DEBUG_REQ(D_ERROR); what follows that report is not shown either. */
231 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
233 /* Don't hold error requests for replay. */
234 if (req->rq_replay) {
235 cfs_spin_lock(&req->rq_lock);
237 cfs_spin_unlock(&req->rq_lock);
239 if (rc && req->rq_transno != 0) {
240 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
/* Enlarge the request segment at DLM_INTENT_REC_OFF + 4 with
 * sptlrpc_cli_enlarge_reqbuf().
 *
 * The CERROR and the clearing of OBD_MD_FLEASIZE / eadatasize in @body that
 * follow the call look like the failure path; the condition guarding them,
 * the remaining arguments of the enlarge call and the declaration of rc are
 * on lines that are not part of this listing. */
245 /* Save a large LOV EA into the request buffer so that it is available
246 * for replay. We don't do this in the initial request because the
247 * original request doesn't need this buffer (at most it sends just the
248 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
249 * buffer and may also be difficult to allocate and save a very large
250 * request buffer for each open. (bug 5707)
252 * OOM here may cause recovery failure if lmm is needed (only for the
253 * original open if the MDS crashed just when this client also OOM'd)
254 * but this is incredibly unlikely, and questionable whether the client
255 * could do MDS recovery under OOM anyways... */
256 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
257 struct mdt_body *body)
261 /* FIXME: remove this explicit offset. */
262 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
265 CERROR("Can't enlarge segment %d size to %d\n",
266 DLM_INTENT_REC_OFF + 4, body->eadatasize);
267 body->valid &= ~OBD_MD_FLEASIZE;
268 body->eadatasize = 0;
/* Build the RQF_LDLM_INTENT_OPEN enqueue request for @it / @op_data.
 *
 * Steps visible in this listing:
 *  - the create mode is forced to a regular file (S_IFMT bits replaced by
 *    S_IFREG);
 *  - when op_fid2 is sane, unused locks on it are collected with
 *    mdc_resource_get_unused(); unused MDS_INODELOCK_UPDATE locks on op_fid1
 *    are added to the count as well (the mode selection and some call
 *    arguments are on lines not shown);
 *  - the request is allocated; the ldlm_lock_list_put() / ERR_PTR(-ENOMEM)
 *    pair right after it looks like the allocation-failure path (its guard
 *    is not shown);
 *  - the CAPA1, CAPA2, NAME and EADATA client fields are sized, the EA field
 *    taking the larger of @lmmsize and cl_default_mds_easize;
 *  - ldlm_prep_enqueue_req() is called with the local "cancels" list;
 *  - rq_replay is set from the import's imp_replayable under rq_lock;
 *  - the intent opcode is stored and the open body is packed with
 *    mdc_open_pack();
 *  - for a remote client the ACL reply field is sized for
 *    struct mdt_remote_perm, then the reply length is set.
 * The last parameter, several local declarations and the final return are
 * on lines that are not part of this listing. */
272 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
273 struct lookup_intent *it,
274 struct md_op_data *op_data,
275 void *lmm, int lmmsize,
278 struct ptlrpc_request *req;
279 struct obd_device *obddev = class_exp2obd(exp);
280 struct ldlm_intent *lit;
281 CFS_LIST_HEAD(cancels);
287 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
289 /* XXX: openlock is not cancelled for cross-refs. */
290 /* If inode is known, cancel conflicting OPEN locks. */
291 if (fid_is_sane(&op_data->op_fid2)) {
292 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
295 else if (it->it_flags & FMODE_EXEC)
300 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
305 /* If CREATE, cancel parent's UPDATE lock. */
306 if (it->it_op & IT_CREAT)
310 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
312 MDS_INODELOCK_UPDATE);
314 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
315 &RQF_LDLM_INTENT_OPEN);
317 ldlm_lock_list_put(&cancels, l_bl_ast, count);
318 RETURN(ERR_PTR(-ENOMEM));
321 /* parent capability */
322 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
323 /* child capability, reserve the size according to parent capa, it will
324 * be filled after we get the reply */
325 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
327 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
328 op_data->op_namelen + 1);
329 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
330 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
332 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
334 ptlrpc_request_free(req);
/* The request is kept for replay only when the import is replayable. */
338 cfs_spin_lock(&req->rq_lock);
339 req->rq_replay = req->rq_import->imp_replayable;
340 cfs_spin_unlock(&req->rq_lock);
342 /* pack the intent */
343 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
344 lit->opc = (__u64)it->it_op;
346 /* pack the intended request */
347 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
350 /* for remote client, fetch remote perm for current user */
351 if (client_is_remote(exp))
352 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
353 sizeof(struct mdt_remote_perm));
354 ptlrpc_request_set_replen(req);
/* Build the RQF_LDLM_INTENT_UNLINK enqueue request for @it / @op_data.
 *
 * Visible steps: allocate the request (ERR_PTR(-ENOMEM) is returned on a
 * path whose guard is not shown), size the CAPA1 and NAME client fields,
 * call ldlm_prep_enqueue_req() without a cancel list, store the intent
 * opcode, pack the unlink body with mdc_unlink_pack(), and size the server
 * reply fields: RMF_MDT_MD to cl_max_mds_easize and RMF_ACL to
 * cl_max_mds_cookiesize.  Error guards, the declaration of rc and the final
 * return are on lines that are not part of this listing. */
358 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
359 struct lookup_intent *it,
360 struct md_op_data *op_data)
362 struct ptlrpc_request *req;
363 struct obd_device *obddev = class_exp2obd(exp);
364 struct ldlm_intent *lit;
368 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
369 &RQF_LDLM_INTENT_UNLINK);
371 RETURN(ERR_PTR(-ENOMEM));
373 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
374 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
375 op_data->op_namelen + 1);
377 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
379 ptlrpc_request_free(req);
383 /* pack the intent */
384 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
385 lit->opc = (__u64)it->it_op;
387 /* pack the intended request */
388 mdc_unlink_pack(req, op_data);
/* NOTE(review): the RMF_ACL reply field is sized with the cookie size here;
 * looks deliberate, but confirm against the server-side reply layout. */
390 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
391 obddev->u.cli.cl_max_mds_easize);
392 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
393 obddev->u.cli.cl_max_mds_cookiesize);
394 ptlrpc_request_set_replen(req);
/* Build the RQF_LDLM_INTENT_GETATTR enqueue request for @it / @op_data.
 *
 * The attribute mask asks for GETATTR, EA size, mode EA size, directory EA,
 * MDS capability and MEA, plus OBD_MD_FLRMTPERM for a remote client or
 * OBD_MD_FLACL otherwise.  Visible steps: allocate the request
 * (ERR_PTR(-ENOMEM) on a path whose guard is not shown), size the CAPA1 and
 * NAME client fields, call ldlm_prep_enqueue_req() without a cancel list,
 * store the intent opcode, pack the getattr body with mdc_getattr_pack(),
 * size the RMF_MDT_MD reply field to cl_max_mds_easize and, for a remote
 * client, the RMF_ACL reply field to sizeof(struct mdt_remote_perm).  Error
 * guards, the declaration of rc and the final return are on lines that are
 * not part of this listing. */
398 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
399 struct lookup_intent *it,
400 struct md_op_data *op_data)
402 struct ptlrpc_request *req;
403 struct obd_device *obddev = class_exp2obd(exp);
404 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
405 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
406 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
407 (client_is_remote(exp) ?
408 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
409 struct ldlm_intent *lit;
413 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
414 &RQF_LDLM_INTENT_GETATTR);
416 RETURN(ERR_PTR(-ENOMEM));
418 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
419 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
420 op_data->op_namelen + 1);
422 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
424 ptlrpc_request_free(req);
428 /* pack the intent */
429 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
430 lit->opc = (__u64)it->it_op;
432 /* pack the intended request */
433 mdc_getattr_pack(req, valid, it->it_flags, op_data,
434 obddev->u.cli.cl_max_mds_easize);
436 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
437 obddev->u.cli.cl_max_mds_easize);
438 if (client_is_remote(exp))
439 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
440 sizeof(struct mdt_remote_perm));
441 ptlrpc_request_set_replen(req);
/* Build a plain RQF_LDLM_ENQUEUE request (no intent payload is packed in
 * the lines shown): allocate it, call ldlm_prep_enqueue_req() without a
 * cancel list and set the reply length.  ERR_PTR(-ENOMEM) and
 * ptlrpc_request_free() belong to error paths whose guards, like the
 * declaration of rc and the final return, are not part of this listing. */
445 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
447 struct ptlrpc_request *req;
451 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
453 RETURN(ERR_PTR(-ENOMEM));
455 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
457 ptlrpc_request_free(req);
461 ptlrpc_request_set_replen(req);
/* Post-process an intent enqueue reply and copy its outcome into @it.
 *
 * Steps visible in this listing:
 *  - if the request has a transno or is marked for replay, the lock request
 *    is flagged LDLM_FL_INTENT_ONLY;
 *  - for rc == ELDLM_LOCK_ABORTED the lock handle is zeroed; otherwise the
 *    lock mode in @einfo is brought in line with the lock's l_req_mode;
 *  - disposition, status, lock mode, lock handle and the request itself are
 *    stored in it->d.lustre;
 *  - the replay flag is cleared for a negative status, and for an IT_OPEN
 *    that did not end in a successful open;
 *  - for IT_OPEN / IT_UNLINK / IT_LOOKUP / IT_GETATTR the mdt_body is
 *    fetched, open replay data is set for a successful open, the reply EA
 *    is checked and (for a replayable open) copied into the request's
 *    RMF_EADATA field, and remote-permission and capability fields are
 *    fetched from the reply.
 * The "rc" parameter, several local declarations, most error checks and all
 * return statements are on lines that are not part of this listing. */
465 static int mdc_finish_enqueue(struct obd_export *exp,
466 struct ptlrpc_request *req,
467 struct ldlm_enqueue_info *einfo,
468 struct lookup_intent *it,
469 struct lustre_handle *lockh,
472 struct req_capsule *pill = &req->rq_pill;
473 struct ldlm_request *lockreq;
474 struct ldlm_reply *lockrep;
478 /* Similarly, if we're going to replay this request, we don't want to
479 * actually get a lock, just perform the intent. */
480 if (req->rq_transno || req->rq_replay) {
481 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
482 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
485 if (rc == ELDLM_LOCK_ABORTED) {
487 memset(lockh, 0, sizeof(*lockh));
489 } else { /* rc = 0 */
490 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
493 /* If the server gave us back a different lock mode, we should
494 * fix up our variables. */
495 if (lock->l_req_mode != einfo->ei_mode) {
496 ldlm_lock_addref(lockh, lock->l_req_mode);
497 ldlm_lock_decref(lockh, einfo->ei_mode);
498 einfo->ei_mode = lock->l_req_mode;
/* Copy the outcome carried in the DLM reply into the intent. */
503 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
504 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
506 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
507 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
508 it->d.lustre.it_lock_mode = einfo->ei_mode;
509 it->d.lustre.it_lock_handle = lockh->cookie;
510 it->d.lustre.it_data = req;
512 if (it->d.lustre.it_status < 0 && req->rq_replay)
513 mdc_clear_replay_flag(req, it->d.lustre.it_status);
515 /* If we're doing an IT_OPEN which did not result in an actual
516 * successful open, then we need to remove the bit which saves
517 * this request for unconditional replay.
519 * It's important that we do this first! Otherwise we might exit the
520 * function without doing so, and try to replay a failed create
522 if (it->it_op & IT_OPEN && req->rq_replay &&
523 (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
524 mdc_clear_replay_flag(req, it->d.lustre.it_status);
526 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
527 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
529 /* We know what to expect, so we do any byte flipping required here */
530 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
531 struct mdt_body *body;
533 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
535 CERROR ("Can't swab mdt_body\n");
539 if (it_disposition(it, DISP_OPEN_OPEN) &&
540 !it_open_error(DISP_OPEN_OPEN, it)) {
542 * If this is a successful OPEN request, we need to set
543 * replay handler and data early, so that if replay
544 * happens immediately after swabbing below, new reply
545 * is swabbed by that handler correctly.
547 mdc_set_open_replay_data(NULL, NULL, req);
550 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
553 mdc_update_max_ea_from_body(exp, body);
556 * The eadata is opaque; just check that it is there.
557 * Eventually, obd_unpackmd() will check the contents.
559 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
565 * We save the reply LOV EA in case we have to replay a
566 * create for recovery. If we didn't allocate a large
567 * enough request buffer above we need to reallocate it
568 * here to hold the actual LOV EA.
570 * To not save LOV EA if request is not going to replay
571 * (for example error one).
573 if ((it->it_op & IT_OPEN) && req->rq_replay) {
575 if (req_capsule_get_size(pill, &RMF_EADATA,
578 mdc_realloc_openmsg(req, body);
580 req_capsule_shrink(pill, &RMF_EADATA,
584 req_capsule_set_size(pill, &RMF_EADATA,
588 lmm = req_capsule_client_get(pill, &RMF_EADATA);
590 memcpy(lmm, eadata, body->eadatasize);
/* Remote-permission data is only expected when the client is remote. */
594 if (body->valid & OBD_MD_FLRMTPERM) {
595 struct mdt_remote_perm *perm;
597 LASSERT(client_is_remote(exp));
598 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
599 lustre_swab_mdt_remote_perm);
603 if (body->valid & OBD_MD_FLMDSCAPA) {
604 struct lustre_capa *capa, *p;
606 capa = req_capsule_server_get(pill, &RMF_CAPA1);
610 if (it->it_op & IT_OPEN) {
611 /* client fid capa will be checked in replay */
612 p = req_capsule_client_get(pill, &RMF_CAPA2);
617 if (body->valid & OBD_MD_FLOSSCAPA) {
618 struct lustre_capa *capa;
620 capa = req_capsule_server_get(pill, &RMF_CAPA2);
/* Enqueue a lock for @op_data, with the intent @it when one is given.
 *
 * Steps visible in this listing:
 *  - with an intent the lock type must be LDLM_IBITS (LASSERTF);
 *  - the resource name is built from op_fid1; the default policy is
 *    MDS_INODELOCK_LOOKUP, switched to MDS_INODELOCK_UPDATE for
 *    IT_UNLINK / IT_GETATTR / IT_READDIR and for IT_OPEN;
 *  - the request is built by the helper matching the intent
 *    (mdc_intent_open_pack, mdc_intent_unlink_pack, mdc_intent_getattr_pack
 *    or ldlm_enqueue_pack); the FLOCK case instead takes its policy from
 *    @lmm and sets res_id.name[3] to LDLM_FLOCK;
 *  - the rpc lock is taken and mdc_enter_request() is called before
 *    ldlm_cli_enqueue(), and both are released afterwards;
 *  - an enqueue error is logged, the replay flag cleared and the request
 *    dropped; otherwise the reply is handed to mdc_finish_enqueue().
 * Several guards, the declaration of rc, some call arguments and all
 * RETURN statements except RETURN(PTR_ERR(req)) are on lines that are not
 * part of this listing. */
629 /* We always reserve enough space in the reply packet for a stripe MD, because
630 * we don't know in advance the file type. */
631 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
632 struct lookup_intent *it, struct md_op_data *op_data,
633 struct lustre_handle *lockh, void *lmm, int lmmsize,
634 struct ptlrpc_request **reqp, int extra_lock_flags)
636 struct obd_device *obddev = class_exp2obd(exp);
637 struct ptlrpc_request *req = NULL;
638 struct req_capsule *pill;
639 int flags = extra_lock_flags;
641 struct ldlm_res_id res_id;
642 static const ldlm_policy_data_t lookup_policy =
643 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
644 static const ldlm_policy_data_t update_policy =
645 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
646 ldlm_policy_data_t const *policy = &lookup_policy;
649 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
652 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
655 flags |= LDLM_FL_HAS_INTENT;
656 if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
657 policy = &update_policy;
663 /* The only way right now is FLOCK, in this case we hide flock
664 policy as lmm, but lmmsize is 0 */
665 LASSERT(lmm && lmmsize == 0);
666 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
668 policy = (ldlm_policy_data_t *)lmm;
669 res_id.name[3] = LDLM_FLOCK;
670 } else if (it->it_op & IT_OPEN) {
671 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
673 policy = &update_policy;
674 einfo->ei_cbdata = NULL;
676 } else if (it->it_op & IT_UNLINK)
677 req = mdc_intent_unlink_pack(exp, it, op_data);
678 else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
679 req = mdc_intent_getattr_pack(exp, it, op_data);
680 else if (it->it_op == IT_READDIR)
681 req = ldlm_enqueue_pack(exp);
688 RETURN(PTR_ERR(req));
689 pill = &req->rq_pill;
691 /* It is important to obtain rpc_lock first (if applicable), so that
692 * threads that are serialised with rpc_lock are not polluting our
693 * rpcs in flight counter. We do not do flock request limiting, though*/
695 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
696 rc = mdc_enter_request(&obddev->u.cli);
698 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
699 ptlrpc_req_finished(req);
704 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
710 mdc_exit_request(&obddev->u.cli);
711 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
714 /* For flock requests we immediatelly return without further
715 delay and let caller deal with the rest, since rest of
716 this function metadata processing makes no sense for flock
722 CERROR("ldlm_cli_enqueue: %d\n", rc);
723 mdc_clear_replay_flag(req, rc);
724 ptlrpc_req_finished(req);
727 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Finish an intent lock once the enqueue reply has been stored in @it.
 *
 * Steps visible in this listing:
 *  - if the server never executed the intent (DISP_IT_EXECD not set), the
 *    non-zero it_status is returned;
 *  - when revalidating a fid/name pair (op_fid2 sane, M_CHECK_STALE set, not
 *    IT_GETATTR) DISP_ENQ_COMPLETE is set and the reply fid is compared with
 *    op_fid2 and op_fid3; a mismatch with both is logged as stale data;
 *  - an extra request reference is taken for a successful create
 *    (DISP_ENQ_CREATE_REF) and for a successful open (DISP_ENQ_OPEN_REF);
 *  - if the handle resolves to a lock, ldlm_lock_match() is used to look for
 *    another matching lock; when one is found the new lock is dropped with
 *    ldlm_lock_decref_and_cancel() and the matched handle is used instead.
 * The declaration of rc, the error checks after each it_open_error() call
 * and the return statements other than the first are on lines that are not
 * part of this listing. */
732 static int mdc_finish_intent_lock(struct obd_export *exp,
733 struct ptlrpc_request *request,
734 struct md_op_data *op_data,
735 struct lookup_intent *it,
736 struct lustre_handle *lockh)
738 struct lustre_handle old_lock;
739 struct mdt_body *mdt_body;
740 struct ldlm_lock *lock;
744 LASSERT(request != NULL);
745 LASSERT(request != LP_POISON);
746 LASSERT(request->rq_repmsg != LP_POISON);
748 if (!it_disposition(it, DISP_IT_EXECD)) {
749 /* The server failed before it even started executing the
750 * intent, i.e. because it couldn't unpack the request. */
751 LASSERT(it->d.lustre.it_status != 0);
752 RETURN(it->d.lustre.it_status);
754 rc = it_open_error(DISP_IT_EXECD, it);
758 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
759 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
761 /* If we were revalidating a fid/name pair, mark the intent in
762 * case we fail and get called again from lookup */
763 if (fid_is_sane(&op_data->op_fid2) &&
764 it->it_create_mode & M_CHECK_STALE &&
765 it->it_op != IT_GETATTR) {
766 it_set_disposition(it, DISP_ENQ_COMPLETE);
768 /* Also: did we find the same inode? */
769 /* server can return one of two fids:
770 * op_fid2 - new allocated fid - if file is created.
771 * op_fid3 - existent fid - if file only open.
772 * op_fid3 is saved in lmv_intent_open */
773 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
774 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
/* Log both candidate fids (op_fid2 and op_fid3) next to the fid the server
 * returned, matching the two comparisons above. */
775 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
776 "\n", PFID(&op_data->op_fid2),
777 PFID(&op_data->op_fid3), PFID(&mdt_body->fid1));
782 rc = it_open_error(DISP_LOOKUP_EXECD, it);
786 /* keep requests around for the multiple phases of the call
787 * this shows the DISP_XX must guarantee we make it into the call */
789 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
790 it_disposition(it, DISP_OPEN_CREATE) &&
791 !it_open_error(DISP_OPEN_CREATE, it)) {
792 it_set_disposition(it, DISP_ENQ_CREATE_REF);
793 ptlrpc_request_addref(request); /* balanced in ll_create_node */
795 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
796 it_disposition(it, DISP_OPEN_OPEN) &&
797 !it_open_error(DISP_OPEN_OPEN, it)) {
798 it_set_disposition(it, DISP_ENQ_OPEN_REF);
799 ptlrpc_request_addref(request); /* balanced in ll_file_open */
800 /* BUG 11546 - eviction in the middle of open rpc processing */
801 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
804 if (it->it_op & IT_CREAT) {
805 /* XXX this belongs in ll_create_it */
806 } else if (it->it_op == IT_OPEN) {
807 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
809 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
812 /* If we already have a matching lock, then cancel the new
813 * one. We have to set the data here instead of in
814 * mdc_enqueue, because we need to use the child's inode as
815 * the l_ast_data to match, and that's not available until
816 * intent_finish has performed the iget().) */
817 lock = ldlm_handle2lock(lockh);
819 ldlm_policy_data_t policy = lock->l_policy_data;
820 LDLM_DEBUG(lock, "matching against this");
822 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
823 &lock->l_resource->lr_name),
824 "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
825 (unsigned long)lock->l_resource->lr_name.name[0],
826 (unsigned long)lock->l_resource->lr_name.name[1],
827 (unsigned long)lock->l_resource->lr_name.name[2],
828 (unsigned long)fid_seq(&mdt_body->fid1),
829 (unsigned long)fid_oid(&mdt_body->fid1),
830 (unsigned long)fid_ver(&mdt_body->fid1));
/* Start the match from a copy of the new handle; on success the copy holds
 * the matched lock's handle and replaces the new one. */
833 memcpy(&old_lock, lockh, sizeof(*lockh));
834 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
835 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
836 ldlm_lock_decref_and_cancel(lockh,
837 it->d.lustre.it_lock_mode);
838 memcpy(lockh, &old_lock, sizeof(old_lock));
839 it->d.lustre.it_lock_handle = lockh->cookie;
842 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
843 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
844 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/* Match a granted inodebits lock on the resource named after fid and record
 * it in @it.
 *
 * The policy bit is MDS_INODELOCK_UPDATE for IT_GETATTR and
 * MDS_INODELOCK_LOOKUP for every other intent.  ldlm_lock_match() is called
 * on the export's namespace with LDLM_FL_BLOCK_GRANTED for the modes
 * CR, CW, PR and PW; the resulting handle cookie and mode are stored in
 * it->d.lustre.  The "fid" parameter, the declaration of mode, any guard
 * around the two stores and the return statement are on lines that are not
 * part of this listing. */
848 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
851 /* We could just return 1 immediately, but since we should only
852 * be called in revalidate_it if we already have a lock, let's
854 struct ldlm_res_id res_id;
855 struct lustre_handle lockh;
856 ldlm_policy_data_t policy;
860 fid_build_reg_res_name(fid, &res_id);
861 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
862 MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
864 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
865 LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
866 &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
868 it->d.lustre.it_lock_handle = lockh.cookie;
869 it->d.lustre.it_lock_mode = mode;
876 * This long block is all about fixing up the lock and request state
877 * so that it is correct as of the moment _before_ the operation was
878 * applied; that way, the VFS will think that everything is normal and
879 * call Lustre's regular VFS methods.
881 * If we're performing a creation, that means that unless the creation
882 * failed with EEXIST, we should fake up a negative dentry.
884 * For everything else, we want to lookup to succeed.
886 * One additional note: if CREATE or OPEN succeeded, we add an extra
887 * reference to the request because we need to keep it around until
888 * ll_create/ll_open gets called.
890 * The server will return to us, in it_disposition, an indication of
891 * exactly what d.lustre.it_status refers to.
893 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
894 * otherwise if DISP_OPEN_CREATE is set, then it status is the
895 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
896 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
899 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/* Entry point for intent locking.
 *
 * Steps visible in this listing:
 *  - for IT_LOOKUP / IT_GETATTR on a sane op_fid2, mdc_revalidate_lock() is
 *    tried first; its result together with op_namelen decides an early exit
 *    whose statement is not shown;
 *  - unless DISP_ENQ_COMPLETE is already set, a fid is allocated for
 *    IT_CREAT when op_fid2 is not sane and the intent is sent with
 *    mdc_enqueue() using an LDLM_IBITS enqueue info;
 *  - otherwise DISP_ENQ_COMPLETE is cleared when op_fid2 is not sane or
 *    M_CHECK_STALE is not set;
 *  - *reqp is set to the request saved in the intent and the result of
 *    mdc_finish_intent_lock() is stored in rc.
 * The declaration of rc, the error checks and the return statements are on
 * lines that are not part of this listing. */
902 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
903 void *lmm, int lmmsize, struct lookup_intent *it,
904 int lookup_flags, struct ptlrpc_request **reqp,
905 ldlm_blocking_callback cb_blocking,
906 int extra_lock_flags)
908 struct lustre_handle lockh;
913 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
914 ", intent: %s flags %#o\n", op_data->op_namelen,
915 op_data->op_name, PFID(&op_data->op_fid2),
916 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
920 if (fid_is_sane(&op_data->op_fid2) &&
921 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
922 /* We could just return 1 immediately, but since we should only
923 * be called in revalidate_it if we already have a lock, let's
925 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2);
926 /* Only return failure if it was not GETATTR by cfid
927 (from inode_revalidate) */
928 if (rc || op_data->op_namelen != 0)
932 /* lookup_it may be called only after revalidate_it has run, because
933 * revalidate_it cannot return errors, only zero. Returning zero causes
934 * this call to lookup, which *can* return an error.
936 * We only want to execute the request associated with the intent one
937 * time, however, so don't send the request again. Instead, skip past
938 * this and use the request from revalidate. In this case, revalidate
939 * never dropped its reference, so the refcounts are all OK */
940 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
941 struct ldlm_enqueue_info einfo =
942 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
943 ldlm_completion_ast, NULL, NULL, NULL };
945 /* For case if upper layer did not alloc fid, do it now. */
946 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
947 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
949 CERROR("Can't alloc new fid, rc %d\n", rc);
953 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
954 lmm, lmmsize, NULL, extra_lock_flags);
957 } else if (!fid_is_sane(&op_data->op_fid2) ||
958 !(it->it_create_mode & M_CHECK_STALE)) {
959 /* DISP_ENQ_COMPLETE set means there is extra reference on
960 * request referenced from this intent, saved for subsequent
961 * lookup. This path is executed when we proceed to this
962 * lookup, so we clear DISP_ENQ_COMPLETE */
963 it_clear_disposition(it, DISP_ENQ_COMPLETE);
/* Hand the request saved in the intent back to the caller. */
965 *reqp = it->d.lustre.it_data;
966 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/* Reply handler installed by mdc_intent_getattr_async().
 *
 * The context is read from the mdc_getattr_args stored in the request's
 * async args.  Visible steps: leave the request window with
 * mdc_exit_request(), finish the enqueue with ldlm_cli_enqueue_fini(), log
 * and clear the replay flag on an error path whose guard is not shown, run
 * mdc_finish_enqueue() and mdc_finish_intent_lock() (the latter on
 * minfo->mi_data), free @einfo and finally invoke the caller's callback
 * minfo->mi_cb(req, minfo, rc).
 * The "args" and "rc" parameters, the assignment of "it", the effect of the
 * OBD_FAIL_CHECK test, the error guards and the return statement are on
 * lines that are not part of this listing. */
970 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
971 struct ptlrpc_request *req,
974 struct mdc_getattr_args *ga = args;
975 struct obd_export *exp = ga->ga_exp;
976 struct md_enqueue_info *minfo = ga->ga_minfo;
977 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
978 struct lookup_intent *it;
979 struct lustre_handle *lockh;
980 struct obd_device *obddev;
981 int flags = LDLM_FL_HAS_INTENT;
985 lockh = &minfo->mi_lockh;
987 obddev = class_exp2obd(exp);
989 mdc_exit_request(&obddev->u.cli);
990 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
993 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
994 &flags, NULL, 0, lockh, rc);
996 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
997 mdc_clear_replay_flag(req, rc);
1001 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1005 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* The enqueue info is released here, before the caller's callback runs. */
1009 OBD_FREE_PTR(einfo);
1010 minfo->mi_cb(req, minfo, rc);
/* Send a getattr intent enqueue asynchronously.
 *
 * The intent and op data come from @minfo (mi_it, mi_data).  Visible steps:
 * build the request with mdc_intent_getattr_pack(), enter the request
 * window with mdc_enter_request(), call ldlm_cli_enqueue() with a policy of
 * MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE, the handle &minfo->mi_lockh
 * and a final argument of 1, store @minfo and @einfo in the request's async
 * args, install mdc_intent_getattr_async_interpret() as the reply handler
 * and queue the request with ptlrpcd_add_req(req, PSCOPE_OTHER).
 * The ptlrpc_req_finished() / mdc_exit_request() calls belong to error
 * paths whose guards are not shown; the declaration of rc, the store to
 * ga_exp and the return statements are also not part of this listing. */
1014 int mdc_intent_getattr_async(struct obd_export *exp,
1015 struct md_enqueue_info *minfo,
1016 struct ldlm_enqueue_info *einfo)
1018 struct md_op_data *op_data = &minfo->mi_data;
1019 struct lookup_intent *it = &minfo->mi_it;
1020 struct ptlrpc_request *req;
1021 struct mdc_getattr_args *ga;
1022 struct obd_device *obddev = class_exp2obd(exp);
1023 struct ldlm_res_id res_id;
1024 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1025 * for statahead currently. Consider CMD in future, such two bits
1026 * maybe managed by different MDS, should be adjusted then. */
1027 ldlm_policy_data_t policy = {
1028 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1029 MDS_INODELOCK_UPDATE }
1032 int flags = LDLM_FL_HAS_INTENT;
1035 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1036 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1037 ldlm_it2str(it->it_op), it->it_flags);
1039 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1040 req = mdc_intent_getattr_pack(exp, it, op_data);
1044 rc = mdc_enter_request(&obddev->u.cli);
1046 ptlrpc_req_finished(req);
1050 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1051 0, &minfo->mi_lockh, 1);
1053 mdc_exit_request(&obddev->u.cli);
1054 ptlrpc_req_finished(req);
/* The context struct must fit in the request's async-args area. */
1058 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1059 ga = ptlrpc_req_async_args(req);
1061 ga->ga_minfo = minfo;
1062 ga->ga_einfo = einfo;
1064 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1065 ptlrpcd_add_req(req, PSCOPE_OTHER);