4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
40 # include <linux/module.h>
41 # include <linux/pagemap.h>
42 # include <linux/miscdevice.h>
43 # include <linux/init.h>
45 # include <liblustre.h>
48 #include <lustre_acl.h>
49 #include <obd_class.h>
50 #include <lustre_dlm.h>
51 /* fid_res_name_eq() */
52 #include <lustre_fid.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
/* Argument bundle for an asynchronous intent-getattr enqueue.  Stored in
 * the request's async args and unpacked by the RPC interpret callback
 * (mdc_intent_getattr_async_interpret). */
56 struct mdc_getattr_args {
57 struct obd_export *ga_exp;          /* export the enqueue was sent on */
58 struct md_enqueue_info *ga_minfo;   /* caller's enqueue info + lock handle */
59 struct ldlm_enqueue_info *ga_einfo; /* enqueue params; freed by the callback */
/* Return the DISP_* bits in 'flag' that the server set in the intent
 * reply disposition mask (non-zero iff any requested bit is present). */
62 int it_disposition(struct lookup_intent *it, int flag)
64 return it->d.lustre.it_disposition & flag;
66 EXPORT_SYMBOL(it_disposition);
/* Set the given DISP_* bit(s) in the intent's disposition mask. */
68 void it_set_disposition(struct lookup_intent *it, int flag)
70 it->d.lustre.it_disposition |= flag;
72 EXPORT_SYMBOL(it_set_disposition);
/* Clear the given DISP_* bit(s) from the intent's disposition mask. */
74 void it_clear_disposition(struct lookup_intent *it, int flag)
76 it->d.lustre.it_disposition &= ~flag;
78 EXPORT_SYMBOL(it_clear_disposition);
/* Return the server-side status of an intent open for the given phase.
 * Dispositions are checked from most specific (DISP_OPEN_OPEN) down to
 * least specific (DISP_IT_EXECD): the first disposition the server
 * actually reached yields it_status, provided the caller asked about a
 * phase at or before it.  Falling through to the CERROR means no
 * disposition bit was set at all, which is unexpected. */
80 int it_open_error(int phase, struct lookup_intent *it)
82 if (it_disposition(it, DISP_OPEN_OPEN)) {
83 if (phase >= DISP_OPEN_OPEN)
84 return it->d.lustre.it_status;
89 if (it_disposition(it, DISP_OPEN_CREATE)) {
90 if (phase >= DISP_OPEN_CREATE)
91 return it->d.lustre.it_status;
96 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
97 if (phase >= DISP_LOOKUP_EXECD)
98 return it->d.lustre.it_status;
103 if (it_disposition(it, DISP_IT_EXECD)) {
104 if (phase >= DISP_IT_EXECD)
105 return it->d.lustre.it_status;
/* No disposition bit matched: log the raw values for debugging. */
109 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
110 it->d.lustre.it_status);
114 EXPORT_SYMBOL(it_open_error);
116 /* this must be called on a lockh that is known to have a referenced lock */
/* Attach the VFS inode 'data' to the lock's l_ast_data, under the lock's
 * resource lock, and report the lock's inodebits back through *bits.
 * A pre-existing, different l_ast_data is only tolerated when the old
 * inode is already being freed (I_FREEING); otherwise we would silently
 * rebind a live inode, hence the LASSERTF. */
117 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
120 struct ldlm_lock *lock;
129 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
131 LASSERT(lock != NULL);
132 lock_res_and_lock(lock);
134 if (lock->l_ast_data && lock->l_ast_data != data) {
135 struct inode *new_inode = data;
136 struct inode *old_inode = lock->l_ast_data;
137 LASSERTF(old_inode->i_state & I_FREEING,
138 "Found existing inode %p/%lu/%u state %lu in lock: "
139 "setting data to %p/%lu/%u\n", old_inode,
140 old_inode->i_ino, old_inode->i_generation,
142 new_inode, new_inode->i_ino, new_inode->i_generation);
145 lock->l_ast_data = data;
147 *bits = lock->l_policy_data.l_inodebits.bits;
149 unlock_res_and_lock(lock);
/* Look for an already-granted DLM lock on the file's resource matching
 * type/policy/mode; on success the matched handle is returned through
 * 'lockh'.  Thin wrapper around ldlm_lock_match() with the resource id
 * built from the fid. */
155 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
156 const struct lu_fid *fid, ldlm_type_t type,
157 ldlm_policy_data_t *policy, ldlm_mode_t mode,
158 struct lustre_handle *lockh)
160 struct ldlm_res_id res_id;
164 fid_build_reg_res_name(fid, &res_id);
165 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
166 &res_id, type, policy, mode, lockh, 0);
/* Cancel all unused locks this client holds on the resource named by
 * 'fid' that match 'policy'/'mode'.  Delegates to
 * ldlm_cli_cancel_unused_resource(). */
170 int mdc_cancel_unused(struct obd_export *exp,
171 const struct lu_fid *fid,
172 ldlm_policy_data_t *policy,
174 ldlm_cancel_flags_t flags,
177 struct ldlm_res_id res_id;
178 struct obd_device *obd = class_exp2obd(exp);
183 fid_build_reg_res_name(fid, &res_id);
184 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
185 policy, mode, flags, opaque);
/* Apply the iterator 'it' (with closure 'data') to every lock on the
 * resource derived from 'fid' — typically used to update the l_ast_data
 * (inode) bindings of cached locks. */
189 int mdc_change_cbdata(struct obd_export *exp,
190 const struct lu_fid *fid,
191 ldlm_iterator_t it, void *data)
193 struct ldlm_res_id res_id;
196 fid_build_reg_res_name(fid, &res_id);
197 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
204 /* find any ldlm lock of the inode in mdc
/* Iterate the resource's locks; the iterator signals a hit by returning
 * LDLM_ITER_STOP.  NOTE(review): the cast below drops the const
 * qualifier from 'fid' because fid_build_reg_res_name() takes a
 * non-const pointer in this tree; it does not modify the fid. */
208 int mdc_find_cbdata(struct obd_export *exp,
209 const struct lu_fid *fid,
210 ldlm_iterator_t it, void *data)
212 struct ldlm_res_id res_id;
216 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
217 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
219 if (rc == LDLM_ITER_STOP)
221 else if (rc == LDLM_ITER_CONTINUE)
/* Drop the replay flag from a failed request so it is not replayed after
 * recovery.  A non-zero rc together with an assigned transno is worth a
 * loud DEBUG_REQ, since the server committed something we now consider
 * an error. */
226 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
228 /* Don't hold error requests for replay. */
229 if (req->rq_replay) {
230 cfs_spin_lock(&req->rq_lock);
232 cfs_spin_unlock(&req->rq_lock);
234 if (rc && req->rq_transno != 0) {
235 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
240 /* Save a large LOV EA into the request buffer so that it is available
241 * for replay. We don't do this in the initial request because the
242 * original request doesn't need this buffer (at most it sends just the
243 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
244 * buffer and may also be difficult to allocate and save a very large
245 * request buffer for each open. (bug 5707)
247 * OOM here may cause recovery failure if lmm is needed (only for the
248 * original open if the MDS crashed just when this client also OOM'd)
249 * but this is incredibly unlikely, and questionable whether the client
250 * could do MDS recovery under OOM anyways... */
251 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
252 struct mdt_body *body)
256 /* FIXME: remove this explicit offset. */
257 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
/* On enlargement failure: give up on saving the EA for replay by
 * clearing OBD_MD_FLEASIZE and zeroing eadatasize (best-effort only). */
260 CERROR("Can't enlarge segment %d size to %d\n",
261 DLM_INTENT_REC_OFF + 4, body->eadatasize);
262 body->valid &= ~OBD_MD_FLEASIZE;
263 body->eadatasize = 0;
/* Build and pack an LDLM_INTENT_OPEN request: cancel conflicting cached
 * locks (open locks on the child if its fid is known, the parent's
 * UPDATE lock on create), allocate the request, size the capability /
 * name / EA buffers, and pack the ldlm intent plus the open body.
 * Returns the prepared request, or ERR_PTR on allocation failure. */
267 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
268 struct lookup_intent *it,
269 struct md_op_data *op_data,
270 void *lmm, int lmmsize,
273 struct ptlrpc_request *req;
274 struct obd_device *obddev = class_exp2obd(exp);
275 struct ldlm_intent *lit;
276 CFS_LIST_HEAD(cancels);
/* An open always targets a regular file: force S_IFREG into the mode. */
282 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
284 /* XXX: openlock is not cancelled for cross-refs. */
285 /* If inode is known, cancel conflicting OPEN locks. */
286 if (fid_is_sane(&op_data->op_fid2)) {
287 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
290 else if (it->it_flags & FMODE_EXEC)
295 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
300 /* If CREATE, cancel parent's UPDATE lock. */
301 if (it->it_op & IT_CREAT)
305 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
307 MDS_INODELOCK_UPDATE);
309 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
310 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the gathered cancel list before bailing. */
312 ldlm_lock_list_put(&cancels, l_bl_ast, count);
313 RETURN(ERR_PTR(-ENOMEM));
316 /* parent capability */
317 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
318 /* child capability, reserve the size according to parent capa, it will
319 * be filled after we get the reply */
320 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
322 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
323 op_data->op_namelen + 1);
324 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
325 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
327 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
329 ptlrpc_request_free(req);
/* Opens are replayed unconditionally if the import is replayable. */
333 cfs_spin_lock(&req->rq_lock);
334 req->rq_replay = req->rq_import->imp_replayable;
335 cfs_spin_unlock(&req->rq_lock);
337 /* pack the intent */
338 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
339 lit->opc = (__u64)it->it_op;
341 /* pack the intended request */
342 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
345 /* for remote client, fetch remote perm for current user */
346 if (client_is_remote(exp))
347 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
348 sizeof(struct mdt_remote_perm));
349 ptlrpc_request_set_replen(req);
/* Build and pack an LDLM_INTENT_UNLINK request: allocate the request,
 * size the capability and name buffers, pack the intent and the unlink
 * body, and reserve reply space for the returned MD and unlink cookies.
 * Returns the prepared request, or ERR_PTR on failure. */
353 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
354 struct lookup_intent *it,
355 struct md_op_data *op_data)
357 struct ptlrpc_request *req;
358 struct obd_device *obddev = class_exp2obd(exp);
359 struct ldlm_intent *lit;
363 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
364 &RQF_LDLM_INTENT_UNLINK);
366 RETURN(ERR_PTR(-ENOMEM));
368 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
369 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
370 op_data->op_namelen + 1);
372 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
374 ptlrpc_request_free(req);
378 /* pack the intent */
379 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
380 lit->opc = (__u64)it->it_op;
382 /* pack the intended request */
383 mdc_unlink_pack(req, op_data);
/* Reserve worst-case reply buffers: striping MD and unlink cookies. */
385 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
386 obddev->u.cli.cl_max_mds_easize);
387 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
388 obddev->u.cli.cl_max_mds_cookiesize);
389 ptlrpc_request_set_replen(req);
/* Build and pack an LDLM_INTENT_GETATTR request: request attributes,
 * EA, capability and ACL (or remote permissions for a remote client),
 * and reserve matching reply buffers.  Returns the prepared request, or
 * ERR_PTR on failure. */
393 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
394 struct lookup_intent *it,
395 struct md_op_data *op_data)
397 struct ptlrpc_request *req;
398 struct obd_device *obddev = class_exp2obd(exp);
399 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
400 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
401 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
402 (client_is_remote(exp) ?
403 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
404 struct ldlm_intent *lit;
408 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
409 &RQF_LDLM_INTENT_GETATTR);
411 RETURN(ERR_PTR(-ENOMEM));
413 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
414 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
415 op_data->op_namelen + 1);
417 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
419 ptlrpc_request_free(req);
423 /* pack the intent */
424 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
425 lit->opc = (__u64)it->it_op;
427 /* pack the intended request */
428 mdc_getattr_pack(req, valid, it->it_flags, op_data,
429 obddev->u.cli.cl_max_mds_easize);
/* Reserve reply space for the largest possible striping MD. */
431 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
432 obddev->u.cli.cl_max_mds_easize);
433 if (client_is_remote(exp))
434 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
435 sizeof(struct mdt_remote_perm));
436 ptlrpc_request_set_replen(req);
/* Build a plain LDLM_ENQUEUE request (no intent) — used for the
 * IT_READDIR path where only the lock itself is needed.  Returns the
 * prepared request or ERR_PTR(-ENOMEM). */
440 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
442 struct ptlrpc_request *req;
446 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
448 RETURN(ERR_PTR(-ENOMEM));
450 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
452 ptlrpc_request_free(req);
456 ptlrpc_request_set_replen(req);
/* Post-process a completed intent enqueue reply: fix up the lock state
 * (mode change, aborted enqueue), copy the server's disposition/status
 * into the intent, clear the replay flag for failed opens, and swab /
 * validate the reply body and its optional payloads (EA data, remote
 * permissions, MDS/OSS capabilities).  For a replayable open, the reply
 * LOV EA is copied back into the request buffer so a replayed create
 * carries it. */
460 static int mdc_finish_enqueue(struct obd_export *exp,
461 struct ptlrpc_request *req,
462 struct ldlm_enqueue_info *einfo,
463 struct lookup_intent *it,
464 struct lustre_handle *lockh,
467 struct req_capsule *pill = &req->rq_pill;
468 struct ldlm_request *lockreq;
469 struct ldlm_reply *lockrep;
473 /* Similarly, if we're going to replay this request, we don't want to
474 * actually get a lock, just perform the intent. */
475 if (req->rq_transno || req->rq_replay) {
476 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
477 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
/* Server aborted the lock part of the enqueue: intent result only,
 * so hand back an empty lock handle. */
480 if (rc == ELDLM_LOCK_ABORTED) {
482 memset(lockh, 0, sizeof(*lockh));
484 } else { /* rc = 0 */
485 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
488 /* If the server gave us back a different lock mode, we should
489 * fix up our variables. */
490 if (lock->l_req_mode != einfo->ei_mode) {
491 ldlm_lock_addref(lockh, lock->l_req_mode);
492 ldlm_lock_decref(lockh, einfo->ei_mode);
493 einfo->ei_mode = lock->l_req_mode;
498 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
499 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Copy the server's intent result into the lookup_intent. */
501 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
502 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
503 it->d.lustre.it_lock_mode = einfo->ei_mode;
504 it->d.lustre.it_lock_handle = lockh->cookie;
505 it->d.lustre.it_data = req;
507 if (it->d.lustre.it_status < 0 && req->rq_replay)
508 mdc_clear_replay_flag(req, it->d.lustre.it_status);
510 /* If we're doing an IT_OPEN which did not result in an actual
511 * successful open, then we need to remove the bit which saves
512 * this request for unconditional replay.
514 * It's important that we do this first! Otherwise we might exit the
515 * function without doing so, and try to replay a failed create
517 if (it->it_op & IT_OPEN && req->rq_replay &&
518 (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
519 mdc_clear_replay_flag(req, it->d.lustre.it_status);
521 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
522 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
524 /* We know what to expect, so we do any byte flipping required here */
525 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
526 struct mdt_body *body;
528 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
530 CERROR ("Can't swab mdt_body\n");
534 if (it_disposition(it, DISP_OPEN_OPEN) &&
535 !it_open_error(DISP_OPEN_OPEN, it)) {
537 * If this is a successful OPEN request, we need to set
538 * replay handler and data early, so that if replay
539 * happens immediately after swabbing below, new reply
540 * is swabbed by that handler correctly.
542 mdc_set_open_replay_data(NULL, NULL, req);
545 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
548 mdc_update_max_ea_from_body(exp, body);
551 * The eadata is opaque; just check that it is there.
552 * Eventually, obd_unpackmd() will check the contents.
554 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
560 * We save the reply LOV EA in case we have to replay a
561 * create for recovery. If we didn't allocate a large
562 * enough request buffer above we need to reallocate it
563 * here to hold the actual LOV EA.
565 * To not save LOV EA if request is not going to replay
566 * (for example error one).
568 if ((it->it_op & IT_OPEN) && req->rq_replay) {
570 if (req_capsule_get_size(pill, &RMF_EADATA,
573 mdc_realloc_openmsg(req, body);
575 req_capsule_shrink(pill, &RMF_EADATA,
579 req_capsule_set_size(pill, &RMF_EADATA,
583 lmm = req_capsule_client_get(pill, &RMF_EADATA);
585 memcpy(lmm, eadata, body->eadatasize);
/* Remote-permission payload is only valid for remote clients. */
589 if (body->valid & OBD_MD_FLRMTPERM) {
590 struct mdt_remote_perm *perm;
592 LASSERT(client_is_remote(exp));
593 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
594 lustre_swab_mdt_remote_perm);
598 if (body->valid & OBD_MD_FLMDSCAPA) {
599 struct lustre_capa *capa, *p;
601 capa = req_capsule_server_get(pill, &RMF_CAPA1);
605 if (it->it_op & IT_OPEN) {
606 /* client fid capa will be checked in replay */
607 p = req_capsule_client_get(pill, &RMF_CAPA2);
612 if (body->valid & OBD_MD_FLOSSCAPA) {
613 struct lustre_capa *capa;
615 capa = req_capsule_server_get(pill, &RMF_CAPA2);
624 /* We always reserve enough space in the reply packet for a stripe MD, because
625 * we don't know in advance the file type. */
/* Core intent enqueue: choose an inodebits policy from the intent op,
 * pack the matching intent request (open/unlink/getattr/readdir or a
 * raw flock enqueue), throttle via the rpc_lock and in-flight counter,
 * send the enqueue, and on -EINPROGRESS from the server resend creates
 * indefinitely (quota design).  Finishes via mdc_finish_enqueue(). */
626 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
627 struct lookup_intent *it, struct md_op_data *op_data,
628 struct lustre_handle *lockh, void *lmm, int lmmsize,
629 struct ptlrpc_request **reqp, int extra_lock_flags)
631 struct obd_device *obddev = class_exp2obd(exp);
632 struct ptlrpc_request *req = NULL;
633 int flags, saved_flags = extra_lock_flags;
635 struct ldlm_res_id res_id;
636 static const ldlm_policy_data_t lookup_policy =
637 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
638 static const ldlm_policy_data_t update_policy =
639 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
640 ldlm_policy_data_t const *policy = &lookup_policy;
641 int generation, resends = 0;
642 struct ldlm_reply *lockrep;
645 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
648 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
651 saved_flags |= LDLM_FL_HAS_INTENT;
652 if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
653 policy = &update_policy;
655 LASSERT(reqp == NULL);
/* Remember the import generation so a resend after eviction is detected. */
657 generation = obddev->u.cli.cl_import->imp_generation;
661 /* The only way right now is FLOCK, in this case we hide flock
662 policy as lmm, but lmmsize is 0 */
663 LASSERT(lmm && lmmsize == 0);
664 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
666 policy = (ldlm_policy_data_t *)lmm;
667 res_id.name[3] = LDLM_FLOCK;
668 } else if (it->it_op & IT_OPEN) {
669 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
671 policy = &update_policy;
672 einfo->ei_cbdata = NULL;
674 } else if (it->it_op & IT_UNLINK)
675 req = mdc_intent_unlink_pack(exp, it, op_data);
676 else if (it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT))
677 req = mdc_intent_getattr_pack(exp, it, op_data);
678 else if (it->it_op == IT_READDIR)
679 req = ldlm_enqueue_pack(exp);
686 RETURN(PTR_ERR(req));
/* Pin the request to the current import generation on resends. */
689 req->rq_generation_set = 1;
690 req->rq_import_generation = generation;
691 req->rq_sent = cfs_time_current_sec() + resends;
694 /* It is important to obtain rpc_lock first (if applicable), so that
695 * threads that are serialised with rpc_lock are not polluting our
696 * rpcs in flight counter. We do not do flock request limiting, though*/
698 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
699 rc = mdc_enter_request(&obddev->u.cli);
701 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
702 mdc_clear_replay_flag(req, 0);
703 ptlrpc_req_finished(req);
708 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
711 /* For flock requests we immediatelly return without further
712 delay and let caller deal with the rest, since rest of
713 this function metadata processing makes no sense for flock
718 mdc_exit_request(&obddev->u.cli);
719 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
722 CERROR("ldlm_cli_enqueue: %d\n", rc);
723 mdc_clear_replay_flag(req, rc);
724 ptlrpc_req_finished(req);
728 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
729 LASSERT(lockrep != NULL);
731 /* Retry the create infinitely when we get -EINPROGRESS from
732 * server. This is required by the new quota design. */
733 if (it && it->it_op & IT_CREAT &&
734 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
735 mdc_clear_replay_flag(req, rc);
736 ptlrpc_req_finished(req);
739 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
740 obddev->obd_name, resends, it->it_op,
741 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
743 if (generation == obddev->u.cli.cl_import->imp_generation) {
/* NOTE(review): "resned" below is a typo for "resend" in the log
 * string; left unchanged here since it is runtime output. */
746 CDEBUG(D_HA, "resned cross eviction\n");
751 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Interpret an executed intent: validate the server's disposition and
 * status, detect stale fid/name revalidations, take the extra request
 * references needed by the later ll_create/ll_file_open phases, and —
 * if an equivalent lock already exists locally — cancel the new lock
 * and reuse the old one.  Returns 0 or the server-reported error. */
756 static int mdc_finish_intent_lock(struct obd_export *exp,
757 struct ptlrpc_request *request,
758 struct md_op_data *op_data,
759 struct lookup_intent *it,
760 struct lustre_handle *lockh)
762 struct lustre_handle old_lock;
763 struct mdt_body *mdt_body;
764 struct ldlm_lock *lock;
768 LASSERT(request != NULL);
769 LASSERT(request != LP_POISON);
770 LASSERT(request->rq_repmsg != LP_POISON);
772 if (!it_disposition(it, DISP_IT_EXECD)) {
773 /* The server failed before it even started executing the
774 * intent, i.e. because it couldn't unpack the request. */
775 LASSERT(it->d.lustre.it_status != 0);
776 RETURN(it->d.lustre.it_status);
778 rc = it_open_error(DISP_IT_EXECD, it);
782 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
783 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
785 /* If we were revalidating a fid/name pair, mark the intent in
786 * case we fail and get called again from lookup */
787 if (fid_is_sane(&op_data->op_fid2) &&
788 it->it_create_mode & M_CHECK_STALE &&
789 it->it_op != IT_GETATTR) {
790 it_set_disposition(it, DISP_ENQ_COMPLETE);
792 /* Also: did we find the same inode? */
793 /* server can return one of two fids:
794 * op_fid2 - new allocated fid - if file is created.
795 * op_fid3 - existent fid - if file only open.
796 * op_fid3 is saved in lmv_intent_open */
797 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
798 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
799 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
800 "\n", PFID(&op_data->op_fid2),
801 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
806 rc = it_open_error(DISP_LOOKUP_EXECD, it);
810 /* keep requests around for the multiple phases of the call
811 * this shows the DISP_XX must guarantee we make it into the call
813 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
814 it_disposition(it, DISP_OPEN_CREATE) &&
815 !it_open_error(DISP_OPEN_CREATE, it)) {
816 it_set_disposition(it, DISP_ENQ_CREATE_REF);
817 ptlrpc_request_addref(request); /* balanced in ll_create_node */
819 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
820 it_disposition(it, DISP_OPEN_OPEN) &&
821 !it_open_error(DISP_OPEN_OPEN, it)) {
822 it_set_disposition(it, DISP_ENQ_OPEN_REF);
823 ptlrpc_request_addref(request); /* balanced in ll_file_open */
824 /* BUG 11546 - eviction in the middle of open rpc processing */
825 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
828 if (it->it_op & IT_CREAT) {
829 /* XXX this belongs in ll_create_it */
830 } else if (it->it_op == IT_OPEN) {
831 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
833 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
836 /* If we already have a matching lock, then cancel the new
837 * one. We have to set the data here instead of in
838 * mdc_enqueue, because we need to use the child's inode as
839 * the l_ast_data to match, and that's not available until
840 * intent_finish has performed the iget().) */
841 lock = ldlm_handle2lock(lockh);
843 ldlm_policy_data_t policy = lock->l_policy_data;
844 LDLM_DEBUG(lock, "matching against this");
/* Sanity: the lock's resource must name the fid the server returned. */
846 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
847 &lock->l_resource->lr_name),
848 "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
849 (unsigned long)lock->l_resource->lr_name.name[0],
850 (unsigned long)lock->l_resource->lr_name.name[1],
851 (unsigned long)lock->l_resource->lr_name.name[2],
852 (unsigned long)fid_seq(&mdt_body->fid1),
853 (unsigned long)fid_oid(&mdt_body->fid1),
854 (unsigned long)fid_ver(&mdt_body->fid1));
857 memcpy(&old_lock, lockh, sizeof(*lockh));
858 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
859 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* A matching lock already exists: drop the new one and switch
 * the intent over to the pre-existing handle. */
860 ldlm_lock_decref_and_cancel(lockh,
861 it->d.lustre.it_lock_mode);
862 memcpy(lockh, &old_lock, sizeof(old_lock));
863 it->d.lustre.it_lock_handle = lockh->cookie;
866 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
867 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
868 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/* Check whether a usable lock already covers this fid for the intent:
 * first try the handle cached in the intent, otherwise match against
 * the namespace with an inodebits policy chosen by the intent op.  On
 * success the intent's lock handle/mode are filled in; on failure they
 * are zeroed. */
872 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
873 struct lu_fid *fid, __u64 *bits)
875 /* We could just return 1 immediately, but since we should only
876 * be called in revalidate_it if we already have a lock, let's
878 struct ldlm_res_id res_id;
879 struct lustre_handle lockh;
880 ldlm_policy_data_t policy;
884 if (it->d.lustre.it_lock_handle) {
885 lockh.cookie = it->d.lustre.it_lock_handle;
886 mode = ldlm_revalidate_lock_handle(&lockh, bits);
888 fid_build_reg_res_name(fid, &res_id);
/* Policy bits depend on which metadata the intent will read. */
891 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
894 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
897 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
900 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
901 LDLM_FL_BLOCK_GRANTED, &res_id,
903 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
907 it->d.lustre.it_lock_handle = lockh.cookie;
908 it->d.lustre.it_lock_mode = mode;
910 it->d.lustre.it_lock_handle = 0;
911 it->d.lustre.it_lock_mode = 0;
918 * This long block is all about fixing up the lock and request state
919 * so that it is correct as of the moment _before_ the operation was
920 * applied; that way, the VFS will think that everything is normal and
921 * call Lustre's regular VFS methods.
923 * If we're performing a creation, that means that unless the creation
924 * failed with EEXIST, we should fake up a negative dentry.
926 * For everything else, we want to lookup to succeed.
928 * One additional note: if CREATE or OPEN succeeded, we add an extra
929 * reference to the request because we need to keep it around until
930 * ll_create/ll_open gets called.
932 * The server will return to us, in it_disposition, an indication of
933 * exactly what d.lustre.it_status refers to.
935 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
936 * otherwise if DISP_OPEN_CREATE is set, then it status is the
937 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
938 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
941 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/* Entry point for intent-based lookup/open from the llite/lmv layers:
 * first try to revalidate an existing lock for lookup/getattr/layout,
 * then (unless a previous enqueue already completed, DISP_ENQ_COMPLETE)
 * allocate a fid if needed and send the intent enqueue, and finally
 * interpret the result via mdc_finish_intent_lock(). */
944 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
945 void *lmm, int lmmsize, struct lookup_intent *it,
946 int lookup_flags, struct ptlrpc_request **reqp,
947 ldlm_blocking_callback cb_blocking,
948 int extra_lock_flags)
950 struct lustre_handle lockh;
955 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
956 ", intent: %s flags %#o\n", op_data->op_namelen,
957 op_data->op_name, PFID(&op_data->op_fid2),
958 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
962 if (fid_is_sane(&op_data->op_fid2) &&
963 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))) {
964 /* We could just return 1 immediately, but since we should only
965 * be called in revalidate_it if we already have a lock, let's
967 it->d.lustre.it_lock_handle = 0;
968 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
969 /* Only return failure if it was not GETATTR by cfid
970 (from inode_revalidate) */
971 if (rc || op_data->op_namelen != 0)
975 /* lookup_it may be called only after revalidate_it has run, because
976 * revalidate_it cannot return errors, only zero. Returning zero causes
977 * this call to lookup, which *can* return an error.
979 * We only want to execute the request associated with the intent one
980 * time, however, so don't send the request again. Instead, skip past
981 * this and use the request from revalidate. In this case, revalidate
982 * never dropped its reference, so the refcounts are all OK */
983 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
984 struct ldlm_enqueue_info einfo =
985 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
986 ldlm_completion_ast, NULL, NULL, NULL };
988 /* For case if upper layer did not alloc fid, do it now. */
989 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
990 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
992 CERROR("Can't alloc new fid, rc %d\n", rc);
996 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
997 lmm, lmmsize, NULL, extra_lock_flags);
1000 } else if (!fid_is_sane(&op_data->op_fid2) ||
1001 !(it->it_create_mode & M_CHECK_STALE)) {
1002 /* DISP_ENQ_COMPLETE set means there is extra reference on
1003 * request referenced from this intent, saved for subsequent
1004 * lookup. This path is executed when we proceed to this
1005 * lookup, so we clear DISP_ENQ_COMPLETE */
1006 it_clear_disposition(it, DISP_ENQ_COMPLETE);
1008 *reqp = it->d.lustre.it_data;
1009 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/* RPC interpret callback for mdc_intent_getattr_async(): release the
 * in-flight slot, finish the ldlm enqueue, run the usual intent
 * post-processing, free the enqueue info, and invoke the caller's
 * md_enqueue_info completion callback with the final rc. */
1013 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1014 struct ptlrpc_request *req,
1017 struct mdc_getattr_args *ga = args;
1018 struct obd_export *exp = ga->ga_exp;
1019 struct md_enqueue_info *minfo = ga->ga_minfo;
1020 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1021 struct lookup_intent *it;
1022 struct lustre_handle *lockh;
1023 struct obd_device *obddev;
1024 int flags = LDLM_FL_HAS_INTENT;
1028 lockh = &minfo->mi_lockh;
1030 obddev = class_exp2obd(exp);
/* Balance the mdc_enter_request() taken before the async send. */
1032 mdc_exit_request(&obddev->u.cli);
1033 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1036 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1037 &flags, NULL, 0, lockh, rc);
1039 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1040 mdc_clear_replay_flag(req, rc);
1044 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1048 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1052 OBD_FREE_PTR(einfo);
1053 minfo->mi_cb(req, minfo, rc);
1057 int mdc_intent_getattr_async(struct obd_export *exp,
1058 struct md_enqueue_info *minfo,
1059 struct ldlm_enqueue_info *einfo)
1061 struct md_op_data *op_data = &minfo->mi_data;
1062 struct lookup_intent *it = &minfo->mi_it;
1063 struct ptlrpc_request *req;
1064 struct mdc_getattr_args *ga;
1065 struct obd_device *obddev = class_exp2obd(exp);
1066 struct ldlm_res_id res_id;
1067 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1068 * for statahead currently. Consider CMD in future, such two bits
1069 * maybe managed by different MDS, should be adjusted then. */
1070 ldlm_policy_data_t policy = {
1071 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1072 MDS_INODELOCK_UPDATE }
1075 int flags = LDLM_FL_HAS_INTENT;
1078 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1079 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1080 ldlm_it2str(it->it_op), it->it_flags);
1082 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1083 req = mdc_intent_getattr_pack(exp, it, op_data);
1087 rc = mdc_enter_request(&obddev->u.cli);
1089 ptlrpc_req_finished(req);
1093 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1094 0, &minfo->mi_lockh, 1);
1096 mdc_exit_request(&obddev->u.cli);
1097 ptlrpc_req_finished(req);
1101 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1102 ga = ptlrpc_req_async_args(req);
1104 ga->ga_minfo = minfo;
1105 ga->ga_einfo = einfo;
1107 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1108 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);