1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_MDC
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
48 # include <liblustre.h>
51 #include <lustre_acl.h>
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 /* fid_res_name_eq() */
55 #include <lustre_fid.h>
56 #include <lprocfs_status.h>
57 #include "mdc_internal.h"
/* Return the subset of @flag bits that are currently set in the
 * disposition word of the intent (it->d.lustre.it_disposition).
 * A non-zero result means at least one of the requested bits is set. */
59 int it_disposition(struct lookup_intent *it, int flag)
61 return it->d.lustre.it_disposition & flag;
63 EXPORT_SYMBOL(it_disposition);
/* Set the bits of @flag in the disposition word of the intent; bits that
 * are already set are left untouched. */
65 void it_set_disposition(struct lookup_intent *it, int flag)
67 it->d.lustre.it_disposition |= flag;
69 EXPORT_SYMBOL(it_set_disposition);
/* Clear the bits of @flag in the disposition word of the intent; all
 * other bits keep their value. */
71 void it_clear_disposition(struct lookup_intent *it, int flag)
73 it->d.lustre.it_disposition &= ~flag;
75 EXPORT_SYMBOL(it_clear_disposition);
/* Report the intent status that is relevant for @phase.
 *
 * The dispositions are tested in a fixed order: DISP_OPEN_OPEN,
 * DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD.  For the first one
 * that is set in the intent, it_status is returned when @phase is greater
 * than or equal to that disposition constant.
 *
 * NOTE(review): the branch taken when @phase is lower than the matching
 * disposition is not visible in this view of the file; presumably it
 * returns 0 -- confirm against the full source.
 *
 * If none of the four dispositions is set, the disposition word and the
 * status are logged through CERROR. */
77 int it_open_error(int phase, struct lookup_intent *it)
79 if (it_disposition(it, DISP_OPEN_OPEN)) {
80 if (phase >= DISP_OPEN_OPEN)
81 return it->d.lustre.it_status;
86 if (it_disposition(it, DISP_OPEN_CREATE)) {
87 if (phase >= DISP_OPEN_CREATE)
88 return it->d.lustre.it_status;
93 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
94 if (phase >= DISP_LOOKUP_EXECD)
95 return it->d.lustre.it_status;
100 if (it_disposition(it, DISP_IT_EXECD)) {
101 if (phase >= DISP_IT_EXECD)
102 return it->d.lustre.it_status;
106 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
107 it->d.lustre.it_status);
111 EXPORT_SYMBOL(it_open_error);
113 /* this must be called on a lockh that is known to have a referenced lock */
/* Store @data as the l_ast_data of the lock identified by @lockh.
 *
 * @lockh is a raw 64-bit cookie that is reinterpreted as a
 * struct lustre_handle; the lookup is asserted to succeed.  The update is
 * done between lock_res_and_lock() and unlock_res_and_lock().  @exp is not
 * referenced in the visible lines.
 *
 * NOTE(review): whatever follows unlock_res_and_lock() (for example
 * dropping the lock obtained from ldlm_handle2lock() and the return
 * statement) is not visible in this view of the file. */
114 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data)
116 struct ldlm_lock *lock;
124 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
126 LASSERT(lock != NULL);
127 lock_res_and_lock(lock);
/* A different l_ast_data may already be attached only when the old inode
 * has I_FREEING set in i_state; any other state trips the assertion. */
129 if (lock->l_ast_data && lock->l_ast_data != data) {
130 struct inode *new_inode = data;
131 struct inode *old_inode = lock->l_ast_data;
132 LASSERTF(old_inode->i_state & I_FREEING,
133 "Found existing inode %p/%lu/%u state %lu in lock: "
134 "setting data to %p/%lu/%u\n", old_inode,
135 old_inode->i_ino, old_inode->i_generation,
137 new_inode, new_inode->i_ino, new_inode->i_generation);
140 lock->l_ast_data = data;
141 unlock_res_and_lock(lock);
/* Look for an already held lock on the resource derived from @fid.
 *
 * The resource name is built with fid_build_reg_res_name() and the search
 * is delegated to ldlm_lock_match() on the namespace of the obd device
 * behind @exp.  @flags, @type, @policy, @mode and @lockh are passed through
 * unchanged, followed by a literal 0 as the last argument.
 *
 * The result of ldlm_lock_match() is stored in rc; the statement that
 * returns it is not visible in this view of the file. */
147 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
148 const struct lu_fid *fid, ldlm_type_t type,
149 ldlm_policy_data_t *policy, ldlm_mode_t mode,
150 struct lustre_handle *lockh)
152 struct ldlm_res_id res_id;
156 fid_build_reg_res_name(fid, &res_id);
157 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
158 &res_id, type, policy, mode, lockh, 0);
/* Cancel unused locks on the resource derived from @fid.
 *
 * The resource name is built with fid_build_reg_res_name() and the work is
 * delegated to ldlm_cli_cancel_unused_resource() on the namespace of the
 * obd device behind @exp.  @policy, @mode, @flags and @opaque are passed
 * through unchanged.
 *
 * The result is stored in rc; the statement that returns it is not visible
 * in this view of the file. */
162 int mdc_cancel_unused(struct obd_export *exp,
163 const struct lu_fid *fid,
164 ldlm_policy_data_t *policy,
165 ldlm_mode_t mode, int flags, void *opaque)
167 struct ldlm_res_id res_id;
168 struct obd_device *obd = class_exp2obd(exp);
173 fid_build_reg_res_name(fid, &res_id);
174 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
175 policy, mode, flags, opaque);
/* Run ldlm_resource_iterate() for the resource derived from @fid, on the
 * namespace of the obd device behind @exp.
 *
 * NOTE(review): the remaining arguments of the ldlm_resource_iterate()
 * call and the return statement are not visible in this view of the file;
 * presumably the resource name, @it and @data are passed on -- confirm
 * against the full source. */
179 int mdc_change_cbdata(struct obd_export *exp,
180 const struct lu_fid *fid,
181 ldlm_iterator_t it, void *data)
183 struct ldlm_res_id res_id;
186 fid_build_reg_res_name(fid, &res_id);
187 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
/* Post-process the replay state of @req after it finished with result @rc.
 *
 * When the request is marked for replay, rq_lock is taken and released
 * again; the statement executed under the lock is not visible in this view
 * (presumably it clears rq_replay -- confirm against the full source).
 *
 * Independently of that, a non-zero @rc combined with a non-zero
 * rq_transno is reported through DEBUG_REQ at D_ERROR level. */
194 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
196 /* Don't hold error requests for replay. */
197 if (req->rq_replay) {
198 spin_lock(&req->rq_lock);
200 spin_unlock(&req->rq_lock);
202 if (rc && req->rq_transno != 0) {
203 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
208 /* Save a large LOV EA into the request buffer so that it is available
209 * for replay. We don't do this in the initial request because the
210 * original request doesn't need this buffer (at most it sends just the
211 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
212 * buffer and may also be difficult to allocate and save a very large
213 * request buffer for each open. (bug 5707)
215 * OOM here may cause recovery failure if lmm is needed (only for the
216 * original open if the MDS crashed just when this client also OOM'd)
217 * but this is incredibly unlikely, and questionable whether the client
218 * could do MDS recovery under OOM anyways... */
/* @req is the request whose buffer segment DLM_INTENT_REC_OFF + 4 is
 * enlarged with sptlrpc_cli_enlarge_reqbuf(); going by the error message
 * below, the requested size is body->eadatasize.  @body is the reply body
 * and is modified on the failure path.  Nothing is returned. */
219 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
220 struct mdt_body *body)
224 /* FIXME: remove this explicit offset. */
225 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
228 CERROR("Can't enlarge segment %d size to %d\n",
229 DLM_INTENT_REC_OFF + 4, body->eadatasize);
/* Failure path: make the body look as if no EA came back by clearing the
 * EA size flag and zeroing the size, so later code does not copy it. */
230 body->valid &= ~OBD_MD_FLEASIZE;
231 body->eadatasize = 0;
/* Build the LDLM intent request for an open (RQF_LDLM_INTENT_OPEN).
 *
 * Visible steps:
 *  - force the create mode to a regular file (S_IFREG);
 *  - when op_fid2 is sane, collect unused conflicting locks on the child
 *    with mdc_resource_get_unused(); for IT_CREAT or a join, also collect
 *    the UPDATE lock on the parent (op_fid1).  The collected locks are
 *    counted in count and handed to ldlm_prep_enqueue_req() through the
 *    local cancels list;
 *  - size the client side capsule fields: both capabilities (the child
 *    one is reserved with the size of the parent one), the name
 *    (op_namelen + 1), the EA (the larger of lmmsize and
 *    cl_default_mds_easize) and RMF_REC_JOINFILE;
 *  - pack the intent opcode and the open body with mdc_open_pack();
 *  - for a remote client, reserve reply space for one
 *    struct mdt_remote_perm in the ACL field;
 *  - fix the reply length with ptlrpc_request_set_replen().
 *
 * When the request cannot be allocated the collected cancel list is
 * released and ERR_PTR(-ENOMEM) is returned.  When
 * ldlm_prep_enqueue_req() fails the request is freed.
 *
 * NOTE(review): the last parameter of the signature, the guards around
 * the join handling, the statement executed under rq_lock (presumably
 * marking the request for replay) and the final return are not visible in
 * this view of the file; confirm them against the full source. */
235 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
236 struct lookup_intent *it,
237 struct md_op_data *op_data,
238 void *lmm, int lmmsize,
241 struct ptlrpc_request *req;
242 struct obd_device *obddev = class_exp2obd(exp);
243 struct ldlm_intent *lit;
244 int joinfile = !!((it->it_flags & O_JOIN_FILE) &&
246 CFS_LIST_HEAD(cancels);
252 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
254 /* XXX: openlock is not cancelled for cross-refs. */
255 /* If inode is known, cancel conflicting OPEN locks. */
256 if (fid_is_sane(&op_data->op_fid2)) {
257 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
260 else if (it->it_flags & FMODE_EXEC)
265 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
270 /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
271 if (it->it_op & IT_CREAT || joinfile)
275 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
277 MDS_INODELOCK_UPDATE);
279 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
280 &RQF_LDLM_INTENT_OPEN);
282 ldlm_lock_list_put(&cancels, l_bl_ast, count);
283 RETURN(ERR_PTR(-ENOMEM));
286 /* parent capability */
287 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
288 /* child capability, reserve the size according to parent capa, it will
289 * be filled after we get the reply */
290 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
292 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
293 op_data->op_namelen + 1);
294 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
295 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
297 req_capsule_set_size(&req->rq_pill, &RMF_REC_JOINFILE,
301 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
303 ptlrpc_request_free(req);
308 __u64 head_size = *(__u64 *)op_data->op_data;
309 mdc_join_pack(req, op_data, head_size);
312 spin_lock(&req->rq_lock);
314 spin_unlock(&req->rq_lock);
316 /* pack the intent */
317 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
318 lit->opc = (__u64)it->it_op;
320 /* pack the intended request */
321 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
324 /* for remote client, fetch remote perm for current user */
325 if (client_is_remote(exp))
326 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
327 sizeof(struct mdt_remote_perm));
328 ptlrpc_request_set_replen(req);
/* Build the LDLM intent request for an unlink (RQF_LDLM_INTENT_UNLINK).
 *
 * The request is allocated, the capability and name fields are sized
 * (the name gets op_namelen + 1 bytes), ldlm_prep_enqueue_req() is called
 * without a cancel list, the intent opcode and the unlink body are packed,
 * and the reply buffers are sized before ptlrpc_request_set_replen().
 *
 * Returns ERR_PTR(-ENOMEM) when the request cannot be allocated.  When
 * ldlm_prep_enqueue_req() fails the request is freed.
 *
 * NOTE(review): the error tests themselves and the final return are not
 * visible in this view of the file. */
332 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
333 struct lookup_intent *it,
334 struct md_op_data *op_data)
336 struct ptlrpc_request *req;
337 struct obd_device *obddev = class_exp2obd(exp);
338 struct ldlm_intent *lit;
342 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
343 &RQF_LDLM_INTENT_UNLINK);
345 RETURN(ERR_PTR(-ENOMEM));
347 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
348 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
349 op_data->op_namelen + 1);
351 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
353 ptlrpc_request_free(req);
357 /* pack the intent */
358 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
359 lit->opc = (__u64)it->it_op;
361 /* pack the intended request */
362 mdc_unlink_pack(req, op_data);
/* Reserve reply space for the largest EA the client knows about. */
364 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
365 obddev->u.cli.cl_max_mds_easize);
/* NOTE(review): the ACL reply field is sized with cl_max_mds_cookiesize
 * here, unlike the other packers which size it for mdt_remote_perm;
 * presumably the unlink reply carries cookies in this slot -- confirm. */
366 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
367 obddev->u.cli.cl_max_mds_cookiesize);
368 ptlrpc_request_set_replen(req);
/* Build the LDLM intent request for a getattr or lookup
 * (RQF_LDLM_INTENT_GETATTR).
 *
 * The set of attributes asked for is fixed in valid; the last bit depends
 * on the kind of client: OBD_MD_FLRMTPERM for a remote client,
 * OBD_MD_FLACL otherwise.  After allocation the capability and name fields
 * are sized, ldlm_prep_enqueue_req() is called without a cancel list, and
 * the intent opcode and the getattr body are packed.
 *
 * Returns ERR_PTR(-ENOMEM) when the request cannot be allocated.  When
 * ldlm_prep_enqueue_req() fails the request is freed.
 *
 * NOTE(review): the error tests themselves and the final return are not
 * visible in this view of the file. */
372 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
373 struct lookup_intent *it,
374 struct md_op_data *op_data)
376 struct ptlrpc_request *req;
377 struct obd_device *obddev = class_exp2obd(exp);
378 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
379 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
380 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
381 (client_is_remote(exp) ?
382 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
383 struct ldlm_intent *lit;
387 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
388 &RQF_LDLM_INTENT_GETATTR);
390 RETURN(ERR_PTR(-ENOMEM));
392 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
393 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
394 op_data->op_namelen + 1);
396 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
398 ptlrpc_request_free(req);
402 /* pack the intent */
403 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
404 lit->opc = (__u64)it->it_op;
406 /* pack the intended request */
407 mdc_getattr_pack(req, valid, it->it_flags, op_data);
/* Reserve reply space for the largest EA the client knows about. */
409 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
410 obddev->u.cli.cl_max_mds_easize);
/* A remote client asked for OBD_MD_FLRMTPERM above, so the ACL reply
 * field is sized for one struct mdt_remote_perm. */
411 if (client_is_remote(exp))
412 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
413 sizeof(struct mdt_remote_perm));
414 ptlrpc_request_set_replen(req);
/* Build a plain LDLM enqueue request (RQF_LDLM_ENQUEUE) without an intent
 * body.
 *
 * The request is allocated on the import of @exp, prepared with
 * ldlm_prep_enqueue_req() without a cancel list, and its reply length is
 * fixed with ptlrpc_request_set_replen().
 *
 * Returns ERR_PTR(-ENOMEM) when the request cannot be allocated.  When
 * ldlm_prep_enqueue_req() fails the request is freed.
 *
 * NOTE(review): the error tests themselves and the final return are not
 * visible in this view of the file. */
418 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
420 struct ptlrpc_request *req;
424 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
426 RETURN(ERR_PTR(-ENOMEM));
428 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
430 ptlrpc_request_free(req);
434 ptlrpc_request_set_replen(req);
/* Post-process the reply of an intent enqueue and record the outcome in
 * the intent @it.
 *
 * Visible steps:
 *  - if the request has a transno or is marked for replay, flag the packed
 *    lock request with LDLM_FL_INTENT_ONLY;
 *  - when rc equals ELDLM_LOCK_ABORTED, zero @lockh; otherwise, if the
 *    request mode of the lock differs from einfo->ei_mode, take a
 *    reference in the new mode, drop the one in the old mode and update
 *    einfo->ei_mode;
 *  - copy the disposition (lock_policy_res1) and status
 *    (lock_policy_res2) from the DLM reply into the intent, together with
 *    the lock mode and the request pointer (it_data);
 *  - call mdc_clear_replay_flag() for a negative status on a replayable
 *    request, and for an IT_OPEN request that did not end in a successful
 *    open;
 *  - for IT_OPEN, IT_UNLINK, IT_LOOKUP and IT_GETATTR, fetch the mdt_body
 *    from the reply and, depending on body->valid, the EA data, the remote
 *    permission and the capabilities.  For a successful open
 *    mdc_set_open_replay_data() is called first.  For a replayable open
 *    the reply EA is copied into the RMF_EADATA field of the request,
 *    after mdc_realloc_openmsg() when the size check asks for it.
 *
 * NOTE(review): the last parameter of the signature (rc is used in the
 * body), several declarations, the NULL checks after the capsule getters
 * and every return statement are not visible in this view of the file;
 * confirm them against the full source. */
438 static int mdc_finish_enqueue(struct obd_export *exp,
439 struct ptlrpc_request *req,
440 struct ldlm_enqueue_info *einfo,
441 struct lookup_intent *it,
442 struct lustre_handle *lockh,
445 struct req_capsule *pill = &req->rq_pill;
446 struct ldlm_request *lockreq;
447 struct ldlm_reply *lockrep;
451 /* Similarly, if we're going to replay this request, we don't want to
452 * actually get a lock, just perform the intent. */
453 if (req->rq_transno || req->rq_replay) {
454 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
455 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
458 if (rc == ELDLM_LOCK_ABORTED) {
460 memset(lockh, 0, sizeof(*lockh));
462 } else { /* rc = 0 */
463 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
466 /* If the server gave us back a different lock mode, we should
467 * fix up our variables. */
468 if (lock->l_req_mode != einfo->ei_mode) {
469 ldlm_lock_addref(lockh, lock->l_req_mode);
470 ldlm_lock_decref(lockh, einfo->ei_mode);
471 einfo->ei_mode = lock->l_req_mode;
476 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
477 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
479 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
480 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
481 it->d.lustre.it_lock_mode = einfo->ei_mode;
482 it->d.lustre.it_data = req;
484 if (it->d.lustre.it_status < 0 && req->rq_replay)
485 mdc_clear_replay_flag(req, it->d.lustre.it_status);
487 /* If we're doing an IT_OPEN which did not result in an actual
488 * successful open, then we need to remove the bit which saves
489 * this request for unconditional replay.
491 * It's important that we do this first! Otherwise we might exit the
492 * function without doing so, and try to replay a failed create
494 if (it->it_op & IT_OPEN && req->rq_replay &&
495 (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
496 mdc_clear_replay_flag(req, it->d.lustre.it_status);
498 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
499 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
501 /* We know what to expect, so we do any byte flipping required here */
502 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
503 struct mdt_body *body;
505 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
507 CERROR ("Can't swab mdt_body\n");
511 if (it_disposition(it, DISP_OPEN_OPEN) &&
512 !it_open_error(DISP_OPEN_OPEN, it)) {
514 * If this is a successful OPEN request, we need to set
515 * replay handler and data early, so that if replay
516 * happens immediately after swabbing below, new reply
517 * is swabbed by that handler correctly.
519 mdc_set_open_replay_data(NULL, NULL, req);
522 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
526 * The eadata is opaque; just check that it is there.
527 * Eventually, obd_unpackmd() will check the contents.
529 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
535 * We save the reply LOV EA in case we have to replay a
536 * create for recovery. If we didn't allocate a large
537 * enough request buffer above we need to reallocate it
538 * here to hold the actual LOV EA.
540 * To not save LOV EA if request is not going to replay
541 * (for example error one).
543 if ((it->it_op & IT_OPEN) && req->rq_replay) {
545 if (req_capsule_get_size(pill, &RMF_EADATA,
548 mdc_realloc_openmsg(req, body);
549 req_capsule_set_size(pill, &RMF_EADATA,
553 lmm = req_capsule_client_get(pill, &RMF_EADATA);
555 memcpy(lmm, eadata, body->eadatasize);
559 if (body->valid & OBD_MD_FLRMTPERM) {
560 struct mdt_remote_perm *perm;
562 LASSERT(client_is_remote(exp));
563 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
564 lustre_swab_mdt_remote_perm);
568 if (body->valid & OBD_MD_FLMDSCAPA) {
569 struct lustre_capa *capa, *p;
571 capa = req_capsule_server_get(pill, &RMF_CAPA1);
575 if (it->it_op & IT_OPEN) {
576 /* client fid capa will be checked in replay */
577 p = req_capsule_client_get(pill, &RMF_CAPA2);
582 if (body->valid & OBD_MD_FLOSSCAPA) {
583 struct lustre_capa *capa;
585 capa = req_capsule_server_get(pill, &RMF_CAPA2);
594 /* We always reserve enough space in the reply packet for a stripe MD, because
595 * we don't know in advance the file type. */
/* Pack and send an LDLM enqueue for the resource derived from
 * op_data->op_fid1, optionally carrying the intent @it, then post-process
 * the reply.
 *
 * What the visible code does:
 *  - asserts that a request with an intent uses lock type LDLM_IBITS;
 *  - starts from a LOOKUP inodebits policy and switches it to UPDATE for
 *    IT_UNLINK, IT_GETATTR and IT_READDIR, and again in the IT_OPEN
 *    branch;
 *  - in the branch documented as the FLOCK case, requires lmm to be set
 *    with lmmsize == 0, takes the policy from lmm and stores LDLM_FLOCK in
 *    res_id.name[3];
 *  - otherwise picks the packer by intent: IT_OPEN uses
 *    mdc_intent_open_pack(), IT_UNLINK mdc_intent_unlink_pack(),
 *    IT_GETATTR or IT_LOOKUP mdc_intent_getattr_pack(), and IT_READDIR
 *    ldlm_enqueue_pack(); a packer failure is returned as PTR_ERR(req);
 *  - takes the rpc lock and enters the request accounting before
 *    ldlm_cli_enqueue(), and leaves both afterwards;
 *  - on the error path logs through CERROR, clears the replay flag and
 *    finishes the request; otherwise hands over to mdc_finish_enqueue().
 *
 * NOTE(review): the test selecting the FLOCK branch, the guards around
 * the rpc lock calls, the error test after ldlm_cli_enqueue(), the final
 * return and every use of @reqp are not visible in this view of the file;
 * confirm them against the full source. */
596 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
597 struct lookup_intent *it, struct md_op_data *op_data,
598 struct lustre_handle *lockh, void *lmm, int lmmsize,
599 struct ptlrpc_request **reqp, int extra_lock_flags)
601 struct obd_device *obddev = class_exp2obd(exp);
602 struct ptlrpc_request *req = NULL;
603 struct req_capsule *pill;
604 int flags = extra_lock_flags;
606 struct ldlm_res_id res_id;
607 ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
610 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
613 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
616 flags |= LDLM_FL_HAS_INTENT;
617 if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
618 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
624 /* The only way right now is FLOCK, in this case we hide flock
625 policy as lmm, but lmmsize is 0 */
626 LASSERT(lmm && lmmsize == 0);
627 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
629 policy = *(ldlm_policy_data_t *)lmm;
630 res_id.name[3] = LDLM_FLOCK;
631 } else if (it->it_op & IT_OPEN) {
632 int joinfile = !!((it->it_flags & O_JOIN_FILE) &&
635 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
638 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
639 einfo->ei_cbdata = NULL;
642 it->it_flags &= ~O_JOIN_FILE;
643 } else if (it->it_op & IT_UNLINK)
644 req = mdc_intent_unlink_pack(exp, it, op_data);
645 else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
646 req = mdc_intent_getattr_pack(exp, it, op_data);
647 else if (it->it_op == IT_READDIR)
648 req = ldlm_enqueue_pack(exp);
655 RETURN(PTR_ERR(req));
656 pill = &req->rq_pill;
658 /* It is important to obtain rpc_lock first (if applicable), so that
659 * threads that are serialised with rpc_lock are not polluting our
660 * rpcs in flight counter. We do not do flock request limiting, though*/
662 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
663 mdc_enter_request(&obddev->u.cli);
665 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
671 mdc_exit_request(&obddev->u.cli);
672 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
675 /* For flock requests we immediatelly return without further
676 delay and let caller deal with the rest, since rest of
677 this function metadata processing makes no sense for flock
683 CERROR("ldlm_cli_enqueue: %d\n", rc);
684 mdc_clear_replay_flag(req, rc);
685 ptlrpc_req_finished(req);
688 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Second stage of intent processing: interpret the disposition and status
 * already stored in @it and settle on the lock to keep.
 *
 * Visible steps:
 *  - if the server never executed the intent (DISP_IT_EXECD clear), the
 *    non-zero it_status is returned;
 *  - the intent status is checked with it_open_error() for the
 *    DISP_IT_EXECD and DISP_LOOKUP_EXECD phases;
 *  - when op_fid2 is sane, O_CHECK_STALE is set and the operation is not
 *    exactly IT_GETATTR, DISP_ENQ_COMPLETE is set and the fid in the reply
 *    body is compared with op_fid2 and op_fid3;
 *  - an extra request reference is taken once for a successful create
 *    (tracked by DISP_ENQ_CREATE_REF) and once for a successful open
 *    (tracked by DISP_ENQ_OPEN_REF);
 *  - if ldlm_lock_match() finds another lock for the policy of the new
 *    one, the new lock is dropped with ldlm_lock_decref_and_cancel() and
 *    @lockh and it_lock_handle are switched to the lock that was found.
 *
 * NOTE(review): the tests on the rc values, the action taken on a fid
 * mismatch, the guard on the result of ldlm_handle2lock() and the final
 * return are not visible in this view of the file; confirm them against
 * the full source. */
693 static int mdc_finish_intent_lock(struct obd_export *exp,
694 struct ptlrpc_request *request,
695 struct md_op_data *op_data,
696 struct lookup_intent *it,
697 struct lustre_handle *lockh)
699 struct lustre_handle old_lock;
700 struct mdt_body *mdt_body;
701 struct ldlm_lock *lock;
705 LASSERT(request != NULL);
706 LASSERT(request != LP_POISON);
707 LASSERT(request->rq_repmsg != LP_POISON);
709 if (!it_disposition(it, DISP_IT_EXECD)) {
710 /* The server failed before it even started executing the
711 * intent, i.e. because it couldn't unpack the request. */
712 LASSERT(it->d.lustre.it_status != 0);
713 RETURN(it->d.lustre.it_status);
715 rc = it_open_error(DISP_IT_EXECD, it);
719 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
720 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
722 /* If we were revalidating a fid/name pair, mark the intent in
723 * case we fail and get called again from lookup */
724 if (fid_is_sane(&op_data->op_fid2) &&
725 it->it_flags & O_CHECK_STALE &&
726 it->it_op != IT_GETATTR) {
727 it_set_disposition(it, DISP_ENQ_COMPLETE);
729 /* Also: did we find the same inode? */
730 /* server can return one of two fids:
731 * op_fid2 - new allocated fid - if file is created.
732 * op_fid3 - existent fid - if file only open.
733 * op_fid3 is saved in lmv_intent_open */
734 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
735 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
/* NOTE(review): the first two fids printed below are both op_fid2; the
 * second one was probably meant to be op_fid3, matching the comparison
 * above -- confirm before changing the message arguments. */
736 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
737 "\n", PFID(&op_data->op_fid2),
738 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
743 rc = it_open_error(DISP_LOOKUP_EXECD, it);
747 /* keep requests around for the multiple phases of the call
748 * this shows the DISP_XX must guarantee we make it into the call
750 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
751 it_disposition(it, DISP_OPEN_CREATE) &&
752 !it_open_error(DISP_OPEN_CREATE, it)) {
753 it_set_disposition(it, DISP_ENQ_CREATE_REF);
754 ptlrpc_request_addref(request); /* balanced in ll_create_node */
756 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
757 it_disposition(it, DISP_OPEN_OPEN) &&
758 !it_open_error(DISP_OPEN_OPEN, it)) {
759 it_set_disposition(it, DISP_ENQ_OPEN_REF);
760 ptlrpc_request_addref(request); /* balanced in ll_file_open */
761 /* BUG 11546 - eviction in the middle of open rpc processing */
762 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
765 if (it->it_op & IT_CREAT) {
766 /* XXX this belongs in ll_create_it */
767 } else if (it->it_op == IT_OPEN) {
768 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
770 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
773 /* If we already have a matching lock, then cancel the new
774 * one. We have to set the data here instead of in
775 * mdc_enqueue, because we need to use the child's inode as
776 * the l_ast_data to match, and that's not available until
777 * intent_finish has performed the iget().) */
778 lock = ldlm_handle2lock(lockh);
780 ldlm_policy_data_t policy = lock->l_policy_data;
781 LDLM_DEBUG(lock, "matching against this");
783 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
784 &lock->l_resource->lr_name),
785 "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
786 (unsigned long)lock->l_resource->lr_name.name[0],
787 (unsigned long)lock->l_resource->lr_name.name[1],
788 (unsigned long)lock->l_resource->lr_name.name[2],
789 (unsigned long)fid_seq(&mdt_body->fid1),
790 (unsigned long)fid_oid(&mdt_body->fid1),
791 (unsigned long)fid_ver(&mdt_body->fid1));
794 memcpy(&old_lock, lockh, sizeof(*lockh));
795 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
796 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
797 ldlm_lock_decref_and_cancel(lockh,
798 it->d.lustre.it_lock_mode);
799 memcpy(lockh, &old_lock, sizeof(old_lock));
800 it->d.lustre.it_lock_handle = lockh->cookie;
803 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
804 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
805 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/*
810 * This long block is all about fixing up the lock and request state
811 * so that it is correct as of the moment _before_ the operation was
812 * applied; that way, the VFS will think that everything is normal and
813 * call Lustre's regular VFS methods.
 *
815 * If we're performing a creation, that means that unless the creation
816 * failed with EEXIST, we should fake up a negative dentry.
 *
818 * For everything else, we want the lookup to succeed.
 *
820 * One additional note: if CREATE or OPEN succeeded, we add an extra
821 * reference to the request because we need to keep it around until
822 * ll_create/ll_open gets called.
 *
824 * The server will return to us, in it_disposition, an indication of
825 * exactly what d.lustre.it_status refers to.
 *
827 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
828 * otherwise if DISP_OPEN_CREATE is set, then d.lustre.it_status is the
829 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
830 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
 * was successful.
 *
833 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
 * child lookup.
 */
/* Entry point for intent based locking: reuse a lock that is already held
 * when possible, otherwise enqueue the intent, then finish the intent
 * with mdc_finish_intent_lock().
 *
 * Visible steps:
 *  - trace the name, both fids, the intent and its flags;
 *  - for IT_LOOKUP or IT_GETATTR on a sane op_fid2, look for an existing
 *    LDLM_IBITS lock with mdc_lock_match() in any of the modes CR, CW, PR
 *    or PW, and store the handle and mode in the intent;
 *  - when DISP_ENQ_COMPLETE is not set: build the enqueue info, allocate
 *    op_fid2 with mdc_fid_alloc() for IT_CREAT if it is not sane yet, call
 *    mdc_enqueue() and store the lock handle in the intent;
 *  - otherwise, when op_fid2 is not sane or O_CHECK_STALE is not set,
 *    clear DISP_ENQ_COMPLETE;
 *  - publish the request kept in the intent through @reqp and call
 *    mdc_finish_intent_lock().
 *
 * NOTE(review): a comment in the body says both ibits are needed for
 * IT_GETATTR, but the policy built here asks for MDS_INODELOCK_UPDATE
 * only; the CMD comment that follows it gives the reason, while
 * mdc_revalidate_lock() does ask for both bits -- confirm this difference
 * is intended.
 *
 * NOTE(review): @lookup_flags is not referenced in the visible lines, and
 * the tests on mode and rc as well as the return statements are not
 * visible in this view of the file. */
836 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
837 void *lmm, int lmmsize, struct lookup_intent *it,
838 int lookup_flags, struct ptlrpc_request **reqp,
839 ldlm_blocking_callback cb_blocking,
840 int extra_lock_flags)
842 struct lustre_handle lockh;
847 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
848 ", intent: %s flags %#o\n", op_data->op_namelen,
849 op_data->op_name, PFID(&op_data->op_fid2),
850 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
854 if (fid_is_sane(&op_data->op_fid2) &&
855 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
856 /* We could just return 1 immediately, but since we should only
857 * be called in revalidate_it if we already have a lock, let's
859 ldlm_policy_data_t policy;
862 /* As not all attributes are kept under update lock, e.g.
863 owner/group/acls are under lookup lock, we need both
864 ibits for GETATTR. */
866 /* For CMD, UPDATE lock and LOOKUP lock can not be got
867 * at the same for cross-object, so we can not match
868 * the 2 lock at the same time FIXME: but how to handle
869 * the above situation */
870 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
871 MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
873 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED,
874 &op_data->op_fid2, LDLM_IBITS, &policy,
875 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
877 it->d.lustre.it_lock_handle = lockh.cookie;
878 it->d.lustre.it_lock_mode = mode;
881 /* Only return failure if it was not GETATTR by cfid
882 (from inode_revalidate) */
883 if (mode || op_data->op_namelen != 0)
887 /* lookup_it may be called only after revalidate_it has run, because
888 * revalidate_it cannot return errors, only zero. Returning zero causes
889 * this call to lookup, which *can* return an error.
891 * We only want to execute the request associated with the intent one
892 * time, however, so don't send the request again. Instead, skip past
893 * this and use the request from revalidate. In this case, revalidate
894 * never dropped its reference, so the refcounts are all OK */
895 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
896 struct ldlm_enqueue_info einfo =
897 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
898 ldlm_completion_ast, NULL, NULL, NULL };
900 /* For case if upper layer did not alloc fid, do it now. */
901 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
902 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
904 CERROR("Can't alloc new fid, rc %d\n", rc);
908 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
909 lmm, lmmsize, NULL, extra_lock_flags);
912 it->d.lustre.it_lock_handle = lockh.cookie;
913 } else if (!fid_is_sane(&op_data->op_fid2) ||
914 !(it->it_flags & O_CHECK_STALE)) {
915 /* DISP_ENQ_COMPLETE set means there is extra reference on
916 * request referenced from this intent, saved for subsequent
917 * lookup. This path is executed when we proceed to this
918 * lookup, so we clear DISP_ENQ_COMPLETE */
919 it_clear_disposition(it, DISP_ENQ_COMPLETE);
921 *reqp = it->d.lustre.it_data;
922 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/* Reply interpreter for requests queued by mdc_intent_getattr_async().
 *
 * The export, the md_enqueue_info and the ldlm_enqueue_info are read back
 * from rq_async_args.pointer_arg[0..2], where the submitter stored them.
 * The request accounting is left with mdc_exit_request(), the enqueue is
 * completed with ldlm_cli_enqueue_fini(), and on success the reply goes
 * through mdc_finish_enqueue() and mdc_finish_intent_lock(), with the
 * lock handle cookie stored in the intent in between.  The callback
 * minfo->mi_cb is invoked with the request, minfo and rc.
 *
 * @env and @unused are not referenced in the visible lines.
 *
 * NOTE(review): the assignment of it, the effect of the
 * OBD_FAIL_MDC_GETATTR_ENQUEUE check, the tests on rc between the steps
 * and the return statement are not visible in this view of the file. */
926 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
927 struct ptlrpc_request *req,
928 void *unused, int rc)
930 struct obd_export *exp = req->rq_async_args.pointer_arg[0];
931 struct md_enqueue_info *minfo = req->rq_async_args.pointer_arg[1];
932 struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
933 struct lookup_intent *it;
934 struct lustre_handle *lockh;
935 struct obd_device *obddev;
936 int flags = LDLM_FL_HAS_INTENT;
940 lockh = &minfo->mi_lockh;
942 obddev = class_exp2obd(exp);
/* Balances the mdc_enter_request() done when the request was submitted. */
944 mdc_exit_request(&obddev->u.cli);
945 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
948 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
949 &flags, NULL, 0, NULL, lockh, rc);
951 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
952 mdc_clear_replay_flag(req, rc);
956 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
960 it->d.lustre.it_lock_handle = lockh->cookie;
962 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
967 minfo->mi_cb(req, minfo, rc);
/* Submit a getattr intent enqueue without waiting for the reply.
 *
 * The operation data and the intent are taken from @minfo (mi_data and
 * mi_it).  The request is packed with mdc_intent_getattr_pack(), the
 * request accounting is entered with mdc_enter_request(), and
 * ldlm_cli_enqueue() is called with a LOOKUP inodebits policy,
 * LDLM_FL_HAS_INTENT and a final argument of 1 (presumably selecting the
 * asynchronous mode -- confirm).  The lock handle is written to
 * minfo->mi_lockh.
 *
 * @exp, @minfo and @einfo are then stored in rq_async_args.pointer_arg
 * for mdc_intent_getattr_async_interpret(), which is installed as the
 * reply interpreter before the request is passed to ptlrpcd_add_req().
 * All three pointers must therefore stay valid until that interpreter
 * has run.
 *
 * NOTE(review): the error tests after packing and after
 * ldlm_cli_enqueue() (the second mdc_exit_request() belongs to one of
 * them) and the return statements are not visible in this view. */
971 int mdc_intent_getattr_async(struct obd_export *exp,
972 struct md_enqueue_info *minfo,
973 struct ldlm_enqueue_info *einfo)
975 struct md_op_data *op_data = &minfo->mi_data;
976 struct lookup_intent *it = &minfo->mi_it;
977 struct ptlrpc_request *req;
978 struct obd_device *obddev = class_exp2obd(exp);
979 struct ldlm_res_id res_id;
980 ldlm_policy_data_t policy = {
981 .l_inodebits = { MDS_INODELOCK_LOOKUP }
984 int flags = LDLM_FL_HAS_INTENT;
987 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
988 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
989 ldlm_it2str(it->it_op), it->it_flags);
991 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
992 req = mdc_intent_getattr_pack(exp, it, op_data);
996 mdc_enter_request(&obddev->u.cli);
997 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
998 0, NULL, &minfo->mi_lockh, 1);
1000 mdc_exit_request(&obddev->u.cli);
1004 req->rq_async_args.pointer_arg[0] = exp;
1005 req->rq_async_args.pointer_arg[1] = minfo;
1006 req->rq_async_args.pointer_arg[2] = einfo;
1007 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1008 ptlrpcd_add_req(req, PSCOPE_OTHER);
/* Check whether a suitable LDLM_IBITS lock is already held for the
 * resource derived from fid, and record it in the intent @it.
 *
 * The inodebits asked for are UPDATE and LOOKUP together for IT_GETATTR,
 * and LOOKUP alone for every other operation.  The search is done with
 * ldlm_lock_match() on the namespace of exp->exp_obd, with
 * LDLM_FL_BLOCK_GRANTED and any of the modes CR, CW, PR or PW.  The lock
 * handle cookie and the matched mode are stored in the intent.
 *
 * NOTE(review): the last parameter of the signature (fid is used in the
 * body), the test on mode around the two stores and the return statement
 * are not visible in this view of the file; confirm them against the full
 * source. */
1013 int mdc_revalidate_lock(struct obd_export *exp,
1014 struct lookup_intent *it,
1017 /* We could just return 1 immediately, but since we should only
1018 * be called in revalidate_it if we already have a lock, let's
1020 struct ldlm_res_id res_id;
1021 struct lustre_handle lockh;
1022 ldlm_policy_data_t policy;
1026 fid_build_reg_res_name(fid, &res_id);
1027 /* As not all attributes are kept under update lock, e.g.
1028 owner/group/acls are under lookup lock, we need both
1029 ibits for GETATTR. */
1030 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
1031 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
1032 MDS_INODELOCK_LOOKUP;
1034 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1035 LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
1036 &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1038 it->d.lustre.it_lock_handle = lockh.cookie;
1039 it->d.lustre.it_lock_mode = mode;