1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
32 * Copyright (c) 2011, 2012, Whamcloud, Inc.
35 * This file is part of Lustre, http://www.lustre.org/
36 * Lustre is a trademark of Sun Microsystems, Inc.
40 # define EXPORT_SYMTAB
42 #define DEBUG_SUBSYSTEM S_MDC
45 # include <linux/module.h>
46 # include <linux/pagemap.h>
47 # include <linux/miscdevice.h>
48 # include <linux/init.h>
50 # include <liblustre.h>
53 #include <lustre_acl.h>
54 #include <obd_class.h>
55 #include <lustre_dlm.h>
56 /* fid_res_name_eq() */
57 #include <lustre_fid.h>
58 #include <lprocfs_status.h>
59 #include "mdc_internal.h"
/* Context carried across an asynchronous intent-getattr RPC: stashed in the
 * request's async args and unpacked by mdc_intent_getattr_async_interpret(). */
61 struct mdc_getattr_args {
62         struct obd_export *ga_exp;      /* export the enqueue was sent on */
63         struct md_enqueue_info *ga_minfo; /* caller's enqueue info (lock handle, callback) */
64         struct ldlm_enqueue_info *ga_einfo; /* DLM enqueue parameters; freed in interpret */
/* Test whether @flag is set in the intent's server-returned disposition mask.
 * Returns the masked bits (nonzero if set), not a normalized boolean. */
67 int it_disposition(struct lookup_intent *it, int flag)
69         return it->d.lustre.it_disposition & flag;
71 EXPORT_SYMBOL(it_disposition);
/* Set @flag in the intent's disposition mask. */
73 void it_set_disposition(struct lookup_intent *it, int flag)
75         it->d.lustre.it_disposition |= flag;
77 EXPORT_SYMBOL(it_set_disposition);
/* Clear @flag from the intent's disposition mask. */
79 void it_clear_disposition(struct lookup_intent *it, int flag)
81         it->d.lustre.it_disposition &= ~flag;
83 EXPORT_SYMBOL(it_clear_disposition);
/* Return the intent status for the first server-executed phase at or below
 * @phase.  The checks run from the most specific disposition (OPEN_OPEN)
 * down to the most generic (IT_EXECD); for each phase the server reached,
 * it_status holds that phase's result.  The trailing CERROR fires only if
 * none of the expected disposition bits were set (unexpected server reply). */
85 int it_open_error(int phase, struct lookup_intent *it)
87         if (it_disposition(it, DISP_OPEN_OPEN)) {
88                 if (phase >= DISP_OPEN_OPEN)
89                         return it->d.lustre.it_status;
94         if (it_disposition(it, DISP_OPEN_CREATE)) {
95                 if (phase >= DISP_OPEN_CREATE)
96                         return it->d.lustre.it_status;
101         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
102                 if (phase >= DISP_LOOKUP_EXECD)
103                         return it->d.lustre.it_status;
108         if (it_disposition(it, DISP_IT_EXECD)) {
109                 if (phase >= DISP_IT_EXECD)
110                         return it->d.lustre.it_status;
/* None of the expected disposition bits set: log the raw state. */
114         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
115                it->d.lustre.it_status);
119 EXPORT_SYMBOL(it_open_error);
121 /* this must be called on a lockh that is known to have a referenced lock */
/* Attach @data (an inode pointer) as the lock's l_ast_data under the lock's
 * resource lock, and report the lock's inodebits back through *bits.
 * If the lock already carries different ast_data, the old inode must be on
 * its way out (I_FREEING) — otherwise two live inodes would claim one lock. */
122 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
125         struct ldlm_lock *lock;
134         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
/* Caller guarantees a referenced lock, so the handle must resolve. */
136         LASSERT(lock != NULL);
137         lock_res_and_lock(lock);
139         if (lock->l_ast_data && lock->l_ast_data != data) {
140                 struct inode *new_inode = data;
141                 struct inode *old_inode = lock->l_ast_data;
/* Replacing ast_data is only legal while the old inode is being freed. */
142                 LASSERTF(old_inode->i_state & I_FREEING,
143                          "Found existing inode %p/%lu/%u state %lu in lock: "
144                          "setting data to %p/%lu/%u\n", old_inode,
145                          old_inode->i_ino, old_inode->i_generation,
147                          new_inode, new_inode->i_ino, new_inode->i_generation);
150         lock->l_ast_data = data;
152                 *bits = lock->l_policy_data.l_inodebits.bits;
154         unlock_res_and_lock(lock);
/* Look for an already-granted MDC lock on @fid matching @type/@policy/@mode.
 * Builds the resource name from the fid and delegates to ldlm_lock_match();
 * on a match the lock handle is returned through @lockh. */
160 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
161                            const struct lu_fid *fid, ldlm_type_t type,
162                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
163                            struct lustre_handle *lockh)
165         struct ldlm_res_id res_id;
169         fid_build_reg_res_name(fid, &res_id);
170         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
171                              &res_id, type, policy, mode, lockh, 0);
/* Cancel all unused locks on the resource named by @fid that match @policy,
 * by way of ldlm_cli_cancel_unused_resource(). */
175 int mdc_cancel_unused(struct obd_export *exp,
176                       const struct lu_fid *fid,
177                       ldlm_policy_data_t *policy,
179                       ldlm_cancel_flags_t flags,
182         struct ldlm_res_id res_id;
183         struct obd_device *obd = class_exp2obd(exp);
188         fid_build_reg_res_name(fid, &res_id);
189         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
190                                              policy, mode, flags, opaque);
/* Run iterator @it with @data over every lock on @fid's resource
 * (typically used to update the locks' callback data). */
194 int mdc_change_cbdata(struct obd_export *exp,
195                       const struct lu_fid *fid,
196                       ldlm_iterator_t it, void *data)
198         struct ldlm_res_id res_id;
201         fid_build_reg_res_name(fid, &res_id);
202         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
209 /* find any ldlm lock of the inode in mdc
 * The iterator @it is run over @fid's resource; LDLM_ITER_STOP from the
 * iterator signals "found" and LDLM_ITER_CONTINUE signals "not found"
 * (the mapped return values are on lines elided from this listing). */
213 int mdc_find_cbdata(struct obd_export *exp,
214                     const struct lu_fid *fid,
215                     ldlm_iterator_t it, void *data)
217         struct ldlm_res_id res_id;
/* Cast drops const only because fid_build_reg_res_name() takes non-const. */
221         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
222         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
224         if (rc == LDLM_ITER_STOP)
226         else if (rc == LDLM_ITER_CONTINUE)
/* Drop the replay flag from a request that completed with an error so it is
 * not replayed after recovery.  A nonzero transno together with an error rc
 * is unexpected and is logged loudly. */
231 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
233         /* Don't hold error requests for replay. */
234         if (req->rq_replay) {
/* rq_replay is protected by rq_lock. */
235                 cfs_spin_lock(&req->rq_lock);
237                 cfs_spin_unlock(&req->rq_lock);
239         if (rc && req->rq_transno != 0) {
240                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
245 /* Save a large LOV EA into the request buffer so that it is available
246  * for replay. We don't do this in the initial request because the
247  * original request doesn't need this buffer (at most it sends just the
248  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
249  * buffer and may also be difficult to allocate and save a very large
250  * request buffer for each open. (bug 5707)
252  * OOM here may cause recovery failure if lmm is needed (only for the
253  * original open if the MDS crashed just when this client also OOM'd)
254  * but this is incredibly unlikely, and questionable whether the client
255  * could do MDS recovery under OOM anyways... */
256 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
257                                 struct mdt_body *body)
261         /* FIXME: remove this explicit offset. */
262         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
/* On enlarge failure, drop the EA from the body so replay proceeds
 * without it rather than referencing a buffer we could not grow. */
265                 CERROR("Can't enlarge segment %d size to %d\n",
266                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
267                 body->valid &= ~OBD_MD_FLEASIZE;
268                 body->eadatasize = 0;
/* Build and pack an LDLM_INTENT_OPEN request: cancel conflicting local locks
 * first (child OPEN locks, parent UPDATE lock for creates), allocate the
 * request, pack the intent and the open record, and reserve reply buffers.
 * Returns the prepared request or ERR_PTR on allocation/prep failure. */
272 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
273                                                    struct lookup_intent *it,
274                                                    struct md_op_data *op_data,
275                                                    void *lmm, int lmmsize,
278         struct ptlrpc_request *req;
279         struct obd_device *obddev = class_exp2obd(exp);
280         struct ldlm_intent *lit;
281         CFS_LIST_HEAD(cancels);
/* Intent opens always operate on regular files; force S_IFREG in the mode. */
287         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
289         /* XXX: openlock is not cancelled for cross-refs. */
290         /* If inode is known, cancel conflicting OPEN locks. */
291         if (fid_is_sane(&op_data->op_fid2)) {
292                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
295                 else if (it->it_flags & FMODE_EXEC)
300                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
305         /* If CREATE, cancel parent's UPDATE lock. */
306         if (it->it_op & IT_CREAT)
310                 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
312                                                  MDS_INODELOCK_UPDATE);
314         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
315                                    &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: put back the cancel-list references we collected. */
317                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
318                 RETURN(ERR_PTR(-ENOMEM));
321         /* parent capability */
322         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
323         /* child capability, reserve the size according to parent capa, it will
324          * be filled after we get the reply */
325         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
327         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
328                              op_data->op_namelen + 1);
/* EA buffer must hold either the caller's lmm or the default MDS EA size. */
329         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
330                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
332         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
334                 ptlrpc_request_free(req);
/* Mark the open replayable if the import supports replay (under rq_lock). */
338         cfs_spin_lock(&req->rq_lock);
339         req->rq_replay = req->rq_import->imp_replayable;
340         cfs_spin_unlock(&req->rq_lock);
342         /* pack the intent */
343         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
344         lit->opc = (__u64)it->it_op;
346         /* pack the intended request */
347         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
350         /* for remote client, fetch remote perm for current user */
351         if (client_is_remote(exp))
352                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
353                                      sizeof(struct mdt_remote_perm));
354         ptlrpc_request_set_replen(req);
/* Build and pack an LDLM_INTENT_UNLINK request: allocate it, pack the intent
 * opcode and the unlink record, and reserve reply room for the largest
 * possible MD and cookie buffers.  Returns the request or ERR_PTR. */
358 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
359                                                      struct lookup_intent *it,
360                                                      struct md_op_data *op_data)
362         struct ptlrpc_request *req;
363         struct obd_device *obddev = class_exp2obd(exp);
364         struct ldlm_intent *lit;
368         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
369                                    &RQF_LDLM_INTENT_UNLINK);
371                 RETURN(ERR_PTR(-ENOMEM));
373         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
374         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
375                              op_data->op_namelen + 1);
377         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
379                 ptlrpc_request_free(req);
383         /* pack the intent */
384         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
385         lit->opc = (__u64)it->it_op;
387         /* pack the intended request */
388         mdc_unlink_pack(req, op_data);
/* Reserve worst-case reply sizes: stripe MD and unlink llog cookies. */
390         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
391                              obddev->u.cli.cl_max_mds_easize);
392         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
393                              obddev->u.cli.cl_max_mds_cookiesize);
394         ptlrpc_request_set_replen(req);
/* Build and pack an LDLM_INTENT_GETATTR request.  The valid mask asks for
 * full attributes, EA sizes, directory EA, MDS capability and MEA, plus
 * either remote permissions (remote client) or the ACL.  Returns the
 * request or ERR_PTR on failure. */
398 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
399                                                       struct lookup_intent *it,
400                                                       struct md_op_data *op_data)
402         struct ptlrpc_request *req;
403         struct obd_device *obddev = class_exp2obd(exp);
404         obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
405                           OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
406                           OBD_MD_FLMDSCAPA | OBD_MD_MEA |
407                           (client_is_remote(exp) ?
408                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
409         struct ldlm_intent *lit;
413         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
414                                    &RQF_LDLM_INTENT_GETATTR);
416                 RETURN(ERR_PTR(-ENOMEM));
418         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
419         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
420                              op_data->op_namelen + 1);
422         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
424                 ptlrpc_request_free(req);
428         /* pack the intent */
429         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
430         lit->opc = (__u64)it->it_op;
432         /* pack the intended request */
433         mdc_getattr_pack(req, valid, it->it_flags, op_data,
434                          obddev->u.cli.cl_max_mds_easize);
/* Reply may contain the full stripe MD; reserve the maximum EA size. */
436         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
437                              obddev->u.cli.cl_max_mds_easize);
438         if (client_is_remote(exp))
439                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
440                                      sizeof(struct mdt_remote_perm));
441         ptlrpc_request_set_replen(req);
/* Build a plain (intent-less) LDLM enqueue request, used for IT_READDIR.
 * Returns the prepared request or ERR_PTR(-ENOMEM). */
445 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
447         struct ptlrpc_request *req;
451         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
453                 RETURN(ERR_PTR(-ENOMEM));
455         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
457                 ptlrpc_request_free(req);
461         ptlrpc_request_set_replen(req);
/* Post-process a completed intent enqueue: copy the server's disposition and
 * status into the intent, fix up the lock mode if the server changed it,
 * clear replay on failed opens, swab/validate the reply body, and — for a
 * replayable open — stash the reply LOV EA back into the request buffer so
 * a replayed open carries it.  Capability handling at the end saves MDS/OSS
 * capas from the reply. */
465 static int mdc_finish_enqueue(struct obd_export *exp,
466                               struct ptlrpc_request *req,
467                               struct ldlm_enqueue_info *einfo,
468                               struct lookup_intent *it,
469                               struct lustre_handle *lockh,
472         struct req_capsule *pill = &req->rq_pill;
473         struct ldlm_request *lockreq;
474         struct ldlm_reply *lockrep;
478         /* Similarly, if we're going to replay this request, we don't want to
479          * actually get a lock, just perform the intent. */
480         if (req->rq_transno || req->rq_replay) {
481                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
482                 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
485         if (rc == ELDLM_LOCK_ABORTED) {
/* Intent executed but no lock granted: hand back an empty handle. */
487                 memset(lockh, 0, sizeof(*lockh));
489         } else { /* rc = 0 */
490                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
493                 /* If the server gave us back a different lock mode, we should
494                  * fix up our variables. */
495                 if (lock->l_req_mode != einfo->ei_mode) {
/* Re-reference under the granted mode before dropping the requested one. */
496                         ldlm_lock_addref(lockh, lock->l_req_mode);
497                         ldlm_lock_decref(lockh, einfo->ei_mode);
498                         einfo->ei_mode = lock->l_req_mode;
503         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
504         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Propagate the server's verdict into the intent for upper layers. */
506         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
507         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
508         it->d.lustre.it_lock_mode = einfo->ei_mode;
509         it->d.lustre.it_lock_handle = lockh->cookie;
510         it->d.lustre.it_data = req;
512         if (it->d.lustre.it_status < 0 && req->rq_replay)
513                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
515         /* If we're doing an IT_OPEN which did not result in an actual
516          * successful open, then we need to remove the bit which saves
517          * this request for unconditional replay.
519          * It's important that we do this first! Otherwise we might exit the
520          * function without doing so, and try to replay a failed create
522         if (it->it_op & IT_OPEN && req->rq_replay &&
523             (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
524                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
526         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
527                   it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
529         /* We know what to expect, so we do any byte flipping required here */
530         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
531                 struct mdt_body *body;
533                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
535                         CERROR ("Can't swab mdt_body\n");
539                 if (it_disposition(it, DISP_OPEN_OPEN) &&
540                     !it_open_error(DISP_OPEN_OPEN, it)) {
542                          * If this is a successful OPEN request, we need to set
543                          * replay handler and data early, so that if replay
544                          * happens immediately after swabbing below, new reply
545                          * is swabbed by that handler correctly.
547                         mdc_set_open_replay_data(NULL, NULL, req);
550                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
/* Keep the client's notion of the maximum EA size current. */
553                         mdc_update_max_ea_from_body(exp, body);
556                          * The eadata is opaque; just check that it is there.
557                          * Eventually, obd_unpackmd() will check the contents.
559                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
565                          * We save the reply LOV EA in case we have to replay a
566                          * create for recovery. If we didn't allocate a large
567                          * enough request buffer above we need to reallocate it
568                          * here to hold the actual LOV EA.
570                          * To not save LOV EA if request is not going to replay
571                          * (for example error one).
573                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
575                                 if (req_capsule_get_size(pill, &RMF_EADATA,
578                                         mdc_realloc_openmsg(req, body);
580                                         req_capsule_shrink(pill, &RMF_EADATA,
584                                 req_capsule_set_size(pill, &RMF_EADATA,
588                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
590                                         memcpy(lmm, eadata, body->eadatasize);
594                 if (body->valid & OBD_MD_FLRMTPERM) {
595                         struct mdt_remote_perm *perm;
/* Remote permissions only appear for remote clients; swab them. */
597                         LASSERT(client_is_remote(exp));
598                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
599                                                 lustre_swab_mdt_remote_perm);
603                 if (body->valid & OBD_MD_FLMDSCAPA) {
604                         struct lustre_capa *capa, *p;
606                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
610                         if (it->it_op & IT_OPEN) {
611                                 /* client fid capa will be checked in replay */
612                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
617                 if (body->valid & OBD_MD_FLOSSCAPA) {
618                         struct lustre_capa *capa;
620                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
629 /* We always reserve enough space in the reply packet for a stripe MD, because
630  * we don't know in advance the file type. */
/* Main intent-enqueue entry: pick the inodebits policy from the intent op,
 * pack the matching intent request (open/unlink/getattr/readdir or flock),
 * throttle through the rpc_lock and in-flight counter, run the DLM enqueue,
 * and finish via mdc_finish_enqueue().  Flock requests bypass the metadata
 * post-processing entirely. */
631 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
632                 struct lookup_intent *it, struct md_op_data *op_data,
633                 struct lustre_handle *lockh, void *lmm, int lmmsize,
634                 struct ptlrpc_request **reqp, int extra_lock_flags)
636         struct obd_device *obddev = class_exp2obd(exp);
637         struct ptlrpc_request *req = NULL;
638         int flags = extra_lock_flags;
640         struct ldlm_res_id res_id;
641         static const ldlm_policy_data_t lookup_policy =
642                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
643         static const ldlm_policy_data_t update_policy =
644                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
645         ldlm_policy_data_t const *policy = &lookup_policy;
648         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
651         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
654                 flags |= LDLM_FL_HAS_INTENT;
655                 if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
656                         policy = &update_policy;
662                 /* The only way right now is FLOCK, in this case we hide flock
663                    policy as lmm, but lmmsize is 0 */
664                 LASSERT(lmm && lmmsize == 0);
665                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
667                 policy = (ldlm_policy_data_t *)lmm;
668                 res_id.name[3] = LDLM_FLOCK;
669         } else if (it->it_op & IT_OPEN) {
670                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
672                 policy = &update_policy;
673                 einfo->ei_cbdata = NULL;
675         } else if (it->it_op & IT_UNLINK)
676                 req = mdc_intent_unlink_pack(exp, it, op_data);
677         else if (it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT))
678                 req = mdc_intent_getattr_pack(exp, it, op_data);
679         else if (it->it_op == IT_READDIR)
680                 req = ldlm_enqueue_pack(exp);
687                 RETURN(PTR_ERR(req));
689         /* It is important to obtain rpc_lock first (if applicable), so that
690          * threads that are serialised with rpc_lock are not polluting our
691          * rpcs in flight counter. We do not do flock request limiting, though*/
693                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
694                 rc = mdc_enter_request(&obddev->u.cli);
/* Failed to enter: release throttle state and free the packed request. */
696                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
697                         mdc_clear_replay_flag(req, 0);
698                         ptlrpc_req_finished(req);
703         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
709                 mdc_exit_request(&obddev->u.cli);
710                 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
713         /* For flock requests we immediately return without further
714            delay and let caller deal with the rest, since rest of
715            this function metadata processing makes no sense for flock
721                 CERROR("ldlm_cli_enqueue: %d\n", rc);
722                 mdc_clear_replay_flag(req, rc);
723                 ptlrpc_req_finished(req);
726         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Finish an intent lock after the enqueue reply has been processed:
 * surface per-phase open/lookup errors, detect stale fid/name revalidation,
 * take extra request references for CREATE/OPEN (released later by
 * ll_create_node / ll_file_open), and if an equivalent lock already exists
 * locally, cancel the new one and reuse the old handle. */
731 static int mdc_finish_intent_lock(struct obd_export *exp,
732                                   struct ptlrpc_request *request,
733                                   struct md_op_data *op_data,
734                                   struct lookup_intent *it,
735                                   struct lustre_handle *lockh)
737         struct lustre_handle old_lock;
738         struct mdt_body *mdt_body;
739         struct ldlm_lock *lock;
743         LASSERT(request != NULL);
744         LASSERT(request != LP_POISON);
745         LASSERT(request->rq_repmsg != LP_POISON);
747         if (!it_disposition(it, DISP_IT_EXECD)) {
748                 /* The server failed before it even started executing the
749                  * intent, i.e. because it couldn't unpack the request. */
750                 LASSERT(it->d.lustre.it_status != 0);
751                 RETURN(it->d.lustre.it_status);
753         rc = it_open_error(DISP_IT_EXECD, it);
757         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
758         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
760         /* If we were revalidating a fid/name pair, mark the intent in
761          * case we fail and get called again from lookup */
762         if (fid_is_sane(&op_data->op_fid2) &&
763             it->it_create_mode & M_CHECK_STALE &&
764             it->it_op != IT_GETATTR) {
765                 it_set_disposition(it, DISP_ENQ_COMPLETE);
767                 /* Also: did we find the same inode? */
768                 /* server can return one of two fids:
769                  * op_fid2 - new allocated fid - if file is created.
770                  * op_fid3 - existent fid - if file only open.
771                  * op_fid3 is saved in lmv_intent_open */
772                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
773                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
774                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
775                                "\n", PFID(&op_data->op_fid2),
776                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
781         rc = it_open_error(DISP_LOOKUP_EXECD, it);
785         /* keep requests around for the multiple phases of the call
786          * this shows the DISP_XX must guarantee we make it into the call
788         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
789             it_disposition(it, DISP_OPEN_CREATE) &&
790             !it_open_error(DISP_OPEN_CREATE, it)) {
791                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
792                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
794         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
795             it_disposition(it, DISP_OPEN_OPEN) &&
796             !it_open_error(DISP_OPEN_OPEN, it)) {
797                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
798                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
799                 /* BUG 11546 - eviction in the middle of open rpc processing */
800                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
803         if (it->it_op & IT_CREAT) {
804                 /* XXX this belongs in ll_create_it */
805         } else if (it->it_op == IT_OPEN) {
806                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
808                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
811         /* If we already have a matching lock, then cancel the new
812          * one.  We have to set the data here instead of in
813          * mdc_enqueue, because we need to use the child's inode as
814          * the l_ast_data to match, and that's not available until
815          * intent_finish has performed the iget().) */
816         lock = ldlm_handle2lock(lockh);
818                 ldlm_policy_data_t policy = lock->l_policy_data;
819                 LDLM_DEBUG(lock, "matching against this");
/* The lock's resource must correspond to the fid the server replied with. */
821                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
822                                          &lock->l_resource->lr_name),
823                          "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
824                          (unsigned long)lock->l_resource->lr_name.name[0],
825                          (unsigned long)lock->l_resource->lr_name.name[1],
826                          (unsigned long)lock->l_resource->lr_name.name[2],
827                          (unsigned long)fid_seq(&mdt_body->fid1),
828                          (unsigned long)fid_oid(&mdt_body->fid1),
829                          (unsigned long)fid_ver(&mdt_body->fid1));
832                 memcpy(&old_lock, lockh, sizeof(*lockh));
833                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
834                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* Duplicate found: drop/cancel the new lock and hand back the old one. */
835                         ldlm_lock_decref_and_cancel(lockh,
836                                                     it->d.lustre.it_lock_mode);
837                         memcpy(lockh, &old_lock, sizeof(old_lock));
838                         it->d.lustre.it_lock_handle = lockh->cookie;
841         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
842                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
843                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/* Check whether we still hold a usable lock for @fid.  First try the handle
 * cached in the intent; otherwise match against the namespace with an
 * inodebits policy chosen from the intent op (UPDATE / LAYOUT / LOOKUP).
 * On success the intent's handle and mode are refreshed; on failure they
 * are zeroed. */
847 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
848                         struct lu_fid *fid, __u64 *bits)
850         /* We could just return 1 immediately, but since we should only
851          * be called in revalidate_it if we already have a lock, let's
853         struct ldlm_res_id res_id;
854         struct lustre_handle lockh;
855         ldlm_policy_data_t policy;
859         if (it->d.lustre.it_lock_handle) {
860                 lockh.cookie = it->d.lustre.it_lock_handle;
861                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
863                 fid_build_reg_res_name(fid, &res_id);
/* Policy bits depend on the intent type (branch keywords elided here). */
866                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
869                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
872                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
875                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
876                                        LDLM_FL_BLOCK_GRANTED, &res_id,
878                                        LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
/* Record the (possibly new) handle/mode, or clear both on a miss. */
882                 it->d.lustre.it_lock_handle = lockh.cookie;
883                 it->d.lustre.it_lock_mode = mode;
885                 it->d.lustre.it_lock_handle = 0;
886                 it->d.lustre.it_lock_mode = 0;
893  * This long block is all about fixing up the lock and request state
894  * so that it is correct as of the moment _before_ the operation was
895  * applied; that way, the VFS will think that everything is normal and
896  * call Lustre's regular VFS methods.
898  * If we're performing a creation, that means that unless the creation
899  * failed with EEXIST, we should fake up a negative dentry.
901  * For everything else, we want to lookup to succeed.
903  * One additional note: if CREATE or OPEN succeeded, we add an extra
904  * reference to the request because we need to keep it around until
905  * ll_create/ll_open gets called.
907  * The server will return to us, in it_disposition, an indication of
908  * exactly what d.lustre.it_status refers to.
910  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
911  * otherwise if DISP_OPEN_CREATE is set, then it status is the
912  * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
913  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
916  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
919 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
920                     void *lmm, int lmmsize, struct lookup_intent *it,
921                     int lookup_flags, struct ptlrpc_request **reqp,
922                     ldlm_blocking_callback cb_blocking,
923                     int extra_lock_flags)
925         struct lustre_handle lockh;
930         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
931                ", intent: %s flags %#o\n", op_data->op_namelen,
932                op_data->op_name, PFID(&op_data->op_fid2),
933                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* Fast path: for LOOKUP/GETATTR/LAYOUT on a known fid, try to revalidate
 * an existing lock before issuing a new enqueue. */
937         if (fid_is_sane(&op_data->op_fid2) &&
938             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))) {
939                 /* We could just return 1 immediately, but since we should only
940                  * be called in revalidate_it if we already have a lock, let's
942                 it->d.lustre.it_lock_handle = 0;
943                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
944                 /* Only return failure if it was not GETATTR by cfid
945                    (from inode_revalidate) */
946                 if (rc || op_data->op_namelen != 0)
950         /* lookup_it may be called only after revalidate_it has run, because
951          * revalidate_it cannot return errors, only zero.  Returning zero causes
952          * this call to lookup, which *can* return an error.
954          * We only want to execute the request associated with the intent one
955          * time, however, so don't send the request again.  Instead, skip past
956          * this and use the request from revalidate.  In this case, revalidate
957          * never dropped its reference, so the refcounts are all OK */
958         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
959                 struct ldlm_enqueue_info einfo =
960                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
961                           ldlm_completion_ast, NULL, NULL, NULL };
963                 /* For case if upper layer did not alloc fid, do it now. */
964                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
965                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
967                                 CERROR("Can't alloc new fid, rc %d\n", rc);
971                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
972                                  lmm, lmmsize, NULL, extra_lock_flags);
975         } else if (!fid_is_sane(&op_data->op_fid2) ||
976                    !(it->it_create_mode & M_CHECK_STALE)) {
977                 /* DISP_ENQ_COMPLETE set means there is extra reference on
978                  * request referenced from this intent, saved for subsequent
979                  * lookup.  This path is executed when we proceed to this
980                  * lookup, so we clear DISP_ENQ_COMPLETE */
981                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
983         *reqp = it->d.lustre.it_data;
984         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/* ptlrpc interpret callback for an async intent-getattr: releases the
 * in-flight slot, finalizes the DLM enqueue, runs the normal
 * mdc_finish_enqueue()/mdc_finish_intent_lock() post-processing, frees the
 * enqueue info allocated by the submitter, and invokes the caller's
 * completion callback with the final rc. */
988 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
989                                               struct ptlrpc_request *req,
992         struct mdc_getattr_args *ga = args;
993         struct obd_export *exp = ga->ga_exp;
994         struct md_enqueue_info *minfo = ga->ga_minfo;
995         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
996         struct lookup_intent *it;
997         struct lustre_handle *lockh;
998         struct obd_device *obddev;
999         int flags = LDLM_FL_HAS_INTENT;
1003        lockh = &minfo->mi_lockh;
1005        obddev = class_exp2obd(exp);
/* The slot taken in mdc_intent_getattr_async() is released here. */
1007        mdc_exit_request(&obddev->u.cli);
1008        if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1011        rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1012                                   &flags, NULL, 0, lockh, rc);
1014                CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1015                mdc_clear_replay_flag(req, rc);
1019        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1023        rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* einfo was heap-allocated by the async submitter; owned (and freed) here. */
1027        OBD_FREE_PTR(einfo);
1028        minfo->mi_cb(req, minfo, rc);
/* Submit an intent-getattr enqueue asynchronously (statahead path): pack the
 * getattr intent, take an in-flight slot, start the DLM enqueue without
 * waiting, stash the callback context in the request's async args, and queue
 * the request on ptlrpcd.  Completion is handled by
 * mdc_intent_getattr_async_interpret(). */
1032 int mdc_intent_getattr_async(struct obd_export *exp,
1033                              struct md_enqueue_info *minfo,
1034                              struct ldlm_enqueue_info *einfo)
1036         struct md_op_data *op_data = &minfo->mi_data;
1037         struct lookup_intent *it = &minfo->mi_it;
1038         struct ptlrpc_request *req;
1039         struct mdc_getattr_args *ga;
1040         struct obd_device *obddev = class_exp2obd(exp);
1041         struct ldlm_res_id res_id;
1042         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1043          * for statahead currently. Consider CMD in future, such two bits
1044          * maybe managed by different MDS, should be adjusted then. */
1045         ldlm_policy_data_t policy = {
1046                         .l_inodebits = { MDS_INODELOCK_LOOKUP |
1047                                          MDS_INODELOCK_UPDATE }
1050         int flags = LDLM_FL_HAS_INTENT;
1053         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1054                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1055                ldlm_it2str(it->it_op), it->it_flags);
1057         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1058         req = mdc_intent_getattr_pack(exp, it, op_data);
/* Reserve an in-flight slot; released in the interpret callback. */
1062         rc = mdc_enter_request(&obddev->u.cli);
1064                 ptlrpc_req_finished(req);
/* async = 1: ldlm_cli_enqueue returns without waiting for the reply. */
1068         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1069                               0, &minfo->mi_lockh, 1);
1071                 mdc_exit_request(&obddev->u.cli);
1072                 ptlrpc_req_finished(req);
/* Callback context must fit in the request's embedded async-args space. */
1076         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1077         ga = ptlrpc_req_async_args(req);
1079         ga->ga_minfo = minfo;
1080         ga->ga_einfo = einfo;
1082         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1083         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);