1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_MDC
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
48 # include <liblustre.h>
51 #include <lustre_acl.h>
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 /* fid_res_name_eq() */
55 #include <lustre_fid.h>
56 #include <lprocfs_status.h>
57 #include "mdc_internal.h"
59 int it_disposition(struct lookup_intent *it, int flag)
61 return it->d.lustre.it_disposition & flag;
63 EXPORT_SYMBOL(it_disposition);
65 void it_set_disposition(struct lookup_intent *it, int flag)
67 it->d.lustre.it_disposition |= flag;
69 EXPORT_SYMBOL(it_set_disposition);
71 void it_clear_disposition(struct lookup_intent *it, int flag)
73 it->d.lustre.it_disposition &= ~flag;
75 EXPORT_SYMBOL(it_clear_disposition);
77 int it_open_error(int phase, struct lookup_intent *it)
79 if (it_disposition(it, DISP_OPEN_OPEN)) {
80 if (phase >= DISP_OPEN_OPEN)
81 return it->d.lustre.it_status;
86 if (it_disposition(it, DISP_OPEN_CREATE)) {
87 if (phase >= DISP_OPEN_CREATE)
88 return it->d.lustre.it_status;
93 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
94 if (phase >= DISP_LOOKUP_EXECD)
95 return it->d.lustre.it_status;
100 if (it_disposition(it, DISP_IT_EXECD)) {
101 if (phase >= DISP_IT_EXECD)
102 return it->d.lustre.it_status;
106 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
107 it->d.lustre.it_status);
111 EXPORT_SYMBOL(it_open_error);
113 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach caller-supplied AST data (an inode pointer) to the DLM lock
 * identified by @lockh.  If the lock already carries different ast_data,
 * assert that the old inode is being torn down (I_FREEING) before
 * replacing it — a live inode must never be silently displaced.
 *
 * NOTE(review): this listing has gaps (embedded source line numbers
 * jump), so some declarations/returns between visible lines are omitted.
 */
114 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data)
116 struct ldlm_lock *lock;
/* resolve the opaque handle back to the referenced lock */
124 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
126 LASSERT(lock != NULL);
/* serialize updates of l_ast_data against other users of the lock */
127 lock_res_and_lock(lock);
129 if (lock->l_ast_data && lock->l_ast_data != data) {
130 struct inode *new_inode = data;
131 struct inode *old_inode = lock->l_ast_data;
/* only an inode already being freed may be displaced from the lock */
132 LASSERTF(old_inode->i_state & I_FREEING,
133 "Found existing inode %p/%lu/%u state %lu in lock: "
134 "setting data to %p/%lu/%u\n", old_inode,
135 old_inode->i_ino, old_inode->i_generation,
137 new_inode, new_inode->i_ino, new_inode->i_generation);
140 lock->l_ast_data = data;
141 unlock_res_and_lock(lock);
/*
 * Look for an already-granted DLM lock on the resource derived from
 * @fid that is compatible with @type/@policy/@mode.  On a match the
 * lock handle is stored in @lockh and the matched mode is returned
 * (via the omitted return — see listing gaps).
 */
147 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
148 const struct lu_fid *fid, ldlm_type_t type,
149 ldlm_policy_data_t *policy, ldlm_mode_t mode,
150 struct lustre_handle *lockh)
152 struct ldlm_res_id res_id;
/* build the LDLM resource name from the FID */
156 fid_build_reg_res_name(fid, &res_id);
157 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
158 &res_id, type, policy, mode, lockh);
/*
 * Cancel all unused DLM locks held on @fid's resource that match
 * @policy/@mode.  @opaque and @flags are passed through to
 * ldlm_cli_cancel_unused_resource().
 */
162 int mdc_cancel_unused(struct obd_export *exp,
163 const struct lu_fid *fid,
164 ldlm_policy_data_t *policy,
165 ldlm_mode_t mode, int flags, void *opaque)
167 struct ldlm_res_id res_id;
168 struct obd_device *obd = class_exp2obd(exp);
/* build the LDLM resource name from the FID */
173 fid_build_reg_res_name(fid, &res_id);
174 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
175 policy, mode, flags, opaque);
/*
 * Walk every DLM lock on @fid's resource, invoking iterator @it with
 * @data on each (typically used to swap the locks' l_ast_data).
 *
 * NOTE(review): the iterator-call arguments after line 187 are omitted
 * by a gap in this listing.
 */
179 int mdc_change_cbdata(struct obd_export *exp,
180 const struct lu_fid *fid,
181 ldlm_iterator_t it, void *data)
183 struct ldlm_res_id res_id;
186 fid_build_reg_res_name(fid, &res_id);
187 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
/*
 * Remove the replay flag from a request that completed with error @rc:
 * failed requests must not be kept for recovery replay.  Also logs if a
 * transno was assigned despite the error (server-side inconsistency).
 * The rq_replay clearing itself happens on a line omitted by a gap in
 * this listing (between the spin_lock/spin_unlock pair).
 */
194 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
196 /* Don't hold error requests for replay. */
197 if (req->rq_replay) {
198 spin_lock(&req->rq_lock);
200 spin_unlock(&req->rq_lock);
202 if (rc && req->rq_transno != 0) {
203 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
208 /* Save a large LOV EA into the request buffer so that it is available
209 * for replay. We don't do this in the initial request because the
210 * original request doesn't need this buffer (at most it sends just the
211 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
212 * buffer and may also be difficult to allocate and save a very large
213 * request buffer for each open. (bug 5707)
215 * OOM here may cause recovery failure if lmm is needed (only for the
216 * original open if the MDS crashed just when this client also OOM'd)
217 * but this is incredibly unlikely, and questionable whether the client
218 * could do MDS recovery under OOM anyways... */
/*
 * Enlarge the open request's EA buffer (segment DLM_INTENT_REC_OFF + 4)
 * so the reply's LOV EA can be copied in for replay (see the block
 * comment above).  If the enlargement fails, degrade gracefully: strip
 * OBD_MD_FLEASIZE from the reply body and zero eadatasize so the caller
 * simply skips saving the EA instead of failing the open.
 */
219 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
220 struct mdt_body *body)
224 /* FIXME: remove this explicit offset. */
225 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
228 CERROR("Can't enlarge segment %d size to %d\n",
229 DLM_INTENT_REC_OFF + 4, body->eadatasize);
230 body->valid &= ~OBD_MD_FLEASIZE;
231 body->eadatasize = 0;
/*
 * Build an LDLM_INTENT_OPEN request: gather conflicting locks to cancel
 * (child OPEN locks, parent UPDATE lock on create/join), allocate the
 * request, reserve capability/name/EA buffers, and pack the ldlm intent
 * plus the embedded open record.  Returns the prepared request or an
 * ERR_PTR on allocation/preparation failure.
 *
 * NOTE(review): this listing has gaps (embedded source line numbers
 * jump); several statements between the visible lines are omitted.
 */
235 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
236 struct lookup_intent *it,
237 struct md_op_data *op_data,
238 void *lmm, int lmmsize,
241 struct ptlrpc_request *req;
242 struct obd_device *obddev = class_exp2obd(exp);
243 struct ldlm_intent *lit;
244 int joinfile = !!((it->it_flags & O_JOIN_FILE) &&
246 CFS_LIST_HEAD(cancels);
/* open always acts on a regular file: force S_IFREG into the mode */
252 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
254 /* XXX: openlock is not cancelled for cross-refs. */
255 /* If inode is known, cancel conflicting OPEN locks. */
256 if (fid_is_sane(&op_data->op_fid2)) {
257 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
260 else if (it->it_flags & FMODE_EXEC)
265 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
270 /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
271 if (it->it_op & IT_CREAT || joinfile)
275 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
277 MDS_INODELOCK_UPDATE);
279 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
280 &RQF_LDLM_INTENT_OPEN);
/* allocation failed: release the gathered cancel list before erroring */
282 ldlm_lock_list_put(&cancels, l_bl_ast, count);
283 RETURN(ERR_PTR(-ENOMEM));
286 /* parent capability */
287 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
288 /* child capability, reserve the size according to parent capa, it will
289 * be filled after we get the reply */
290 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
292 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
293 op_data->op_namelen + 1);
/* reserve EA space for whichever is larger: caller's lmm or default */
294 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
295 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
297 req_capsule_set_size(&req->rq_pill, &RMF_REC_JOINFILE,
301 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
303 ptlrpc_request_free(req);
/* join-file open: the head file size travels in op_data->op_data */
308 __u64 head_size = *(__u64 *)op_data->op_data;
309 mdc_join_pack(req, op_data, head_size);
312 spin_lock(&req->rq_lock);
314 spin_unlock(&req->rq_lock);
316 /* pack the intent */
317 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
318 lit->opc = (__u64)it->it_op;
320 /* pack the intended request */
321 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
324 /* for remote client, fetch remote perm for current user */
325 if (client_is_remote(exp))
326 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
327 sizeof(struct mdt_remote_perm));
328 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_UNLINK request: allocate it, reserve the parent
 * capability and name buffers, pack the ldlm intent plus the embedded
 * unlink record, and size the reply for the victim's EA and unlink
 * cookies.  Returns the prepared request or ERR_PTR on failure.
 *
 * NOTE(review): this listing has gaps; error-path returns between the
 * visible lines are omitted.
 */
332 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
333 struct lookup_intent *it,
334 struct md_op_data *op_data)
336 struct ptlrpc_request *req;
337 struct obd_device *obddev = class_exp2obd(exp);
338 struct ldlm_intent *lit;
342 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
343 &RQF_LDLM_INTENT_UNLINK);
345 RETURN(ERR_PTR(-ENOMEM));
347 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
348 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
349 op_data->op_namelen + 1);
/* no cancels needed for unlink intent enqueue */
351 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
353 ptlrpc_request_free(req);
357 /* pack the intent */
358 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
359 lit->opc = (__u64)it->it_op;
361 /* pack the intended request */
362 mdc_unlink_pack(req, op_data);
/* reply may carry the striping EA and per-stripe unlink cookies */
364 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
365 obddev->u.cli.cl_max_mds_easize);
366 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
367 obddev->u.cli.cl_max_mds_cookiesize);
368 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_GETATTR request: request the standard attribute
 * mask (plus remote-perm for remote clients, ACL otherwise), pack the
 * ldlm intent and the embedded getattr record, and size the reply for
 * the EA (and remote-perm when applicable).  Returns the prepared
 * request or ERR_PTR on failure.
 *
 * NOTE(review): this listing has gaps; error-path returns between the
 * visible lines are omitted.
 */
372 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
373 struct lookup_intent *it,
374 struct md_op_data *op_data)
376 struct ptlrpc_request *req;
377 struct obd_device *obddev = class_exp2obd(exp);
378 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
379 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
380 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
381 (client_is_remote(exp) ?
382 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
383 struct ldlm_intent *lit;
387 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
388 &RQF_LDLM_INTENT_GETATTR);
390 RETURN(ERR_PTR(-ENOMEM));
392 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
393 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
394 op_data->op_namelen + 1);
/* no cancels needed for getattr intent enqueue */
396 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
398 ptlrpc_request_free(req);
402 /* pack the intent */
403 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
404 lit->opc = (__u64)it->it_op;
406 /* pack the intended request */
407 mdc_getattr_pack(req, valid, it->it_flags, op_data);
409 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
410 obddev->u.cli.cl_max_mds_easize);
411 if (client_is_remote(exp))
412 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
413 sizeof(struct mdt_remote_perm));
414 ptlrpc_request_set_replen(req);
/*
 * Allocate and prepare a plain (no-intent) LDLM_ENQUEUE request; used
 * by mdc_enqueue() for IT_READDIR, where no intent record is needed.
 * Returns the prepared request or ERR_PTR on failure.
 */
418 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
420 struct ptlrpc_request *req;
424 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
426 RETURN(ERR_PTR(-ENOMEM));
428 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
430 ptlrpc_request_free(req);
434 ptlrpc_request_set_replen(req);
/*
 * Post-process an intent enqueue reply: fix up lock state (intent-only
 * flag for replayed requests, mode mismatch, aborted enqueue), copy the
 * server's disposition/status into the intent, manage the replay flag,
 * swab/validate the reply body, grow client-side EA/cookie size limits,
 * save the reply LOV EA into the request for replay, and unpack
 * remote-perm and capability sections when present.
 *
 * NOTE(review): this listing has gaps (embedded source line numbers
 * jump); many statements between the visible lines are omitted.
 */
438 static int mdc_finish_enqueue(struct obd_export *exp,
439 struct ptlrpc_request *req,
440 struct ldlm_enqueue_info *einfo,
441 struct lookup_intent *it,
442 struct lustre_handle *lockh,
445 struct req_capsule *pill = &req->rq_pill;
446 struct ldlm_request *lockreq;
447 struct ldlm_reply *lockrep;
451 /* Similarly, if we're going to replay this request, we don't want to
452 * actually get a lock, just perform the intent. */
453 if (req->rq_transno || req->rq_replay) {
454 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
455 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
/* enqueue aborted: intent executed but no lock granted — zero handle */
458 if (rc == ELDLM_LOCK_ABORTED) {
460 memset(lockh, 0, sizeof(*lockh));
462 } else { /* rc = 0 */
463 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
466 /* If the server gave us back a different lock mode, we should
467 * fix up our variables. */
468 if (lock->l_req_mode != einfo->ei_mode) {
469 ldlm_lock_addref(lockh, lock->l_req_mode);
470 ldlm_lock_decref(lockh, einfo->ei_mode);
471 einfo->ei_mode = lock->l_req_mode;
476 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
477 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* propagate server's verdict into the intent for the upper layers */
479 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
480 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
481 it->d.lustre.it_lock_mode = einfo->ei_mode;
482 it->d.lustre.it_data = req;
484 if (it->d.lustre.it_status < 0 && req->rq_replay)
485 mdc_clear_replay_flag(req, it->d.lustre.it_status);
487 /* If we're doing an IT_OPEN which did not result in an actual
488 * successful open, then we need to remove the bit which saves
489 * this request for unconditional replay.
491 * It's important that we do this first! Otherwise we might exit the
492 * function without doing so, and try to replay a failed create
494 if (it->it_op & IT_OPEN && req->rq_replay &&
495 (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
496 mdc_clear_replay_flag(req, it->d.lustre.it_status);
498 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
499 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
501 /* We know what to expect, so we do any byte flipping required here */
502 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
503 struct mdt_body *body;
505 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
507 CERROR ("Can't swab mdt_body\n");
511 if (it_disposition(it, DISP_OPEN_OPEN) &&
512 !it_open_error(DISP_OPEN_OPEN, it)) {
514 * If this is a successful OPEN request, we need to set
515 * replay handler and data early, so that if replay
516 * happens immediately after swabbing below, new reply
517 * is swabbed by that handler correctly.
519 mdc_set_open_replay_data(NULL, NULL, req);
522 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
526 * The eadata is opaque; just check that it is there.
527 * Eventually, obd_unpackmd() will check the contents.
529 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
/* server advertised larger limits — grow our cached maxima */
534 if (body->valid & OBD_MD_FLMODEASIZE) {
535 struct obd_device *obddev = class_exp2obd(exp);
537 if (obddev->u.cli.cl_max_mds_easize <
539 obddev->u.cli.cl_max_mds_easize =
541 CDEBUG(D_INFO, "maxeasize become %d\n",
544 if (obddev->u.cli.cl_max_mds_cookiesize <
545 body->max_cookiesize) {
546 obddev->u.cli.cl_max_mds_cookiesize =
547 body->max_cookiesize;
548 CDEBUG(D_INFO, "cookiesize become %d\n",
549 body->max_cookiesize);
554 * We save the reply LOV EA in case we have to replay a
555 * create for recovery. If we didn't allocate a large
556 * enough request buffer above we need to reallocate it
557 * here to hold the actual LOV EA.
559 * To not save LOV EA if request is not going to replay
560 * (for example error one).
562 if ((it->it_op & IT_OPEN) && req->rq_replay) {
564 if (req_capsule_get_size(pill, &RMF_EADATA,
567 mdc_realloc_openmsg(req, body);
568 req_capsule_set_size(pill, &RMF_EADATA,
572 lmm = req_capsule_client_get(pill, &RMF_EADATA);
574 memcpy(lmm, eadata, body->eadatasize);
578 if (body->valid & OBD_MD_FLRMTPERM) {
579 struct mdt_remote_perm *perm;
581 LASSERT(client_is_remote(exp));
582 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
583 lustre_swab_mdt_remote_perm);
587 if (body->valid & OBD_MD_FLMDSCAPA) {
588 struct lustre_capa *capa, *p;
590 capa = req_capsule_server_get(pill, &RMF_CAPA1);
594 if (it->it_op & IT_OPEN) {
595 /* client fid capa will be checked in replay */
596 p = req_capsule_client_get(pill, &RMF_CAPA2);
601 if (body->valid & OBD_MD_FLOSSCAPA) {
602 struct lustre_capa *capa;
604 capa = req_capsule_server_get(pill, &RMF_CAPA2);
613 /* We always reserve enough space in the reply packet for a stripe MD, because
614 * we don't know in advance the file type. */
/*
 * Enqueue a DLM lock (with or without intent) against @op_data->op_fid1.
 * Dispatches on the intent op to build the right request (open / unlink
 * / getattr-lookup / readdir, or a flock enqueue when @it is NULL and
 * the flock policy is smuggled in via @lmm), sends it under the MDC rpc
 * lock and in-flight limiter, then finishes via mdc_finish_enqueue().
 *
 * NOTE(review): this listing has gaps (embedded source line numbers
 * jump); several statements/returns between visible lines are omitted.
 */
615 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
616 struct lookup_intent *it, struct md_op_data *op_data,
617 struct lustre_handle *lockh, void *lmm, int lmmsize,
618 struct ptlrpc_request **reqp, int extra_lock_flags)
620 struct obd_device *obddev = class_exp2obd(exp);
621 struct ptlrpc_request *req = NULL;
622 struct req_capsule *pill;
623 int flags = extra_lock_flags;
625 struct ldlm_res_id res_id;
626 ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
629 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
632 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
635 flags |= LDLM_FL_HAS_INTENT;
/* these ops contend on the UPDATE ibit rather than LOOKUP */
636 if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
637 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
643 /* The only way right now is FLOCK, in this case we hide flock
644 policy as lmm, but lmmsize is 0 */
645 LASSERT(lmm && lmmsize == 0);
646 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
648 policy = *(ldlm_policy_data_t *)lmm;
649 res_id.name[3] = LDLM_FLOCK;
650 } else if (it->it_op & IT_OPEN) {
651 int joinfile = !!((it->it_flags & O_JOIN_FILE) &&
654 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
657 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
658 einfo->ei_cbdata = NULL;
661 it->it_flags &= ~O_JOIN_FILE;
662 } else if (it->it_op & IT_UNLINK)
663 req = mdc_intent_unlink_pack(exp, it, op_data);
664 else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
665 req = mdc_intent_getattr_pack(exp, it, op_data);
666 else if (it->it_op == IT_READDIR)
667 req = ldlm_enqueue_pack(exp);
/* request packing failed: propagate the ERR_PTR error code */
674 RETURN(PTR_ERR(req));
675 pill = &req->rq_pill;
677 /* It is important to obtain rpc_lock first (if applicable), so that
678 * threads that are serialised with rpc_lock are not polluting our
679 * rpcs in flight counter. We do not do flock request limiting, though*/
681 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
682 mdc_enter_request(&obddev->u.cli);
684 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
690 mdc_exit_request(&obddev->u.cli);
691 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
694 /* For flock requests we immediately return without further
695 delay and let caller deal with the rest, since rest of
696 this function metadata processing makes no sense for flock
702 CERROR("ldlm_cli_enqueue: %d\n", rc);
703 mdc_clear_replay_flag(req, rc);
704 ptlrpc_req_finished(req);
707 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/*
 * Translate an executed intent reply into caller-visible state: check
 * per-phase errors, detect stale revalidations (server returned a FID
 * different from both op_fid2 and op_fid3), take extra request
 * references for successful CREATE/OPEN phases (released later in
 * ll_create_node / ll_file_open), and replace the freshly granted lock
 * with an already-held matching one when possible.
 *
 * NOTE(review): this listing has gaps (embedded source line numbers
 * jump); several statements/returns between visible lines are omitted.
 */
712 static int mdc_finish_intent_lock(struct obd_export *exp,
713 struct ptlrpc_request *request,
714 struct md_op_data *op_data,
715 struct lookup_intent *it,
716 struct lustre_handle *lockh)
718 struct lustre_handle old_lock;
719 struct mdt_body *mdt_body;
720 struct ldlm_lock *lock;
724 LASSERT(request != NULL);
725 LASSERT(request != LP_POISON);
726 LASSERT(request->rq_repmsg != LP_POISON);
728 if (!it_disposition(it, DISP_IT_EXECD)) {
729 /* The server failed before it even started executing the
730 * intent, i.e. because it couldn't unpack the request. */
731 LASSERT(it->d.lustre.it_status != 0);
732 RETURN(it->d.lustre.it_status);
734 rc = it_open_error(DISP_IT_EXECD, it);
738 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
739 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
741 /* If we were revalidating a fid/name pair, mark the intent in
742 * case we fail and get called again from lookup */
743 if (fid_is_sane(&op_data->op_fid2) &&
744 it->it_flags & O_CHECK_STALE &&
745 it->it_op != IT_GETATTR) {
746 it_set_disposition(it, DISP_ENQ_COMPLETE);
748 /* Also: did we find the same inode? */
749 /* server can return one of two fids:
750 * op_fid2 - new allocated fid - if file is created.
751 * op_fid3 - existent fid - if file only open.
752 * op_fid3 is saved in lmv_intent_open */
753 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
754 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
755 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
756 "\n", PFID(&op_data->op_fid2),
757 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
762 rc = it_open_error(DISP_LOOKUP_EXECD, it);
766 /* keep requests around for the multiple phases of the call
767 * this shows the DISP_XX must guarantee we make it into the call
769 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
770 it_disposition(it, DISP_OPEN_CREATE) &&
771 !it_open_error(DISP_OPEN_CREATE, it)) {
772 it_set_disposition(it, DISP_ENQ_CREATE_REF);
773 ptlrpc_request_addref(request); /* balanced in ll_create_node */
775 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
776 it_disposition(it, DISP_OPEN_OPEN) &&
777 !it_open_error(DISP_OPEN_OPEN, it)) {
778 it_set_disposition(it, DISP_ENQ_OPEN_REF);
779 ptlrpc_request_addref(request); /* balanced in ll_file_open */
780 /* BUG 11546 - eviction in the middle of open rpc processing */
781 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
784 if (it->it_op & IT_CREAT) {
785 /* XXX this belongs in ll_create_it */
786 } else if (it->it_op == IT_OPEN) {
787 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
789 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
792 /* If we already have a matching lock, then cancel the new
793 * one. We have to set the data here instead of in
794 * mdc_enqueue, because we need to use the child's inode as
795 * the l_ast_data to match, and that's not available until
796 * intent_finish has performed the iget().) */
797 lock = ldlm_handle2lock(lockh);
799 ldlm_policy_data_t policy = lock->l_policy_data;
800 LDLM_DEBUG(lock, "matching against this");
/* lock resource must name the same object the server replied about */
802 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
803 &lock->l_resource->lr_name),
804 "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
805 (unsigned long)lock->l_resource->lr_name.name[0],
806 (unsigned long)lock->l_resource->lr_name.name[1],
807 (unsigned long)lock->l_resource->lr_name.name[2],
808 (unsigned long)fid_seq(&mdt_body->fid1),
809 (unsigned long)fid_oid(&mdt_body->fid1),
810 (unsigned long)fid_ver(&mdt_body->fid1));
813 memcpy(&old_lock, lockh, sizeof(*lockh));
814 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
815 LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
/* an equivalent lock is already held: drop the new one, keep the old */
816 ldlm_lock_decref_and_cancel(lockh,
817 it->d.lustre.it_lock_mode);
818 memcpy(lockh, &old_lock, sizeof(old_lock));
819 it->d.lustre.it_lock_handle = lockh->cookie;
822 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
823 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
824 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
829 * This long block is all about fixing up the lock and request state
830 * so that it is correct as of the moment _before_ the operation was
831 * applied; that way, the VFS will think that everything is normal and
832 * call Lustre's regular VFS methods.
834 * If we're performing a creation, that means that unless the creation
835 * failed with EEXIST, we should fake up a negative dentry.
837 * For everything else, we want to lookup to succeed.
839 * One additional note: if CREATE or OPEN succeeded, we add an extra
840 * reference to the request because we need to keep it around until
841 * ll_create/ll_open gets called.
843 * The server will return to us, in it_disposition, an indication of
844 * exactly what d.lustre.it_status refers to.
846 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
847 * otherwise if DISP_OPEN_CREATE is set, then it status is the
848 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
849 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
852 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/*
 * Top-level intent-lock entry point (see the big block comment above).
 * First tries to satisfy LOOKUP/GETATTR revalidations from an
 * already-held ibits lock; otherwise performs a fresh intent enqueue
 * (allocating a FID first for CREATE when the upper layer didn't), or
 * reuses the request saved by a prior DISP_ENQ_COMPLETE revalidation.
 * Finishes through mdc_finish_intent_lock().
 *
 * NOTE(review): this listing has gaps (embedded source line numbers
 * jump); several statements/returns between visible lines are omitted.
 */
855 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
856 void *lmm, int lmmsize, struct lookup_intent *it,
857 int lookup_flags, struct ptlrpc_request **reqp,
858 ldlm_blocking_callback cb_blocking,
859 int extra_lock_flags)
861 struct lustre_handle lockh;
866 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
867 ", intent: %s flags %#o\n", op_data->op_namelen,
868 op_data->op_name, PFID(&op_data->op_fid2),
869 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* fast path: revalidation with a known child FID — try a lock match */
873 if (fid_is_sane(&op_data->op_fid2) &&
874 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
875 /* We could just return 1 immediately, but since we should only
876 * be called in revalidate_it if we already have a lock, let's
878 ldlm_policy_data_t policy;
881 /* As not all attributes are kept under update lock, e.g.
882 owner/group/acls are under lookup lock, we need both
883 ibits for GETATTR. */
885 /* For CMD, UPDATE lock and LOOKUP lock can not be got
886 * at the same for cross-object, so we can not match
887 * the 2 lock at the same time FIXME: but how to handle
888 * the above situation */
889 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
890 MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
892 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED,
893 &op_data->op_fid2, LDLM_IBITS, &policy,
894 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
896 it->d.lustre.it_lock_handle = lockh.cookie;
897 it->d.lustre.it_lock_mode = mode;
900 /* Only return failure if it was not GETATTR by cfid
901 (from inode_revalidate) */
902 if (mode || op_data->op_namelen != 0)
906 /* lookup_it may be called only after revalidate_it has run, because
907 * revalidate_it cannot return errors, only zero. Returning zero causes
908 * this call to lookup, which *can* return an error.
910 * We only want to execute the request associated with the intent one
911 * time, however, so don't send the request again. Instead, skip past
912 * this and use the request from revalidate. In this case, revalidate
913 * never dropped its reference, so the refcounts are all OK */
914 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
915 struct ldlm_enqueue_info einfo =
916 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
917 ldlm_completion_ast, NULL, NULL };
919 /* For case if upper layer did not alloc fid, do it now. */
920 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
921 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
923 CERROR("Can't alloc new fid, rc %d\n", rc);
927 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
928 lmm, lmmsize, NULL, extra_lock_flags);
931 it->d.lustre.it_lock_handle = lockh.cookie;
932 } else if (!fid_is_sane(&op_data->op_fid2) ||
933 !(it->it_flags & O_CHECK_STALE)) {
934 /* DISP_ENQ_COMPLETE set means there is extra reference on
935 * request referenced from this intent, saved for subsequent
936 * lookup. This path is executed when we proceed to this
937 * lookup, so we clear DISP_ENQ_COMPLETE */
938 it_clear_disposition(it, DISP_ENQ_COMPLETE);
/* reuse the request kept by the earlier revalidate enqueue */
940 *reqp = it->d.lustre.it_data;
941 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * ptlrpcd completion callback for an async intent getattr: recover the
 * export/minfo/einfo stashed in rq_async_args by
 * mdc_intent_getattr_async(), release the in-flight slot, finish the
 * enqueue (ldlm_cli_enqueue_fini + mdc_finish_enqueue +
 * mdc_finish_intent_lock), then invoke the caller's mi_cb with the
 * final rc.
 *
 * NOTE(review): this listing has gaps (embedded source line numbers
 * jump); several statements between visible lines are omitted.
 */
945 static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
946 void *unused, int rc)
948 struct obd_export *exp = req->rq_async_args.pointer_arg[0];
949 struct md_enqueue_info *minfo = req->rq_async_args.pointer_arg[1];
950 struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
951 struct lookup_intent *it;
952 struct lustre_handle *lockh;
953 struct obd_device *obddev;
954 int flags = LDLM_FL_HAS_INTENT;
958 lockh = &minfo->mi_lockh;
960 obddev = class_exp2obd(exp);
/* release the in-flight slot taken in mdc_intent_getattr_async() */
962 mdc_exit_request(&obddev->u.cli);
963 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
966 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
967 &flags, NULL, 0, NULL, lockh, rc);
969 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
970 mdc_clear_replay_flag(req, rc);
974 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
978 it->d.lustre.it_lock_handle = lockh->cookie;
980 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* hand the final result back to the caller's completion callback */
985 minfo->mi_cb(req, minfo, rc);
/*
 * Fire-and-forget intent getattr: pack an LDLM_INTENT_GETATTR request
 * for minfo->mi_data, enqueue it asynchronously (async flag 1 to
 * ldlm_cli_enqueue), stash exp/minfo/einfo in rq_async_args for the
 * interpret callback, and queue the request on ptlrpcd.  Completion is
 * handled by mdc_intent_getattr_async_interpret().
 *
 * NOTE(review): this listing has gaps (embedded source line numbers
 * jump); error-path statements between visible lines are omitted.
 */
989 int mdc_intent_getattr_async(struct obd_export *exp,
990 struct md_enqueue_info *minfo,
991 struct ldlm_enqueue_info *einfo)
993 struct md_op_data *op_data = &minfo->mi_data;
994 struct lookup_intent *it = &minfo->mi_it;
995 struct ptlrpc_request *req;
996 struct obd_device *obddev = class_exp2obd(exp);
997 struct ldlm_res_id res_id;
998 ldlm_policy_data_t policy = {
999 .l_inodebits = { MDS_INODELOCK_LOOKUP }
1002 int flags = LDLM_FL_HAS_INTENT;
1005 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1006 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1007 ldlm_it2str(it->it_op), it->it_flags);
1009 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1010 req = mdc_intent_getattr_pack(exp, it, op_data);
/* take an in-flight slot; released in the interpret callback */
1014 mdc_enter_request(&obddev->u.cli);
1015 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1016 0, NULL, &minfo->mi_lockh, 1);
1018 mdc_exit_request(&obddev->u.cli);
/* stash context for mdc_intent_getattr_async_interpret() */
1022 req->rq_async_args.pointer_arg[0] = exp;
1023 req->rq_async_args.pointer_arg[1] = minfo;
1024 req->rq_async_args.pointer_arg[2] = einfo;
1025 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1026 ptlrpcd_add_req(req);
1031 int mdc_revalidate_lock(struct obd_export *exp,
1032 struct lookup_intent *it,
1035 /* We could just return 1 immediately, but since we should only
1036 * be called in revalidate_it if we already have a lock, let's
1038 struct ldlm_res_id res_id;
1039 struct lustre_handle lockh;
1040 ldlm_policy_data_t policy;
1044 fid_build_reg_res_name(fid, &res_id);
1045 /* As not all attributes are kept under update lock, e.g.
1046 owner/group/acls are under lookup lock, we need both
1047 ibits for GETATTR. */
1048 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
1049 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
1050 MDS_INODELOCK_LOOKUP;
1052 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1053 LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
1054 &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
1056 it->d.lustre.it_lock_handle = lockh.cookie;
1057 it->d.lustre.it_lock_mode = mode;