1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_MDC
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
48 # include <liblustre.h>
51 #include <lustre_acl.h>
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 /* fid_res_name_eq() */
55 #include <lustre_fid.h>
56 #include <lprocfs_status.h>
57 #include "mdc_internal.h"
/*
 * Test disposition bits of a lookup intent.
 *
 * Returns it->d.lustre.it_disposition masked with flag, i.e. non-zero when
 * at least one of the requested bits is set and zero otherwise.
 */
59 int it_disposition(struct lookup_intent *it, int flag)
61 return it->d.lustre.it_disposition & flag;
63 EXPORT_SYMBOL(it_disposition);
/*
 * Set disposition bits on a lookup intent.
 *
 * ORs flag into it->d.lustre.it_disposition; bits that are already set are
 * kept.
 */
65 void it_set_disposition(struct lookup_intent *it, int flag)
67 it->d.lustre.it_disposition |= flag;
69 EXPORT_SYMBOL(it_set_disposition);
/*
 * Clear disposition bits on a lookup intent.
 *
 * Removes every bit of flag from it->d.lustre.it_disposition and leaves the
 * remaining bits untouched.
 */
71 void it_clear_disposition(struct lookup_intent *it, int flag)
73 it->d.lustre.it_disposition &= ~flag;
75 EXPORT_SYMBOL(it_clear_disposition);
/*
 * Report the intent status that applies to a given phase of an open.
 *
 * The visible checks test the disposition bits in this order:
 * DISP_OPEN_OPEN, DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD.
 * When a bit is set and phase is numerically greater than or equal to that
 * bit, it->d.lustre.it_status is returned.
 *
 * NOTE(review): lines are missing from this view between the visible
 * statements (the branch taken when phase is below the matched bit, the
 * closing braces, and whatever follows the CERROR).  Confirm them against
 * the full file before relying on the return value in those cases.
 */
77 int it_open_error(int phase, struct lookup_intent *it)
79 if (it_disposition(it, DISP_OPEN_OPEN)) {
80 if (phase >= DISP_OPEN_OPEN)
81 return it->d.lustre.it_status;
86 if (it_disposition(it, DISP_OPEN_CREATE)) {
87 if (phase >= DISP_OPEN_CREATE)
88 return it->d.lustre.it_status;
93 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
94 if (phase >= DISP_LOOKUP_EXECD)
95 return it->d.lustre.it_status;
100 if (it_disposition(it, DISP_IT_EXECD)) {
101 if (phase >= DISP_IT_EXECD)
102 return it->d.lustre.it_status;
/* Log the raw disposition word and status.  Presumably reached only when
 * none of the four bits above is set -- TODO confirm. */
106 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
107 it->d.lustre.it_status);
111 EXPORT_SYMBOL(it_open_error);
113 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach caller data to the lock behind a handle and report its inodebits.
 *
 * lockh is cast to a struct lustre_handle and resolved with
 * ldlm_handle2lock(); the result is asserted to be non-NULL.  The visible
 * updates happen between lock_res_and_lock() and unlock_res_and_lock():
 * data is stored in lock->l_ast_data and the value of
 * lock->l_policy_data.l_inodebits.bits is stored through bits.
 *
 * NOTE(review): the declaration of bits, any guard around the two stores,
 * the handling of the lock pointer after unlock and the return statement
 * are not visible in this view; verify them in the full file.
 */
114 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
117 struct ldlm_lock *lock;
128 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
130 LASSERT(lock != NULL);
131 lock_res_and_lock(lock);
/* The lock already carries different data: both pointers are treated as
 * struct inode and the old inode must have I_FREEING set in i_state,
 * otherwise the assertion fires with both inodes in the message. */
133 if (lock->l_ast_data && lock->l_ast_data != data) {
134 struct inode *new_inode = data;
135 struct inode *old_inode = lock->l_ast_data;
136 LASSERTF(old_inode->i_state & I_FREEING,
137 "Found existing inode %p/%lu/%u state %lu in lock: "
138 "setting data to %p/%lu/%u\n", old_inode,
139 old_inode->i_ino, old_inode->i_generation,
141 new_inode, new_inode->i_ino, new_inode->i_generation);
144 lock->l_ast_data = data;
146 *bits = lock->l_policy_data.l_inodebits.bits;
148 unlock_res_and_lock(lock);
/*
 * Look for an existing lock on the resource named by a fid.
 *
 * fid is converted to a resource name with fid_build_reg_res_name() and the
 * lookup is delegated to ldlm_lock_match() in the namespace of the obd
 * device behind exp.  flags, type, policy, mode and lockh are forwarded
 * unchanged and the final argument is 0.
 *
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this view; presumably the ldlm_lock_match() result is returned
 * as is -- confirm against the full file.
 */
154 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
155 const struct lu_fid *fid, ldlm_type_t type,
156 ldlm_policy_data_t *policy, ldlm_mode_t mode,
157 struct lustre_handle *lockh)
159 struct ldlm_res_id res_id;
163 fid_build_reg_res_name(fid, &res_id);
164 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
165 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel unused locks on the resource named by a fid.
 *
 * Builds the resource name from fid and calls
 * ldlm_cli_cancel_unused_resource() on the namespace of the obd device
 * behind exp, forwarding policy, mode, flags and opaque unchanged.
 *
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this view; presumably rc is returned -- confirm against the
 * full file.
 */
169 int mdc_cancel_unused(struct obd_export *exp,
170 const struct lu_fid *fid,
171 ldlm_policy_data_t *policy,
172 ldlm_mode_t mode, int flags, void *opaque)
174 struct ldlm_res_id res_id;
175 struct obd_device *obd = class_exp2obd(exp);
180 fid_build_reg_res_name(fid, &res_id);
181 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
182 policy, mode, flags, opaque);
/*
 * Run an iterator over the resource named by a fid.
 *
 * Builds the resource name from fid and starts ldlm_resource_iterate() on
 * the namespace of the obd device behind exp.
 *
 * NOTE(review): the remaining arguments of the ldlm_resource_iterate()
 * call and the return statement are not visible in this view; presumably
 * the resource name, it and data are forwarded -- confirm against the full
 * file.
 */
186 int mdc_change_cbdata(struct obd_export *exp,
187 const struct lu_fid *fid,
188 ldlm_iterator_t it, void *data)
190 struct ldlm_res_id res_id;
193 fid_build_reg_res_name(fid, &res_id);
194 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
/*
 * Error-path helper for a request that may be marked for replay.
 *
 * When req->rq_replay is set, req->rq_lock is taken and released around an
 * update that is not visible in this view (presumably the clearing of
 * rq_replay, in line with the comment below -- TODO confirm).
 * When rc is non-zero and req->rq_transno is also non-zero, the mismatch is
 * reported through DEBUG_REQ at D_ERROR level.
 */
201 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
203 /* Don't hold error requests for replay. */
204 if (req->rq_replay) {
205 spin_lock(&req->rq_lock);
207 spin_unlock(&req->rq_lock);
/* An error result together with a transaction number is reported as
 * unexpected. */
209 if (rc && req->rq_transno != 0) {
210 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
215 /* Save a large LOV EA into the request buffer so that it is available
216 * for replay. We don't do this in the initial request because the
217 * original request doesn't need this buffer (at most it sends just the
218 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
219 * buffer and may also be difficult to allocate and save a very large
220 * request buffer for each open. (bug 5707)
222 * OOM here may cause recovery failure if lmm is needed (only for the
223 * original open if the MDS crashed just when this client also OOM'd)
224 * but this is incredibly unlikely, and questionable whether the client
225 * could do MDS recovery under OOM anyways... */
/*
 * Implementation summary: asks sptlrpc_cli_enlarge_reqbuf() to grow request
 * segment DLM_INTENT_REC_OFF + 4.  The visible error handling logs the
 * segment index and body->eadatasize, clears OBD_MD_FLEASIZE in body->valid
 * and resets body->eadatasize to 0.  Nothing is returned to the caller.
 *
 * NOTE(review): the last argument of the enlarge call and the condition
 * guarding the error handling are not visible in this view; presumably the
 * new size is body->eadatasize and the handling runs only for a non-zero
 * rc -- confirm against the full file.
 */
226 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
227 struct mdt_body *body)
231 /* FIXME: remove this explicit offset. */
232 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
235 CERROR("Can't enlarge segment %d size to %d\n",
236 DLM_INTENT_REC_OFF + 4, body->eadatasize);
237 body->valid &= ~OBD_MD_FLEASIZE;
238 body->eadatasize = 0;
/*
 * Build the LDLM intent request (RQF_LDLM_INTENT_OPEN) for an open.
 *
 * Visible steps, in order:
 *  - force the file type bits of it->it_create_mode to S_IFREG;
 *  - when op_fid2 is sane, call mdc_resource_get_unused() on it and keep
 *    the result in count (the branches on it->it_flags select something
 *    that is not visible here, presumably the lock mode);
 *  - for IT_CREAT or join-file, add the result of
 *    mdc_resource_get_unused() on the parent op_fid1 with
 *    MDS_INODELOCK_UPDATE;
 *  - allocate the request; the ERR_PTR(-ENOMEM) path first hands the
 *    cancels list to ldlm_lock_list_put();
 *  - size the CAPA1 and CAPA2 fields (both from op_capa1), the client NAME
 *    field (op_namelen + 1), the client EADATA field (the larger of lmmsize
 *    and cl_default_mds_easize) and the REC_JOINFILE field;
 *  - call ldlm_prep_enqueue_req() with the cancels list and count;
 *  - read a 64-bit head size from op_data->op_data and pack it with
 *    mdc_join_pack() (presumably only for join-file -- TODO confirm);
 *  - copy imp_replayable of the import into rq_replay under rq_lock;
 *  - store it->it_op as the intent opcode and pack the open record with
 *    mdc_open_pack();
 *  - for a remote client, size the server ACL field for one
 *    struct mdt_remote_perm, then compute the reply length.
 *
 * NOTE(review): many lines are missing from this view (the last
 * parameter, the second half of the joinfile expression, the arguments
 * after the fid in both mdc_resource_get_unused() calls, the conditions
 * around both error paths, and the final return).  Check the full file
 * before changing behaviour.
 */
242 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
243 struct lookup_intent *it,
244 struct md_op_data *op_data,
245 void *lmm, int lmmsize,
248 struct ptlrpc_request *req;
249 struct obd_device *obddev = class_exp2obd(exp);
250 struct ldlm_intent *lit;
251 int joinfile = !!((it->it_create_mode & M_JOIN_FILE) &&
253 CFS_LIST_HEAD(cancels);
259 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
261 /* XXX: openlock is not cancelled for cross-refs. */
262 /* If inode is known, cancel conflicting OPEN locks. */
263 if (fid_is_sane(&op_data->op_fid2)) {
264 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
267 else if (it->it_flags & FMODE_EXEC)
272 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
277 /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
278 if (it->it_op & IT_CREAT || joinfile)
282 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
284 MDS_INODELOCK_UPDATE);
286 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
287 &RQF_LDLM_INTENT_OPEN);
289 ldlm_lock_list_put(&cancels, l_bl_ast, count);
290 RETURN(ERR_PTR(-ENOMEM));
293 /* parent capability */
294 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
295 /* child capability, reserve the size according to parent capa, it will
296 * be filled after we get the reply */
297 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
299 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
300 op_data->op_namelen + 1);
301 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
302 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
304 req_capsule_set_size(&req->rq_pill, &RMF_REC_JOINFILE,
308 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
310 ptlrpc_request_free(req);
315 __u64 head_size = *(__u64 *)op_data->op_data;
316 mdc_join_pack(req, op_data, head_size);
/* The request is marked for replay exactly when its import is
 * replayable. */
319 spin_lock(&req->rq_lock);
320 req->rq_replay = req->rq_import->imp_replayable;
321 spin_unlock(&req->rq_lock);
323 /* pack the intent */
324 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
325 lit->opc = (__u64)it->it_op;
327 /* pack the intended request */
328 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
331 /* for remote client, fetch remote perm for current user */
332 if (client_is_remote(exp))
333 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
334 sizeof(struct mdt_remote_perm));
335 ptlrpc_request_set_replen(req);
/*
 * Build the LDLM intent request (RQF_LDLM_INTENT_UNLINK) for an unlink.
 *
 * Visible steps: allocate the request (ERR_PTR(-ENOMEM) on the failure
 * path), size CAPA1 from op_capa1 and the client NAME field to
 * op_namelen + 1, call ldlm_prep_enqueue_req() with no cancel list, store
 * it->it_op as the intent opcode, pack the unlink record with
 * mdc_unlink_pack(), size the server MDT_MD and ACL fields and compute the
 * reply length.
 *
 * NOTE(review): the conditions around both error paths, the declaration of
 * rc and the final return are not visible in this view; confirm against
 * the full file.
 */
339 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
340 struct lookup_intent *it,
341 struct md_op_data *op_data)
343 struct ptlrpc_request *req;
344 struct obd_device *obddev = class_exp2obd(exp);
345 struct ldlm_intent *lit;
349 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
350 &RQF_LDLM_INTENT_UNLINK);
352 RETURN(ERR_PTR(-ENOMEM));
354 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
355 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
356 op_data->op_namelen + 1);
358 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
360 ptlrpc_request_free(req);
364 /* pack the intent */
365 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
366 lit->opc = (__u64)it->it_op;
368 /* pack the intended request */
369 mdc_unlink_pack(req, op_data);
/* Reply sizing: the MDT_MD field gets the maximum EA size of the client. */
371 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
372 obddev->u.cli.cl_max_mds_easize);
/* NOTE(review): the RMF_ACL reply field is sized with
 * cl_max_mds_cookiesize here -- confirm that this field is the intended
 * carrier for the cookies. */
373 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
374 obddev->u.cli.cl_max_mds_cookiesize);
375 ptlrpc_request_set_replen(req);
/*
 * Build the LDLM intent request (RQF_LDLM_INTENT_GETATTR) used for getattr
 * and lookup intents.
 *
 * The attribute mask asks for OBD_MD_FLGETATTR, OBD_MD_FLEASIZE,
 * OBD_MD_FLMODEASIZE, OBD_MD_FLDIREA, OBD_MD_FLMDSCAPA and OBD_MD_MEA, plus
 * OBD_MD_FLRMTPERM for a remote client or OBD_MD_FLACL otherwise.
 *
 * Visible steps: allocate the request (ERR_PTR(-ENOMEM) on the failure
 * path), size CAPA1 from op_capa1 and the client NAME field to
 * op_namelen + 1, call ldlm_prep_enqueue_req() with no cancel list, store
 * it->it_op as the intent opcode, pack the getattr record with
 * mdc_getattr_pack(), size the server MDT_MD field to cl_max_mds_easize
 * and, for a remote client only, the server ACL field to one
 * struct mdt_remote_perm, then compute the reply length.
 *
 * NOTE(review): the conditions around both error paths, the declaration of
 * rc and the final return are not visible in this view; confirm against
 * the full file.
 */
379 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
380 struct lookup_intent *it,
381 struct md_op_data *op_data)
383 struct ptlrpc_request *req;
384 struct obd_device *obddev = class_exp2obd(exp);
385 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
386 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
387 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
388 (client_is_remote(exp) ?
389 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
390 struct ldlm_intent *lit;
394 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
395 &RQF_LDLM_INTENT_GETATTR);
397 RETURN(ERR_PTR(-ENOMEM));
399 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
400 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
401 op_data->op_namelen + 1);
403 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
405 ptlrpc_request_free(req);
409 /* pack the intent */
410 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
411 lit->opc = (__u64)it->it_op;
413 /* pack the intended request */
414 mdc_getattr_pack(req, valid, it->it_flags, op_data);
416 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
417 obddev->u.cli.cl_max_mds_easize);
418 if (client_is_remote(exp))
419 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
420 sizeof(struct mdt_remote_perm));
421 ptlrpc_request_set_replen(req);
/*
 * Build a plain LDLM enqueue request (RQF_LDLM_ENQUEUE) without an intent
 * record.
 *
 * Visible steps: allocate the request (ERR_PTR(-ENOMEM) on the failure
 * path), call ldlm_prep_enqueue_req() with no cancel list, and compute the
 * reply length.
 *
 * NOTE(review): the conditions around both error paths, the declaration of
 * rc and the final return are not visible in this view; confirm against
 * the full file.
 */
425 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
427 struct ptlrpc_request *req;
431 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
433 RETURN(ERR_PTR(-ENOMEM));
435 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
437 ptlrpc_request_free(req);
441 ptlrpc_request_set_replen(req);
/*
 * Post-process the reply of an intent enqueue and fill in the intent.
 *
 * Both visible call sites pass the enqueue result as a sixth argument named
 * rc; its declaration is missing from this view.
 *
 * Visible steps, in order:
 *  - when the request has a transno or is marked for replay, set
 *    LDLM_FL_INTENT_ONLY in the lock_flags of the DLM request;
 *  - for ELDLM_LOCK_ABORTED zero the lock handle; otherwise resolve the
 *    handle and, when the l_req_mode of the lock differs from
 *    einfo->ei_mode, take a reference in the new mode, drop the one in the
 *    old mode and update einfo->ei_mode;
 *  - copy lock_policy_res1 and lock_policy_res2 of the DLM reply into the
 *    intent disposition and status, and record the lock mode, the handle
 *    cookie and the request pointer in the intent;
 *  - clear the replay flag for a negative status on a replayable request,
 *    and again for an IT_OPEN that lacks DISP_OPEN_OPEN or has a non-zero
 *    status;
 *  - for IT_OPEN, IT_UNLINK, IT_LOOKUP and IT_GETATTR fetch the mdt_body
 *    from the reply; for a successful open call
 *    mdc_set_open_replay_data(); when the body advertises
 *    OBD_MD_FLDIREA or OBD_MD_FLEASIZE fetch the EA from the MDT_MD field
 *    and, for a replayable IT_OPEN, copy body->eadatasize bytes of it into
 *    the client EADATA field (enlarging through mdc_realloc_openmsg() or
 *    shrinking the field first);
 *  - fetch the remote permission (OBD_MD_FLRMTPERM), the MDS capability
 *    (OBD_MD_FLMDSCAPA) and the OSS capability (OBD_MD_FLOSSCAPA) fields
 *    when the body flags them.
 *
 * NOTE(review): many short lines are missing from this view (NULL checks
 * after the capsule getters, error returns, closing braces, several call
 * arguments and the final return).  The statement order here matters for
 * replay, so consult the full file before editing.
 */
445 static int mdc_finish_enqueue(struct obd_export *exp,
446 struct ptlrpc_request *req,
447 struct ldlm_enqueue_info *einfo,
448 struct lookup_intent *it,
449 struct lustre_handle *lockh,
452 struct req_capsule *pill = &req->rq_pill;
453 struct ldlm_request *lockreq;
454 struct ldlm_reply *lockrep;
458 /* Similarly, if we're going to replay this request, we don't want to
459 * actually get a lock, just perform the intent. */
460 if (req->rq_transno || req->rq_replay) {
461 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
462 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
465 if (rc == ELDLM_LOCK_ABORTED) {
467 memset(lockh, 0, sizeof(*lockh));
469 } else { /* rc = 0 */
470 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
473 /* If the server gave us back a different lock mode, we should
474 * fix up our variables. */
475 if (lock->l_req_mode != einfo->ei_mode) {
476 ldlm_lock_addref(lockh, lock->l_req_mode);
477 ldlm_lock_decref(lockh, einfo->ei_mode);
478 einfo->ei_mode = lock->l_req_mode;
/* Transfer the outcome carried by the DLM reply into the intent. */
483 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
484 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
486 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
487 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
488 it->d.lustre.it_lock_mode = einfo->ei_mode;
489 it->d.lustre.it_lock_handle = lockh->cookie;
490 it->d.lustre.it_data = req;
492 if (it->d.lustre.it_status < 0 && req->rq_replay)
493 mdc_clear_replay_flag(req, it->d.lustre.it_status);
495 /* If we're doing an IT_OPEN which did not result in an actual
496 * successful open, then we need to remove the bit which saves
497 * this request for unconditional replay.
499 * It's important that we do this first! Otherwise we might exit the
500 * function without doing so, and try to replay a failed create
502 if (it->it_op & IT_OPEN && req->rq_replay &&
503 (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
504 mdc_clear_replay_flag(req, it->d.lustre.it_status);
506 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
507 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
509 /* We know what to expect, so we do any byte flipping required here */
510 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
511 struct mdt_body *body;
513 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
515 CERROR ("Can't swab mdt_body\n");
519 if (it_disposition(it, DISP_OPEN_OPEN) &&
520 !it_open_error(DISP_OPEN_OPEN, it)) {
522 * If this is a successful OPEN request, we need to set
523 * replay handler and data early, so that if replay
524 * happens immediately after swabbing below, new reply
525 * is swabbed by that handler correctly.
527 mdc_set_open_replay_data(NULL, NULL, req);
530 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
534 * The eadata is opaque; just check that it is there.
535 * Eventually, obd_unpackmd() will check the contents.
537 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
543 * We save the reply LOV EA in case we have to replay a
544 * create for recovery. If we didn't allocate a large
545 * enough request buffer above we need to reallocate it
546 * here to hold the actual LOV EA.
548 * To not save LOV EA if request is not going to replay
549 * (for example error one).
551 if ((it->it_op & IT_OPEN) && req->rq_replay) {
553 if (req_capsule_get_size(pill, &RMF_EADATA,
556 mdc_realloc_openmsg(req, body);
558 req_capsule_shrink(pill, &RMF_EADATA,
562 req_capsule_set_size(pill, &RMF_EADATA,
566 lmm = req_capsule_client_get(pill, &RMF_EADATA);
568 memcpy(lmm, eadata, body->eadatasize);
572 if (body->valid & OBD_MD_FLRMTPERM) {
573 struct mdt_remote_perm *perm;
575 LASSERT(client_is_remote(exp));
576 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
577 lustre_swab_mdt_remote_perm);
581 if (body->valid & OBD_MD_FLMDSCAPA) {
582 struct lustre_capa *capa, *p;
584 capa = req_capsule_server_get(pill, &RMF_CAPA1);
588 if (it->it_op & IT_OPEN) {
589 /* client fid capa will be checked in replay */
590 p = req_capsule_client_get(pill, &RMF_CAPA2);
595 if (body->valid & OBD_MD_FLOSSCAPA) {
596 struct lustre_capa *capa;
598 capa = req_capsule_server_get(pill, &RMF_CAPA2);
607 /* We always reserve enough space in the reply packet for a stripe MD, because
608 * we don't know in advance the file type. */
/*
 * Enqueue a lock on the resource of op_data->op_fid1, optionally carrying
 * an intent.
 *
 * Visible steps, in order:
 *  - assert that einfo->ei_type is LDLM_IBITS whenever an intent is given;
 *  - build the resource name from op_fid1; the policy starts with
 *    MDS_INODELOCK_LOOKUP and is switched to MDS_INODELOCK_UPDATE for
 *    IT_UNLINK, IT_GETATTR and IT_READDIR; LDLM_FL_HAS_INTENT is added to
 *    the flags;
 *  - in the branch described by the existing comment as the FLOCK case,
 *    lmm must be non-NULL with lmmsize 0, the type must be LDLM_FLOCK, the
 *    policy is copied from lmm and res_id.name[3] is set to LDLM_FLOCK;
 *  - otherwise pick the request builder from it->it_op: IT_OPEN uses
 *    mdc_intent_open_pack() (and also forces MDS_INODELOCK_UPDATE, resets
 *    einfo->ei_cbdata and strips M_JOIN_FILE from the create mode),
 *    IT_UNLINK uses mdc_intent_unlink_pack(), IT_GETATTR or IT_LOOKUP use
 *    mdc_intent_getattr_pack(), IT_READDIR uses ldlm_enqueue_pack();
 *  - a failed builder result is returned through PTR_ERR();
 *  - take the rpc lock and enter the request, call ldlm_cli_enqueue(), then
 *    exit the request and release the rpc lock;
 *  - on the error path log the result, clear the replay flag and finish
 *    the request;
 *  - hand the reply to mdc_finish_enqueue().
 *
 * NOTE(review): the conditions selecting most of these branches, the
 * trailing arguments of several calls, the handling of reqp and the final
 * return are not visible in this view; confirm against the full file.
 */
609 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
610 struct lookup_intent *it, struct md_op_data *op_data,
611 struct lustre_handle *lockh, void *lmm, int lmmsize,
612 struct ptlrpc_request **reqp, int extra_lock_flags)
614 struct obd_device *obddev = class_exp2obd(exp);
615 struct ptlrpc_request *req = NULL;
616 struct req_capsule *pill;
617 int flags = extra_lock_flags;
619 struct ldlm_res_id res_id;
620 ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
623 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
626 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
629 flags |= LDLM_FL_HAS_INTENT;
630 if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
631 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
637 /* The only way right now is FLOCK, in this case we hide flock
638 policy as lmm, but lmmsize is 0 */
639 LASSERT(lmm && lmmsize == 0);
640 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
642 policy = *(ldlm_policy_data_t *)lmm;
643 res_id.name[3] = LDLM_FLOCK;
644 } else if (it->it_op & IT_OPEN) {
645 int joinfile = !!((it->it_create_mode & M_JOIN_FILE) &&
648 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
651 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
652 einfo->ei_cbdata = NULL;
655 it->it_create_mode &= ~M_JOIN_FILE;
656 } else if (it->it_op & IT_UNLINK)
657 req = mdc_intent_unlink_pack(exp, it, op_data);
658 else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
659 req = mdc_intent_getattr_pack(exp, it, op_data);
660 else if (it->it_op == IT_READDIR)
661 req = ldlm_enqueue_pack(exp);
668 RETURN(PTR_ERR(req));
669 pill = &req->rq_pill;
671 /* It is important to obtain rpc_lock first (if applicable), so that
672 * threads that are serialised with rpc_lock are not polluting our
673 * rpcs in flight counter. We do not do flock request limiting, though*/
675 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
676 mdc_enter_request(&obddev->u.cli);
678 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
684 mdc_exit_request(&obddev->u.cli);
685 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
688 /* For flock requests we immediatelly return without further
689 delay and let caller deal with the rest, since rest of
690 this function metadata processing makes no sense for flock
696 CERROR("ldlm_cli_enqueue: %d\n", rc);
697 mdc_clear_replay_flag(req, rc);
698 ptlrpc_req_finished(req);
701 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/*
 * Interpret the reply of an executed intent and settle request references
 * and the lock handle.
 *
 * Visible steps, in order:
 *  - assert that request is neither NULL nor LP_POISON and that its reply
 *    message is not LP_POISON;
 *  - without DISP_IT_EXECD return it->d.lustre.it_status, which is asserted
 *    to be non-zero;
 *  - evaluate it_open_error() for DISP_IT_EXECD;
 *  - fetch the mdt_body from the reply (asserted non-NULL);
 *  - when op_fid2 is sane, M_CHECK_STALE is set in it_create_mode and the
 *    operation is not IT_GETATTR, set DISP_ENQ_COMPLETE and compare fid1 of
 *    the reply with op_fid2 and op_fid3; a mismatch with both is logged as
 *    stale data;
 *  - evaluate it_open_error() for DISP_LOOKUP_EXECD;
 *  - take one extra request reference for a successful create and one for
 *    a successful open, each recorded once through DISP_ENQ_CREATE_REF and
 *    DISP_ENQ_OPEN_REF;
 *  - sanity-check it->it_op against the disposition;
 *  - resolve lockh, assert that the lock resource name matches fid1 of the
 *    reply, and look for another lock with the same policy through
 *    ldlm_lock_match(); when one is found the new lock is dropped with
 *    ldlm_lock_decref_and_cancel() and both lockh and
 *    it->d.lustre.it_lock_handle switch to the matched lock;
 *  - trace the outcome.
 *
 * NOTE(review): the early returns after the two it_open_error() calls, the
 * action taken for stale data, the NULL check on the resolved lock, the
 * release of that lock and the final return are not visible in this view;
 * confirm against the full file.
 */
706 static int mdc_finish_intent_lock(struct obd_export *exp,
707 struct ptlrpc_request *request,
708 struct md_op_data *op_data,
709 struct lookup_intent *it,
710 struct lustre_handle *lockh)
712 struct lustre_handle old_lock;
713 struct mdt_body *mdt_body;
714 struct ldlm_lock *lock;
718 LASSERT(request != NULL);
719 LASSERT(request != LP_POISON);
720 LASSERT(request->rq_repmsg != LP_POISON);
722 if (!it_disposition(it, DISP_IT_EXECD)) {
723 /* The server failed before it even started executing the
724 * intent, i.e. because it couldn't unpack the request. */
725 LASSERT(it->d.lustre.it_status != 0);
726 RETURN(it->d.lustre.it_status);
728 rc = it_open_error(DISP_IT_EXECD, it);
732 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
733 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
735 /* If we were revalidating a fid/name pair, mark the intent in
736 * case we fail and get called again from lookup */
737 if (fid_is_sane(&op_data->op_fid2) &&
738 it->it_create_mode & M_CHECK_STALE &&
739 it->it_op != IT_GETATTR) {
740 it_set_disposition(it, DISP_ENQ_COMPLETE);
742 /* Also: did we find the same inode? */
743 /* sever can return one of two fids:
744 * op_fid2 - new allocated fid - if file is created.
745 * op_fid3 - existent fid - if file only open.
746 * op_fid3 is saved in lmv_intent_open */
747 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
748 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
/* NOTE(review): the message below prints op_fid2 twice although the test
 * above compares against op_fid2 and op_fid3 -- confirm whether the second
 * value was meant to be op_fid3. */
749 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
750 "\n", PFID(&op_data->op_fid2),
751 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
756 rc = it_open_error(DISP_LOOKUP_EXECD, it);
760 /* keep requests around for the multiple phases of the call
761 * this shows the DISP_XX must guarantee we make it into the call
763 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
764 it_disposition(it, DISP_OPEN_CREATE) &&
765 !it_open_error(DISP_OPEN_CREATE, it)) {
766 it_set_disposition(it, DISP_ENQ_CREATE_REF);
767 ptlrpc_request_addref(request); /* balanced in ll_create_node */
769 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
770 it_disposition(it, DISP_OPEN_OPEN) &&
771 !it_open_error(DISP_OPEN_OPEN, it)) {
772 it_set_disposition(it, DISP_ENQ_OPEN_REF);
773 ptlrpc_request_addref(request); /* balanced in ll_file_open */
774 /* BUG 11546 - eviction in the middle of open rpc processing */
775 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
778 if (it->it_op & IT_CREAT) {
779 /* XXX this belongs in ll_create_it */
780 } else if (it->it_op == IT_OPEN) {
781 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
783 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
786 /* If we already have a matching lock, then cancel the new
787 * one. We have to set the data here instead of in
788 * mdc_enqueue, because we need to use the child's inode as
789 * the l_ast_data to match, and that's not available until
790 * intent_finish has performed the iget().) */
791 lock = ldlm_handle2lock(lockh);
793 ldlm_policy_data_t policy = lock->l_policy_data;
794 LDLM_DEBUG(lock, "matching against this");
796 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
797 &lock->l_resource->lr_name),
798 "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
799 (unsigned long)lock->l_resource->lr_name.name[0],
800 (unsigned long)lock->l_resource->lr_name.name[1],
801 (unsigned long)lock->l_resource->lr_name.name[2],
802 (unsigned long)fid_seq(&mdt_body->fid1),
803 (unsigned long)fid_oid(&mdt_body->fid1),
804 (unsigned long)fid_ver(&mdt_body->fid1));
/* old_lock starts as a copy of the new handle and is passed to
 * ldlm_lock_match(); on a match the new lock is cancelled and the handle
 * stored in old_lock replaces it. */
807 memcpy(&old_lock, lockh, sizeof(*lockh));
808 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
809 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
810 ldlm_lock_decref_and_cancel(lockh,
811 it->d.lustre.it_lock_mode);
812 memcpy(lockh, &old_lock, sizeof(old_lock));
813 it->d.lustre.it_lock_handle = lockh->cookie;
816 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
817 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
818 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
823 * This long block is all about fixing up the lock and request state
824 * so that it is correct as of the moment _before_ the operation was
825 * applied; that way, the VFS will think that everything is normal and
826 * call Lustre's regular VFS methods.
828 * If we're performing a creation, that means that unless the creation
829 * failed with EEXIST, we should fake up a negative dentry.
831 * For everything else, we want to lookup to succeed.
833 * One additional note: if CREATE or OPEN succeeded, we add an extra
834 * reference to the request because we need to keep it around until
835 * ll_create/ll_open gets called.
837 * The server will return to us, in it_disposition, an indication of
838 * exactly what d.lustre.it_status refers to.
840 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
841 * otherwise if DISP_OPEN_CREATE is set, then it status is the
842 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
843 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
846 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/*
 * Entry point for intent locking: reuse a cached lock when possible,
 * otherwise enqueue the intent, then interpret the reply.
 *
 * Visible steps, in order:
 *  - trace the name, both fids, the intent and its flags;
 *  - when op_fid2 is sane and the operation is IT_LOOKUP or IT_GETATTR,
 *    look for an already granted IBITS lock on op_fid2 through
 *    mdc_lock_match(), asking for MDS_INODELOCK_UPDATE for IT_GETATTR and
 *    MDS_INODELOCK_LOOKUP otherwise, in any of the CR, CW, PR or PW modes;
 *    the handle cookie and the mode are stored in the intent;
 *  - when DISP_ENQ_COMPLETE is not set, prepare an LDLM_IBITS
 *    ldlm_enqueue_info using it_to_lock_mode(), cb_blocking and
 *    ldlm_completion_ast, allocate op_fid2 through mdc_fid_alloc() for an
 *    IT_CREAT whose fid is not sane, and call mdc_enqueue();
 *  - otherwise, when op_fid2 is not sane or M_CHECK_STALE is not set, clear
 *    DISP_ENQ_COMPLETE;
 *  - publish the request saved in it->d.lustre.it_data through reqp and
 *    finish with mdc_finish_intent_lock().
 *
 * lookup_flags is not referenced by any line visible in this view.
 *
 * NOTE(review): the guards around the stores after mdc_lock_match(), the
 * statement controlled by the test on mode and op_namelen, the error
 * returns after mdc_fid_alloc() and mdc_enqueue(), and the final return
 * are not visible in this view; confirm against the full file.
 */
849 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
850 void *lmm, int lmmsize, struct lookup_intent *it,
851 int lookup_flags, struct ptlrpc_request **reqp,
852 ldlm_blocking_callback cb_blocking,
853 int extra_lock_flags)
855 struct lustre_handle lockh;
860 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
861 ", intent: %s flags %#o\n", op_data->op_namelen,
862 op_data->op_name, PFID(&op_data->op_fid2),
863 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
867 if (fid_is_sane(&op_data->op_fid2) &&
868 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
869 /* We could just return 1 immediately, but since we should only
870 * be called in revalidate_it if we already have a lock, let's
872 ldlm_policy_data_t policy;
875 /* As not all attributes are kept under update lock, e.g.
876 owner/group/acls are under lookup lock, we need both
877 ibits for GETATTR. */
879 /* For CMD, UPDATE lock and LOOKUP lock can not be got
880 * at the same for cross-object, so we can not match
881 * the 2 lock at the same time FIXME: but how to handle
882 * the above situation */
883 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
884 MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
886 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED,
887 &op_data->op_fid2, LDLM_IBITS, &policy,
888 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
890 it->d.lustre.it_lock_handle = lockh.cookie;
891 it->d.lustre.it_lock_mode = mode;
894 /* Only return failure if it was not GETATTR by cfid
895 (from inode_revalidate) */
896 if (mode || op_data->op_namelen != 0)
900 /* lookup_it may be called only after revalidate_it has run, because
901 * revalidate_it cannot return errors, only zero. Returning zero causes
902 * this call to lookup, which *can* return an error.
904 * We only want to execute the request associated with the intent one
905 * time, however, so don't send the request again. Instead, skip past
906 * this and use the request from revalidate. In this case, revalidate
907 * never dropped its reference, so the refcounts are all OK */
908 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
909 struct ldlm_enqueue_info einfo =
910 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
911 ldlm_completion_ast, NULL, NULL, NULL };
913 /* For case if upper layer did not alloc fid, do it now. */
914 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
915 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
917 CERROR("Can't alloc new fid, rc %d\n", rc);
/* The request-pointer argument of mdc_enqueue() is passed as NULL here;
 * the request is picked up from it->d.lustre.it_data further down. */
921 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
922 lmm, lmmsize, NULL, extra_lock_flags);
925 } else if (!fid_is_sane(&op_data->op_fid2) ||
926 !(it->it_create_mode & M_CHECK_STALE)) {
927 /* DISP_ENQ_COMPLETE set means there is extra reference on
928 * request referenced from this intent, saved for subsequent
929 * lookup. This path is executed when we proceed to this
930 * lookup, so we clear DISP_ENQ_COMPLETE */
931 it_clear_disposition(it, DISP_ENQ_COMPLETE);
933 *reqp = it->d.lustre.it_data;
934 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Reply interpreter installed by mdc_intent_getattr_async().
 *
 * The export, the md_enqueue_info and the ldlm_enqueue_info are recovered
 * from req->rq_async_args.pointer_arg[0], [1] and [2]; the lock handle is
 * the mi_lockh member of the md_enqueue_info.
 *
 * Visible steps, in order: leave the request accounting with
 * mdc_exit_request(), evaluate the OBD_FAIL_MDC_GETATTR_ENQUEUE fail check,
 * complete the enqueue with ldlm_cli_enqueue_fini(), log the result and
 * clear the replay flag on the error path, run mdc_finish_enqueue() and
 * mdc_finish_intent_lock() (with the mi_data member as op data), and report
 * rc to the caller through minfo->mi_cb().
 *
 * NOTE(review): the assignment of it, the effect of the fail check, the
 * conditions and jumps between the steps and the return statement are not
 * visible in this view; confirm against the full file.
 */
938 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
939 struct ptlrpc_request *req,
940 void *unused, int rc)
942 struct obd_export *exp = req->rq_async_args.pointer_arg[0];
943 struct md_enqueue_info *minfo = req->rq_async_args.pointer_arg[1];
944 struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
945 struct lookup_intent *it;
946 struct lustre_handle *lockh;
947 struct obd_device *obddev;
948 int flags = LDLM_FL_HAS_INTENT;
952 lockh = &minfo->mi_lockh;
954 obddev = class_exp2obd(exp);
/* Pairs with the mdc_enter_request() issued by mdc_intent_getattr_async()
 * before the request was queued. */
956 mdc_exit_request(&obddev->u.cli);
957 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
960 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
961 &flags, NULL, 0, NULL, lockh, rc);
963 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
964 mdc_clear_replay_flag(req, rc);
968 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
972 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
977 minfo->mi_cb(req, minfo, rc);
/*
 * Start an asynchronous getattr intent enqueue.
 *
 * The op data and the intent are the mi_data and mi_it members of minfo.
 * Visible steps, in order: trace the request, build the resource name from
 * op_fid1, build the request with mdc_intent_getattr_pack(), enter the
 * request accounting, call ldlm_cli_enqueue() with an
 * MDS_INODELOCK_LOOKUP policy, LDLM_FL_HAS_INTENT flags, the mi_lockh
 * member as lock handle and a final argument of 1, leave the accounting on
 * the error path, stash exp, minfo and einfo in
 * rq_async_args.pointer_arg[0..2], install
 * mdc_intent_getattr_async_interpret() as reply interpreter and queue the
 * request with ptlrpcd_add_req().
 *
 * NOTE(review): the error check after the request is built, the condition
 * around the error-path mdc_exit_request() and the return statements are
 * not visible in this view; confirm against the full file.
 */
981 int mdc_intent_getattr_async(struct obd_export *exp,
982 struct md_enqueue_info *minfo,
983 struct ldlm_enqueue_info *einfo)
985 struct md_op_data *op_data = &minfo->mi_data;
986 struct lookup_intent *it = &minfo->mi_it;
987 struct ptlrpc_request *req;
988 struct obd_device *obddev = class_exp2obd(exp);
989 struct ldlm_res_id res_id;
990 ldlm_policy_data_t policy = {
991 .l_inodebits = { MDS_INODELOCK_LOOKUP }
994 int flags = LDLM_FL_HAS_INTENT;
997 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
998 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
999 ldlm_it2str(it->it_op), it->it_flags);
1001 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1002 req = mdc_intent_getattr_pack(exp, it, op_data);
1006 mdc_enter_request(&obddev->u.cli);
1007 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1008 0, NULL, &minfo->mi_lockh, 1);
1010 mdc_exit_request(&obddev->u.cli);
/* Context for the interpreter: it reads these three slots back in the same
 * order. */
1014 req->rq_async_args.pointer_arg[0] = exp;
1015 req->rq_async_args.pointer_arg[1] = minfo;
1016 req->rq_async_args.pointer_arg[2] = einfo;
1017 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1018 ptlrpcd_add_req(req, PSCOPE_OTHER);
/*
 * Check whether a granted lock already covers a fid and record it in the
 * intent.
 *
 * The resource name is built from fid.  The requested inodebits are
 * MDS_INODELOCK_UPDATE together with MDS_INODELOCK_LOOKUP for IT_GETATTR
 * and MDS_INODELOCK_LOOKUP alone otherwise.  ldlm_lock_match() is asked for
 * an LDLM_IBITS lock in any of the CR, CW, PR or PW modes in the namespace
 * of exp->exp_obd; the handle cookie and the matched mode are stored in the
 * intent.
 *
 * NOTE(review): mdc_intent_lock() asks for MDS_INODELOCK_UPDATE only in its
 * own IT_GETATTR match, while this function asks for both bits -- confirm
 * that the difference is intended.
 *
 * NOTE(review): the declaration of the fid parameter, the guard around the
 * two stores and the return statement are not visible in this view;
 * confirm against the full file.
 */
1023 int mdc_revalidate_lock(struct obd_export *exp,
1024 struct lookup_intent *it,
1027 /* We could just return 1 immediately, but since we should only
1028 * be called in revalidate_it if we already have a lock, let's
1030 struct ldlm_res_id res_id;
1031 struct lustre_handle lockh;
1032 ldlm_policy_data_t policy;
1036 fid_build_reg_res_name(fid, &res_id);
1037 /* As not all attributes are kept under update lock, e.g.
1038 owner/group/acls are under lookup lock, we need both
1039 ibits for GETATTR. */
1040 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
1041 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
1042 MDS_INODELOCK_LOOKUP;
1044 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1045 LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
1046 &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1048 it->d.lustre.it_lock_handle = lockh.cookie;
1049 it->d.lustre.it_lock_mode = mode;