1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see [sun.com URL with a
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_MDC
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
48 # include <liblustre.h>
51 #include <linux/lustre_acl.h>
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 /* fid_res_name_eq() */
55 #include <lustre_fid.h>
56 #include <lprocfs_status.h>
57 #include "mdc_internal.h"
/* Test disposition bits on an intent.
 * Returns the bits of it->d.lustre.it_disposition selected by flag, i.e.
 * a non-zero value when at least one of the requested bits is set.
 *
 * it:   intent to inspect
 * flag: mask of disposition bits to test */
59 int it_disposition(struct lookup_intent *it, int flag)
61 return it->d.lustre.it_disposition & flag;
63 EXPORT_SYMBOL(it_disposition);
/* Set disposition bits on an intent.
 * ORs flag into it->d.lustre.it_disposition; bits already set are kept.
 *
 * it:   intent to modify
 * flag: mask of disposition bits to set */
65 void it_set_disposition(struct lookup_intent *it, int flag)
67 it->d.lustre.it_disposition |= flag;
69 EXPORT_SYMBOL(it_set_disposition);
/* Clear disposition bits on an intent.
 * Removes every bit of flag from it->d.lustre.it_disposition and leaves
 * the remaining bits untouched.
 *
 * it:   intent to modify
 * flag: mask of disposition bits to clear */
71 void it_clear_disposition(struct lookup_intent *it, int flag)
73 it->d.lustre.it_disposition &= ~flag;
75 EXPORT_SYMBOL(it_clear_disposition);
/* Report the intent status that applies to a given phase of an open.
 * The disposition bits are examined in this order: DISP_OPEN_OPEN,
 * DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD.  For the first bit
 * found set, it->d.lustre.it_status is returned when phase compares
 * greater than or equal to that disposition value.  When none of the four
 * bits is set, the disposition and status are logged through CERROR.
 *
 * phase: DISP_* value naming the phase the caller is interested in
 * it:    intent whose disposition and status are inspected */
77 int it_open_error(int phase, struct lookup_intent *it)
79 if (it_disposition(it, DISP_OPEN_OPEN)) {
80 if (phase >= DISP_OPEN_OPEN)
81 return it->d.lustre.it_status;
86 if (it_disposition(it, DISP_OPEN_CREATE)) {
87 if (phase >= DISP_OPEN_CREATE)
88 return it->d.lustre.it_status;
93 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
94 if (phase >= DISP_LOOKUP_EXECD)
95 return it->d.lustre.it_status;
100 if (it_disposition(it, DISP_IT_EXECD)) {
101 if (phase >= DISP_IT_EXECD)
102 return it->d.lustre.it_status;
106 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
107 it->d.lustre.it_status);
111 EXPORT_SYMBOL(it_open_error);
/* Attach caller data to the lock named by lockh.
 * The handle is resolved with ldlm_handle2lock() and must yield a lock
 * (asserted).  Between lock_res_and_lock() and unlock_res_and_lock(), if
 * the lock already carries a different l_ast_data, both values are treated
 * as struct inode pointers and the old inode is asserted to have I_FREEING
 * set in i_state; data is then stored in lock->l_ast_data.
 *
 * lockh: storage of the lock handle, cast to struct lustre_handle
 * data:  new l_ast_data value */
113 /* this must be called on a lockh that is known to have a referenced lock */
114 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data)
116 struct ldlm_lock *lock;
124 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
126 LASSERT(lock != NULL);
127 lock_res_and_lock(lock);
129 if (lock->l_ast_data && lock->l_ast_data != data) {
130 struct inode *new_inode = data;
131 struct inode *old_inode = lock->l_ast_data;
132 LASSERTF(old_inode->i_state & I_FREEING,
133 "Found existing inode %p/%lu/%u state %lu in lock: "
134 "setting data to %p/%lu/%u\n", old_inode,
135 old_inode->i_ino, old_inode->i_generation,
137 new_inode, new_inode->i_ino, new_inode->i_generation);
140 lock->l_ast_data = data;
141 unlock_res_and_lock(lock);
/* Look for an already-held lock on the resource derived from fid.
 * fid is converted to an LDLM resource id with fid_build_reg_res_name()
 * and the lookup is delegated to ldlm_lock_match() on the namespace of the
 * obd behind exp; flags, type, policy, mode and lockh are passed through
 * unchanged.  The result of ldlm_lock_match() is kept in rc
 * (presumably the value handed back to the caller -- confirm). */
147 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
148 const struct lu_fid *fid, ldlm_type_t type,
149 ldlm_policy_data_t *policy, ldlm_mode_t mode,
150 struct lustre_handle *lockh)
152 struct ldlm_res_id res_id;
156 fid_build_reg_res_name(fid, &res_id);
157 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
158 &res_id, type, policy, mode, lockh);
/* Cancel unused locks on the resource derived from fid.
 * Builds the resource id from fid with fid_build_reg_res_name() and
 * forwards policy, mode, flags and opaque to
 * ldlm_cli_cancel_unused_resource() on the namespace of the obd behind
 * exp.  The result of that call is kept in rc. */
162 int mdc_cancel_unused(struct obd_export *exp,
163 const struct lu_fid *fid,
164 ldlm_policy_data_t *policy,
165 ldlm_mode_t mode, int flags, void *opaque)
167 struct ldlm_res_id res_id;
168 struct obd_device *obd = class_exp2obd(exp);
173 fid_build_reg_res_name(fid, &res_id);
174 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
175 policy, mode, flags, opaque);
/* Run an LDLM iterator over the locks of the resource derived from fid.
 * The resource id is built with fid_build_reg_res_name() and the walk is
 * done by ldlm_resource_iterate() on the namespace of the obd behind exp.
 * NOTE(review): it and data are presumably the callback and its argument
 * handed on to ldlm_resource_iterate() -- confirm against the full call. */
179 int mdc_change_cbdata(struct obd_export *exp,
180 const struct lu_fid *fid,
181 ldlm_iterator_t it, void *data)
183 struct ldlm_res_id res_id;
186 fid_build_reg_res_name(fid, &res_id);
187 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
/* Post-process a request that finished with result rc.
 * When req->rq_replay is set, req->rq_lock is taken and released
 * (presumably around clearing the replay flag -- confirm).  A non-zero rc
 * combined with a non-zero rq_transno is reported through DEBUG_REQ at
 * D_ERROR level.
 *
 * req: request to update
 * rc:  result the request completed with */
194 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
196 /* Don't hold error requests for replay. */
197 if (req->rq_replay) {
198 spin_lock(&req->rq_lock);
200 spin_unlock(&req->rq_lock);
202 if (rc && req->rq_transno != 0) {
203 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
208 /* Save a large LOV EA into the request buffer so that it is available
209 * for replay. We don't do this in the initial request because the
210 * original request doesn't need this buffer (at most it sends just the
211 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
212 * buffer and may also be difficult to allocate and save a very large
213 * request buffer for each open. (bug 5707)
215 * OOM here may cause recovery failure if lmm is needed (only for the
216 * original open if the MDS crashed just when this client also OOM'd)
217 * but this is incredibly unlikely, and questionable whether the client
218 * could do MDS recovery under OOM anyways... */
/* req:  open request whose buffer segment DLM_INTENT_REC_OFF + 4 is
 *       enlarged through sptlrpc_cli_enlarge_reqbuf()
 * body: reply body; on the path that logs the enlarge failure,
 *       OBD_MD_FLEASIZE is cleared from body->valid and body->eadatasize
 *       is reset to 0 */
219 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
220 struct mdt_body *body)
224 /* FIXME: remove this explicit offset. */
225 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
228 CERROR("Can't enlarge segment %d size to %d\n",
229 DLM_INTENT_REC_OFF + 4, body->eadatasize);
230 body->valid &= ~OBD_MD_FLEASIZE;
231 body->eadatasize = 0;
/* Build the LDLM enqueue request that carries an OPEN intent.
 *
 * Steps visible here:
 *  - the file type bits of it->it_create_mode are forced to S_IFREG;
 *  - when op_fid2 is sane, unused locks on the child are gathered with
 *    mdc_resource_get_unused() into the local cancels list, the choice
 *    depending on FMODE_WRITE/MDS_OPEN_TRUNC and FMODE_EXEC in it_flags;
 *  - for IT_CREAT or a join-file open, unused MDS_INODELOCK_UPDATE locks
 *    on the parent (op_fid1) are gathered as well;
 *  - a request of format RQF_LDLM_INTENT_OPEN is allocated; on the
 *    failure path the gathered locks are released with
 *    ldlm_lock_list_put() and ERR_PTR(-ENOMEM) is returned;
 *  - buffer sizes for both capabilities, the name and the EA are set,
 *    then ldlm_prep_enqueue_req() is called with the cancels list and the
 *    request is freed if that step fails;
 *  - the intent opcode and the open record (mdc_open_pack()) are packed;
 *  - for a remote client, reply space for struct mdt_remote_perm is
 *    reserved in RMF_ACL before ptlrpc_request_set_replen().
 *
 * exp:     export the request is sent through
 * it:      intent being packed; it_create_mode is modified as described
 * op_data: operation data supplying fids, name and capabilities
 * lmm:     EA data handed to mdc_open_pack()
 * lmmsize: size used, together with cl_default_mds_easize, to size the
 *          client EA buffer */
235 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
236 struct lookup_intent *it,
237 struct md_op_data *op_data,
238 void *lmm, int lmmsize,
241 struct ptlrpc_request *req;
242 struct obd_device *obddev = class_exp2obd(exp);
243 struct ldlm_intent *lit;
244 int joinfile = !!((it->it_flags & O_JOIN_FILE) &&
246 CFS_LIST_HEAD(cancels);
252 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
254 /* XXX: openlock is not cancelled for cross-refs. */
255 /* If inode is known, cancel conflicting OPEN locks. */
256 if (fid_is_sane(&op_data->op_fid2)) {
257 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
260 else if (it->it_flags & FMODE_EXEC)
265 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
270 /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
271 if (it->it_op & IT_CREAT || joinfile)
275 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
277 MDS_INODELOCK_UPDATE);
279 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
280 &RQF_LDLM_INTENT_OPEN);
282 ldlm_lock_list_put(&cancels, l_bl_ast, count);
283 RETURN(ERR_PTR(-ENOMEM));
286 /* parent capability */
287 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
288 /* child capability, reserve the size according to parent capa, it will
289 * be filled after we get the reply */
290 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
292 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
293 op_data->op_namelen + 1);
/* EA buffer: the larger of the supplied lmmsize and the default EA size */
294 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
295 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
297 req_capsule_set_size(&req->rq_pill, &RMF_REC_JOINFILE,
301 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
303 ptlrpc_request_free(req);
308 __u64 head_size = *(__u64 *)op_data->op_data;
309 mdc_join_pack(req, op_data, head_size);
312 spin_lock(&req->rq_lock);
314 spin_unlock(&req->rq_lock);
316 /* pack the intent */
317 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
318 lit->opc = (__u64)it->it_op;
320 /* pack the intended request */
321 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
324 /* for remote client, fetch remote perm for current user */
325 if (client_is_remote(exp))
326 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
327 sizeof(struct mdt_remote_perm));
328 ptlrpc_request_set_replen(req);
/* Build the LDLM enqueue request that carries an UNLINK intent.
 * A request of format RQF_LDLM_INTENT_UNLINK is allocated
 * (ERR_PTR(-ENOMEM) on the failure path), the capability and name buffers
 * are sized, ldlm_prep_enqueue_req() is called without a cancel list and
 * the request is freed if that step fails.  The intent opcode and the
 * unlink record (mdc_unlink_pack()) are then packed and the reply buffers
 * are sized before ptlrpc_request_set_replen().
 *
 * NOTE(review): the RMF_ACL reply field is sized with
 * cl_max_mds_cookiesize here rather than an ACL size; presumably the
 * field is reused for unlink cookies -- confirm.
 *
 * exp:     export the request is sent through
 * it:      intent supplying the opcode
 * op_data: operation data supplying the name and capability */
332 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
333 struct lookup_intent *it,
334 struct md_op_data *op_data)
336 struct ptlrpc_request *req;
337 struct obd_device *obddev = class_exp2obd(exp);
338 struct ldlm_intent *lit;
342 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
343 &RQF_LDLM_INTENT_UNLINK);
345 RETURN(ERR_PTR(-ENOMEM));
347 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
348 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
349 op_data->op_namelen + 1);
351 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
353 ptlrpc_request_free(req);
357 /* pack the intent */
358 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
359 lit->opc = (__u64)it->it_op;
361 /* pack the intended request */
362 mdc_unlink_pack(req, op_data);
364 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
365 obddev->u.cli.cl_max_mds_easize);
366 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
367 obddev->u.cli.cl_max_mds_cookiesize);
368 ptlrpc_request_set_replen(req);
/* Build the LDLM enqueue request that carries a GETATTR/LOOKUP intent.
 * The set of attributes requested is fixed in the local valid mask; it
 * asks for OBD_MD_FLRMTPERM on a remote client and OBD_MD_FLACL otherwise.
 * A request of format RQF_LDLM_INTENT_GETATTR is allocated
 * (ERR_PTR(-ENOMEM) on the failure path), the capability and name buffers
 * are sized, ldlm_prep_enqueue_req() is called without a cancel list and
 * the request is freed if that step fails.  The intent opcode and the
 * getattr record (mdc_getattr_pack()) are packed, then reply space is
 * reserved: cl_max_mds_easize for RMF_MDT_MD and, for a remote client
 * only, struct mdt_remote_perm for RMF_ACL.
 *
 * exp:     export the request is sent through
 * it:      intent supplying the opcode and flags
 * op_data: operation data supplying the name and capability */
372 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
373 struct lookup_intent *it,
374 struct md_op_data *op_data)
376 struct ptlrpc_request *req;
377 struct obd_device *obddev = class_exp2obd(exp);
378 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
379 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
380 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
381 (client_is_remote(exp) ?
382 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
383 struct ldlm_intent *lit;
387 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
388 &RQF_LDLM_INTENT_GETATTR);
390 RETURN(ERR_PTR(-ENOMEM));
392 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
393 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
394 op_data->op_namelen + 1);
396 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
398 ptlrpc_request_free(req);
402 /* pack the intent */
403 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
404 lit->opc = (__u64)it->it_op;
406 /* pack the intended request */
407 mdc_getattr_pack(req, valid, it->it_flags, op_data);
409 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
410 obddev->u.cli.cl_max_mds_easize);
411 if (client_is_remote(exp))
412 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
413 sizeof(struct mdt_remote_perm));
414 ptlrpc_request_set_replen(req);
/* Build a plain LDLM enqueue request without an intent record.
 * A request of format RQF_LDLM_ENQUEUE is allocated (ERR_PTR(-ENOMEM) on
 * the failure path), prepared with ldlm_prep_enqueue_req() without a
 * cancel list, freed if that step fails, and finally given its reply
 * length with ptlrpc_request_set_replen(). */
418 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
420 struct ptlrpc_request *req;
424 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
426 RETURN(ERR_PTR(-ENOMEM));
428 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
430 ptlrpc_request_free(req);
434 ptlrpc_request_set_replen(req);
/* Digest the reply of an intent enqueue and fill in the intent.
 *
 * Steps visible here:
 *  - if the request has a transno or is flagged for replay,
 *    LDLM_FL_INTENT_ONLY is set in the lock flags of the request;
 *  - for rc == ELDLM_LOCK_ABORTED the lock handle is zeroed; otherwise
 *    the lock is looked up and, when its l_req_mode differs from
 *    einfo->ei_mode, the reference is moved to the new mode and
 *    einfo->ei_mode is updated;
 *  - lock_policy_res1 and lock_policy_res2 of the DLM reply are stored as
 *    the intent disposition and status, einfo->ei_mode as it_lock_mode
 *    and req as it_data;
 *  - mdc_clear_replay_flag() is called for a negative status on a replay
 *    request, and for an IT_OPEN replay request that lacks DISP_OPEN_OPEN
 *    or has a non-zero status;
 *  - for IT_OPEN, IT_UNLINK, IT_LOOKUP and IT_GETATTR the mdt_body is
 *    fetched from the reply and, depending on body->valid, the EA, the
 *    maximum EA and cookie sizes, the remote permission and the
 *    capabilities are picked up; for a replayable IT_OPEN the EA is
 *    copied into the RMF_EADATA field of the request.
 *
 * exp:   export the request was sent through
 * req:   completed enqueue request
 * einfo: enqueue parameters; ei_mode may be rewritten as described
 * it:    intent receiving disposition, status, lock mode and request
 * lockh: handle of the lock obtained by the enqueue */
438 static int mdc_finish_enqueue(struct obd_export *exp,
439 struct ptlrpc_request *req,
440 struct ldlm_enqueue_info *einfo,
441 struct lookup_intent *it,
442 struct lustre_handle *lockh,
445 struct req_capsule *pill = &req->rq_pill;
446 struct ldlm_request *lockreq;
447 struct ldlm_reply *lockrep;
451 /* Similarly, if we're going to replay this request, we don't want to
452 * actually get a lock, just perform the intent. */
453 if (req->rq_transno || req->rq_replay) {
454 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
455 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
458 if (rc == ELDLM_LOCK_ABORTED) {
460 memset(lockh, 0, sizeof(*lockh));
462 } else { /* rc = 0 */
463 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
466 /* If the server gave us back a different lock mode, we should
467 * fix up our variables. */
468 if (lock->l_req_mode != einfo->ei_mode) {
469 ldlm_lock_addref(lockh, lock->l_req_mode);
470 ldlm_lock_decref(lockh, einfo->ei_mode);
471 einfo->ei_mode = lock->l_req_mode;
476 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
477 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
479 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
480 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
481 it->d.lustre.it_lock_mode = einfo->ei_mode;
482 it->d.lustre.it_data = req;
484 if (it->d.lustre.it_status < 0 && req->rq_replay)
485 mdc_clear_replay_flag(req, it->d.lustre.it_status);
487 /* If we're doing an IT_OPEN which did not result in an actual
488 * successful open, then we need to remove the bit which saves
489 * this request for unconditional replay.
491 * It's important that we do this first! Otherwise we might exit the
492 * function without doing so, and try to replay a failed create
494 if (it->it_op & IT_OPEN && req->rq_replay &&
495 (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
496 mdc_clear_replay_flag(req, it->d.lustre.it_status);
498 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
499 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
501 /* We know what to expect, so we do any byte flipping required here */
502 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
503 struct mdt_body *body;
505 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
507 CERROR ("Can't swab mdt_body\n");
511 if (it_disposition(it, DISP_OPEN_OPEN) &&
512 !it_open_error(DISP_OPEN_OPEN, it)) {
514 * If this is a successful OPEN request, we need to set
515 * replay handler and data early, so that if replay
516 * happens immediately after swabbing below, new reply
517 * is swabbed by that handler correctly.
519 mdc_set_open_replay_data(NULL, NULL, req);
522 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
526 * The eadata is opaque; just check that it is there.
527 * Eventually, obd_unpackmd() will check the contents.
529 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
534 if (body->valid & OBD_MD_FLMODEASIZE) {
535 struct obd_device *obddev = class_exp2obd(exp);
537 if (obddev->u.cli.cl_max_mds_easize <
539 obddev->u.cli.cl_max_mds_easize =
541 CDEBUG(D_INFO, "maxeasize become %d\n",
544 if (obddev->u.cli.cl_max_mds_cookiesize <
545 body->max_cookiesize) {
546 obddev->u.cli.cl_max_mds_cookiesize =
547 body->max_cookiesize;
548 CDEBUG(D_INFO, "cookiesize become %d\n",
549 body->max_cookiesize);
554 * We save the reply LOV EA in case we have to replay a
555 * create for recovery. If we didn't allocate a large
556 * enough request buffer above we need to reallocate it
557 * here to hold the actual LOV EA.
559 * To not save LOV EA if request is not going to replay
560 * (for example error one).
562 if ((it->it_op & IT_OPEN) && req->rq_replay) {
564 if (req_capsule_get_size(pill, &RMF_EADATA,
567 mdc_realloc_openmsg(req, body);
568 req_capsule_set_size(pill, &RMF_EADATA,
572 lmm = req_capsule_client_get(pill, &RMF_EADATA);
574 memcpy(lmm, eadata, body->eadatasize);
578 if (body->valid & OBD_MD_FLRMTPERM) {
579 struct mdt_remote_perm *perm;
581 LASSERT(client_is_remote(exp));
582 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
583 lustre_swab_mdt_remote_perm);
587 if (body->valid & OBD_MD_FLMDSCAPA) {
588 struct lustre_capa *capa, *p;
590 capa = req_capsule_server_get(pill, &RMF_CAPA1);
594 if (it->it_op & IT_OPEN) {
595 /* client fid capa will be checked in replay */
596 p = req_capsule_client_get(pill, &RMF_CAPA2);
601 if (body->valid & OBD_MD_FLOSSCAPA) {
602 struct lustre_capa *capa;
604 capa = req_capsule_server_get(pill, &RMF_CAPA2);
/* Enqueue an inodebits lock together with the intent described by it.
 *
 * Steps visible here:
 *  - einfo->ei_type must be LDLM_IBITS (asserted);
 *  - the resource id is derived from op_data->op_fid1;
 *  - the request is packed by the helper matching it->it_op: open,
 *    unlink, getattr/lookup or, for IT_READDIR, a plain enqueue; an
 *    error pointer from packing is returned as PTR_ERR(req);
 *  - the rpc lock is taken and the request is accounted with
 *    mdc_enter_request() around ldlm_cli_enqueue(), then both are
 *    released;
 *  - on the enqueue error path the error is logged, the replay flag is
 *    cleared and the request is released with ptlrpc_req_finished();
 *  - otherwise the reply is digested by mdc_finish_enqueue().
 *
 * exp:              export the request is sent through
 * einfo:            enqueue parameters; ei_cbdata is reset to NULL on the
 *                   open path
 * it:               intent to execute; O_JOIN_FILE is cleared from
 *                   it_flags on the open path
 * op_data:          operation data supplying fids and name
 * lockh:            receives the handle of the lock
 * lmm, lmmsize:     EA data handed to the open packer
 * extra_lock_flags: flags ORed with LDLM_FL_HAS_INTENT for the enqueue */
613 /* We always reserve enough space in the reply packet for a stripe MD, because
614 * we don't know in advance the file type. */
615 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
616 struct lookup_intent *it, struct md_op_data *op_data,
617 struct lustre_handle *lockh, void *lmm, int lmmsize,
618 int extra_lock_flags)
620 struct obd_device *obddev = class_exp2obd(exp);
621 struct ptlrpc_request *req;
622 struct req_capsule *pill;
623 int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
625 struct ldlm_res_id res_id;
626 ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
629 LASSERTF(einfo->ei_type == LDLM_IBITS, "lock type %d\n", einfo->ei_type);
631 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* policy starts with the LOOKUP bit; these operations ask for UPDATE */
633 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
634 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
636 if (it->it_op & IT_OPEN) {
637 int joinfile = !!((it->it_flags & O_JOIN_FILE) &&
640 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
643 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
644 einfo->ei_cbdata = NULL;
647 it->it_flags &= ~O_JOIN_FILE;
648 } else if (it->it_op & IT_UNLINK)
649 req = mdc_intent_unlink_pack(exp, it, op_data);
650 else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
651 req = mdc_intent_getattr_pack(exp, it, op_data);
652 else if (it->it_op == IT_READDIR)
653 req = ldlm_enqueue_pack(exp);
660 RETURN(PTR_ERR(req));
661 pill = &req->rq_pill;
663 /* It is important to obtain rpc_lock first (if applicable), so that
664 * threads that are serialised with rpc_lock are not polluting our
665 * rpcs in flight counter */
666 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
667 mdc_enter_request(&obddev->u.cli);
668 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
670 mdc_exit_request(&obddev->u.cli);
671 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
673 CERROR("ldlm_cli_enqueue: %d\n", rc);
674 mdc_clear_replay_flag(req, rc);
675 ptlrpc_req_finished(req);
678 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Reconcile the intent, the request and the lock after an intent enqueue.
 *
 * Steps visible here:
 *  - request must be a valid, non-poisoned pointer (asserted);
 *  - without DISP_IT_EXECD the status must be non-zero and is returned;
 *  - the mdt_body of the reply is fetched (asserted to exist);
 *  - for a revalidation (sane op_fid2, O_CHECK_STALE, not IT_GETATTR)
 *    DISP_ENQ_COMPLETE is set and the fid in the reply is compared with
 *    op_fid2 and op_fid3; a mismatch with both is logged as stale data;
 *  - an extra request reference is taken once for a successful create
 *    (DISP_ENQ_CREATE_REF) and once for a successful open
 *    (DISP_ENQ_OPEN_REF);
 *  - the lock behind lockh is looked up, its resource name is asserted to
 *    match the fid of the reply, and ldlm_lock_match() is used to search
 *    for another lock with the same policy; when one is found the new
 *    lock is released with ldlm_lock_decref_and_cancel() and lockh and
 *    it_lock_handle are switched to the matched lock.
 *
 * exp:     export the request was sent through
 * request: completed enqueue request
 * op_data: operation data supplying the fids and name
 * it:      intent being finished
 * lockh:   handle of the lock obtained; may be replaced as described */
683 static int mdc_finish_intent_lock(struct obd_export *exp,
684 struct ptlrpc_request *request,
685 struct md_op_data *op_data,
686 struct lookup_intent *it,
687 struct lustre_handle *lockh)
689 struct lustre_handle old_lock;
690 struct mdt_body *mdt_body;
691 struct ldlm_lock *lock;
695 LASSERT(request != NULL);
696 LASSERT(request != LP_POISON);
697 LASSERT(request->rq_repmsg != LP_POISON);
699 if (!it_disposition(it, DISP_IT_EXECD)) {
700 /* The server failed before it even started executing the
701 * intent, i.e. because it couldn't unpack the request. */
702 LASSERT(it->d.lustre.it_status != 0);
703 RETURN(it->d.lustre.it_status);
705 rc = it_open_error(DISP_IT_EXECD, it);
709 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
710 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
712 /* If we were revalidating a fid/name pair, mark the intent in
713 * case we fail and get called again from lookup */
714 if (fid_is_sane(&op_data->op_fid2) &&
715 it->it_flags & O_CHECK_STALE &&
716 it->it_op != IT_GETATTR) {
717 it_set_disposition(it, DISP_ENQ_COMPLETE);
719 /* Also: did we find the same inode? */
720 /* server can return one of two fids:
721 * op_fid2 - new allocated fid - if file is created.
722 * op_fid3 - existent fid - if file only open.
723 * op_fid3 is saved in lmv_intent_open */
724 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
725 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
/* NOTE(review): op_fid2 is printed twice below; the second PFID() was
 * presumably meant to be op_fid3, the other fid compared above --
 * confirm before changing. */
726 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
727 "\n", PFID(&op_data->op_fid2),
728 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
733 rc = it_open_error(DISP_LOOKUP_EXECD, it);
737 /* keep requests around for the multiple phases of the call
738 * this shows the DISP_XX must guarantee we make it into the call
740 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
741 it_disposition(it, DISP_OPEN_CREATE) &&
742 !it_open_error(DISP_OPEN_CREATE, it)) {
743 it_set_disposition(it, DISP_ENQ_CREATE_REF);
744 ptlrpc_request_addref(request); /* balanced in ll_create_node */
746 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
747 it_disposition(it, DISP_OPEN_OPEN) &&
748 !it_open_error(DISP_OPEN_OPEN, it)) {
749 it_set_disposition(it, DISP_ENQ_OPEN_REF);
750 ptlrpc_request_addref(request); /* balanced in ll_file_open */
751 /* BUG 11546 - eviction in the middle of open rpc processing */
752 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
755 if (it->it_op & IT_CREAT) {
756 /* XXX this belongs in ll_create_it */
757 } else if (it->it_op == IT_OPEN) {
758 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
760 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
763 /* If we already have a matching lock, then cancel the new
764 * one. We have to set the data here instead of in
765 * mdc_enqueue, because we need to use the child's inode as
766 * the l_ast_data to match, and that's not available until
767 * intent_finish has performed the iget().) */
768 lock = ldlm_handle2lock(lockh);
770 ldlm_policy_data_t policy = lock->l_policy_data;
771 LDLM_DEBUG(lock, "matching against this");
773 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
774 &lock->l_resource->lr_name),
775 "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
776 (unsigned long)lock->l_resource->lr_name.name[0],
777 (unsigned long)lock->l_resource->lr_name.name[1],
778 (unsigned long)lock->l_resource->lr_name.name[2],
779 (unsigned long)fid_seq(&mdt_body->fid1),
780 (unsigned long)fid_oid(&mdt_body->fid1),
781 (unsigned long)fid_ver(&mdt_body->fid1));
784 memcpy(&old_lock, lockh, sizeof(*lockh));
785 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
786 LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
787 ldlm_lock_decref_and_cancel(lockh,
788 it->d.lustre.it_lock_mode);
789 memcpy(lockh, &old_lock, sizeof(old_lock));
790 it->d.lustre.it_lock_handle = lockh->cookie;
793 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
794 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
795 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
800 * This long block is all about fixing up the lock and request state
801 * so that it is correct as of the moment _before_ the operation was
802 * applied; that way, the VFS will think that everything is normal and
803 * call Lustre's regular VFS methods.
805 * If we're performing a creation, that means that unless the creation
806 * failed with EEXIST, we should fake up a negative dentry.
808 * For everything else, we want to lookup to succeed.
810 * One additional note: if CREATE or OPEN succeeded, we add an extra
811 * reference to the request because we need to keep it around until
812 * ll_create/ll_open gets called.
814 * The server will return to us, in it_disposition, an indication of
815 * exactly what d.lustre.it_status refers to.
817 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
818 * otherwise if DISP_OPEN_CREATE is set, then it status is the
819 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
820 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
823 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/* Execute an intent, reusing an existing lock or request where possible.
 *
 * Steps visible here:
 *  - for IT_LOOKUP or IT_GETATTR on a sane op_fid2, mdc_lock_match() is
 *    tried first with the UPDATE bit for IT_GETATTR and the LOOKUP bit
 *    otherwise, in any of the modes CR, CW, PR and PW; the handle and
 *    mode found are recorded in the intent;
 *  - when DISP_ENQ_COMPLETE is not set, a fid is allocated for IT_CREAT
 *    if op_fid2 is not sane, the intent is sent with mdc_enqueue() and
 *    the resulting lock cookie is stored in it_lock_handle;
 *  - when DISP_ENQ_COMPLETE is set and op_fid2 is not sane or
 *    O_CHECK_STALE is not set, the bit is cleared and the request saved
 *    in the intent is reused instead of sending a new one;
 *  - *reqp is set to the request held in the intent and the result is
 *    produced by mdc_finish_intent_lock().
 *
 * exp:              export the request is sent through
 * op_data:          operation data supplying fids and name
 * lmm, lmmsize:     EA data passed on to mdc_enqueue()
 * it:               intent to execute
 * reqp:             receives the request associated with the intent
 * cb_blocking:      blocking callback placed in the enqueue info
 * extra_lock_flags: additional lock flags passed on to mdc_enqueue() */
826 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
827 void *lmm, int lmmsize, struct lookup_intent *it,
828 int lookup_flags, struct ptlrpc_request **reqp,
829 ldlm_blocking_callback cb_blocking,
830 int extra_lock_flags)
832 struct lustre_handle lockh;
837 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
838 ", intent: %s flags %#o\n", op_data->op_namelen,
839 op_data->op_name, PFID(&op_data->op_fid2),
840 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
844 if (fid_is_sane(&op_data->op_fid2) &&
845 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
846 /* We could just return 1 immediately, but since we should only
847 * be called in revalidate_it if we already have a lock, let's
849 ldlm_policy_data_t policy;
852 /* As not all attributes are kept under update lock, e.g.
853 owner/group/acls are under lookup lock, we need both
854 ibits for GETATTR. */
856 /* For CMD, UPDATE lock and LOOKUP lock can not be got
857 * at the same for cross-object, so we can not match
858 * the 2 lock at the same time FIXME: but how to handle
859 * the above situation */
860 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
861 MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
863 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED,
864 &op_data->op_fid2, LDLM_IBITS, &policy,
865 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
867 it->d.lustre.it_lock_handle = lockh.cookie;
868 it->d.lustre.it_lock_mode = mode;
871 /* Only return failure if it was not GETATTR by cfid
872 (from inode_revalidate) */
873 if (mode || op_data->op_namelen != 0)
877 /* lookup_it may be called only after revalidate_it has run, because
878 * revalidate_it cannot return errors, only zero. Returning zero causes
879 * this call to lookup, which *can* return an error.
881 * We only want to execute the request associated with the intent one
882 * time, however, so don't send the request again. Instead, skip past
883 * this and use the request from revalidate. In this case, revalidate
884 * never dropped its reference, so the refcounts are all OK */
885 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
886 struct ldlm_enqueue_info einfo =
887 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
888 ldlm_completion_ast, NULL, NULL };
890 /* For case if upper layer did not alloc fid, do it now. */
891 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
892 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
894 CERROR("Can't alloc new fid, rc %d\n", rc);
898 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
899 lmm, lmmsize, extra_lock_flags);
902 it->d.lustre.it_lock_handle = lockh.cookie;
903 } else if (!fid_is_sane(&op_data->op_fid2) ||
904 !(it->it_flags & O_CHECK_STALE)) {
905 /* DISP_ENQ_COMPLETE set means there is extra reference on
906 * request referenced from this intent, saved for subsequent
907 * lookup. This path is executed when we proceed to this
908 * lookup, so we clear DISP_ENQ_COMPLETE */
909 it_clear_disposition(it, DISP_ENQ_COMPLETE);
911 *reqp = it->d.lustre.it_data;
912 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/* Reply handler installed by mdc_intent_getattr_async().
 * The export, the md_enqueue_info and the ldlm_enqueue_info are recovered
 * from req->rq_async_args.pointer_arg[0..2] and the lock handle from
 * minfo->mi_lockh.  The request accounting is released with
 * mdc_exit_request(), the enqueue is completed with
 * ldlm_cli_enqueue_fini() (an error is logged and the replay flag
 * cleared), then mdc_finish_enqueue() and mdc_finish_intent_lock() are
 * run and the lock cookie is stored in the intent.  The final rc is
 * delivered through minfo->mi_cb().
 *
 * req:    completed request
 * unused: not referenced in the lines shown
 * rc:     completion status of the request */
916 static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
917 void *unused, int rc)
919 struct obd_export *exp = req->rq_async_args.pointer_arg[0];
920 struct md_enqueue_info *minfo = req->rq_async_args.pointer_arg[1];
921 struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
922 struct lookup_intent *it;
923 struct lustre_handle *lockh;
924 struct obd_device *obddev;
925 int flags = LDLM_FL_HAS_INTENT;
929 lockh = &minfo->mi_lockh;
931 obddev = class_exp2obd(exp);
933 mdc_exit_request(&obddev->u.cli);
934 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
937 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
938 &flags, NULL, 0, NULL, lockh, rc);
940 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
941 mdc_clear_replay_flag(req, rc);
945 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
949 it->d.lustre.it_lock_handle = lockh->cookie;
951 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* hand the final rc to the callback supplied in minfo */
956 minfo->mi_cb(req, minfo, rc);
/* Start a getattr intent enqueue whose reply is handled by a callback.
 * The operation data and the intent are taken from minfo.  The resource
 * id is derived from op_fid1, the request is packed with
 * mdc_intent_getattr_pack() and, after mdc_enter_request(), handed to
 * ldlm_cli_enqueue() with the lock handle stored in minfo->mi_lockh
 * (the trailing argument 1 is presumably the async flag -- confirm).
 * On the enqueue error path mdc_exit_request() is called.  Otherwise exp,
 * minfo and einfo are saved in the request, the interpret callback is set
 * to mdc_intent_getattr_async_interpret() and the request is queued with
 * ptlrpcd_add_req().
 *
 * exp:   export the request is sent through
 * minfo: carries op data, intent, lock handle and completion callback
 * einfo: enqueue parameters, also passed on to the interpret callback */
960 int mdc_intent_getattr_async(struct obd_export *exp,
961 struct md_enqueue_info *minfo,
962 struct ldlm_enqueue_info *einfo)
964 struct md_op_data *op_data = &minfo->mi_data;
965 struct lookup_intent *it = &minfo->mi_it;
966 struct ptlrpc_request *req;
967 struct obd_device *obddev = class_exp2obd(exp);
968 struct ldlm_res_id res_id;
969 ldlm_policy_data_t policy = {
970 .l_inodebits = { MDS_INODELOCK_LOOKUP }
973 int flags = LDLM_FL_HAS_INTENT;
976 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
977 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
978 ldlm_it2str(it->it_op), it->it_flags);
980 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
981 req = mdc_intent_getattr_pack(exp, it, op_data);
985 mdc_enter_request(&obddev->u.cli);
986 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
987 0, NULL, &minfo->mi_lockh, 1);
989 mdc_exit_request(&obddev->u.cli);
/* stash the context read back by mdc_intent_getattr_async_interpret() */
993 req->rq_async_args.pointer_arg[0] = exp;
994 req->rq_async_args.pointer_arg[1] = minfo;
995 req->rq_async_args.pointer_arg[2] = einfo;
996 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
997 ptlrpcd_add_req(req);
1002 int mdc_revalidate_lock(struct obd_export *exp,
1003 struct lookup_intent *it,
1006 /* We could just return 1 immediately, but since we should only
1007 * be called in revalidate_it if we already have a lock, let's
1009 struct ldlm_res_id res_id;
1010 struct lustre_handle lockh;
1011 ldlm_policy_data_t policy;
1015 fid_build_reg_res_name(fid, &res_id);
1016 /* As not all attributes are kept under update lock, e.g.
1017 owner/group/acls are under lookup lock, we need both
1018 ibits for GETATTR. */
1019 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
1020 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
1021 MDS_INODELOCK_LOOKUP;
1023 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1024 LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
1025 &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
1027 it->d.lustre.it_lock_handle = lockh.cookie;
1028 it->d.lustre.it_lock_mode = mode;