1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_MDC
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
48 # include <liblustre.h>
51 #include <lustre_acl.h>
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 /* fid_res_name_eq() */
55 #include <lustre_fid.h>
56 #include <lprocfs_status.h>
57 #include "mdc_internal.h"
/* Test disposition bits of an intent.
 * Returns it->d.lustre.it_disposition masked with flag: non-zero when at
 * least one bit of flag is set, 0 otherwise.  Exported via EXPORT_SYMBOL. */
59 int it_disposition(struct lookup_intent *it, int flag)
61 return it->d.lustre.it_disposition & flag;
63 EXPORT_SYMBOL(it_disposition);
/* Set the bits of flag in it->d.lustre.it_disposition (bitwise OR); bits
 * not present in flag are left unchanged.  Exported via EXPORT_SYMBOL. */
65 void it_set_disposition(struct lookup_intent *it, int flag)
67 it->d.lustre.it_disposition |= flag;
69 EXPORT_SYMBOL(it_set_disposition);
/* Clear the bits of flag in it->d.lustre.it_disposition (AND with ~flag);
 * bits not present in flag are left unchanged.  Exported via EXPORT_SYMBOL. */
71 void it_clear_disposition(struct lookup_intent *it, int flag)
73 it->d.lustre.it_disposition &= ~flag;
75 EXPORT_SYMBOL(it_clear_disposition);
/* Report the intent status relevant to a given phase.
 *
 * The disposition bits are examined in the order DISP_OPEN_OPEN,
 * DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD.  For a bit that is
 * set, it->d.lustre.it_status is returned when phase is numerically >= that
 * disposition constant.
 *
 * phase: one of the DISP_* constants being compared against.
 * it:    intent whose disposition and status are inspected.
 *
 * NOTE(review): the branches taken when phase is below the matched
 * disposition, and whatever follows the CERROR at the end, are not visible
 * in this excerpt -- confirm the return value on those paths. */
77 int it_open_error(int phase, struct lookup_intent *it)
79 if (it_disposition(it, DISP_OPEN_OPEN)) {
80 if (phase >= DISP_OPEN_OPEN)
81 return it->d.lustre.it_status;
86 if (it_disposition(it, DISP_OPEN_CREATE)) {
87 if (phase >= DISP_OPEN_CREATE)
88 return it->d.lustre.it_status;
93 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
94 if (phase >= DISP_LOOKUP_EXECD)
95 return it->d.lustre.it_status;
100 if (it_disposition(it, DISP_IT_EXECD)) {
101 if (phase >= DISP_IT_EXECD)
102 return it->d.lustre.it_status;
106 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
107 it->d.lustre.it_status);
111 EXPORT_SYMBOL(it_open_error);
/* Attach caller data to the LDLM lock identified by lockh.
 *
 * lockh is reinterpreted as a struct lustre_handle and resolved with
 * ldlm_handle2lock(); a NULL result trips LASSERT.  With the resource and
 * lock held (lock_res_and_lock), lock->l_ast_data is overwritten with data.
 * If the lock already carried different, non-NULL data, both values are
 * treated as struct inode pointers and the old inode must have I_FREEING
 * set in i_state, otherwise LASSERTF fires.
 *
 * NOTE(review): releasing the reference obtained from ldlm_handle2lock()
 * and the return value are not visible in this excerpt -- confirm. */
113 /* this must be called on a lockh that is known to have a referenced lock */
114 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data)
116 struct ldlm_lock *lock;
124 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
126 LASSERT(lock != NULL);
127 lock_res_and_lock(lock);
129 if (lock->l_ast_data && lock->l_ast_data != data) {
130 struct inode *new_inode = data;
131 struct inode *old_inode = lock->l_ast_data;
132 LASSERTF(old_inode->i_state & I_FREEING,
133 "Found existing inode %p/%lu/%u state %lu in lock: "
134 "setting data to %p/%lu/%u\n", old_inode,
135 old_inode->i_ino, old_inode->i_generation,
137 new_inode, new_inode->i_ino, new_inode->i_generation);
140 lock->l_ast_data = data;
141 unlock_res_and_lock(lock);
/* Look for an already-held lock on the resource named by fid.
 *
 * The resource id is derived from fid with fid_build_reg_res_name() and
 * the search is delegated to ldlm_lock_match() in the namespace of the
 * obd device behind exp, passing flags, type, policy, mode and lockh
 * through unchanged (the final argument is 0).
 *
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably the ldlm_lock_match() result stored in rc is returned. */
147 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
148 const struct lu_fid *fid, ldlm_type_t type,
149 ldlm_policy_data_t *policy, ldlm_mode_t mode,
150 struct lustre_handle *lockh)
152 struct ldlm_res_id res_id;
156 fid_build_reg_res_name(fid, &res_id);
157 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
158 &res_id, type, policy, mode, lockh, 0);
/* Cancel unused locks on the resource named by fid.
 *
 * Builds the resource id from fid and forwards policy, mode, flags and
 * opaque to ldlm_cli_cancel_unused_resource() on the namespace of the obd
 * device behind exp.
 *
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably rc from ldlm_cli_cancel_unused_resource() is returned. */
162 int mdc_cancel_unused(struct obd_export *exp,
163 const struct lu_fid *fid,
164 ldlm_policy_data_t *policy,
165 ldlm_mode_t mode, int flags, void *opaque)
167 struct ldlm_res_id res_id;
168 struct obd_device *obd = class_exp2obd(exp);
173 fid_build_reg_res_name(fid, &res_id);
174 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
175 policy, mode, flags, opaque);
/* Run an iterator over the locks of the resource named by fid.
 *
 * Builds the resource id from fid and calls ldlm_resource_iterate() on the
 * namespace of the obd device behind exp.
 *
 * NOTE(review): the remaining ldlm_resource_iterate() arguments and the
 * return value are not visible in this excerpt; presumably res_id, it and
 * data are passed -- confirm. */
179 int mdc_change_cbdata(struct obd_export *exp,
180 const struct lu_fid *fid,
181 ldlm_iterator_t it, void *data)
183 struct ldlm_res_id res_id;
186 fid_build_reg_res_name(fid, &res_id);
187 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
/* Helper used on error paths so that a failed request is not kept for
 * replay.
 *
 * When req->rq_replay is set, req->rq_lock is taken and released around
 * the update.  Independently, a non-zero rc together with a non-zero
 * req->rq_transno is reported through DEBUG_REQ(D_ERROR).
 *
 * NOTE(review): the statement executed under rq_lock is not visible in
 * this excerpt -- presumably it clears rq_replay; confirm.  Anything that
 * follows the DEBUG_REQ is likewise not visible. */
194 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
196 /* Don't hold error requests for replay. */
197 if (req->rq_replay) {
198 spin_lock(&req->rq_lock);
200 spin_unlock(&req->rq_lock);
202 if (rc && req->rq_transno != 0) {
203 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
208 /* Save a large LOV EA into the request buffer so that it is available
209 * for replay. We don't do this in the initial request because the
210 * original request doesn't need this buffer (at most it sends just the
211 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
212 * buffer and may also be difficult to allocate and save a very large
213 * request buffer for each open. (bug 5707)
215 * OOM here may cause recovery failure if lmm is needed (only for the
216 * original open if the MDS crashed just when this client also OOM'd)
217 * but this is incredibly unlikely, and questionable whether the client
218 * could do MDS recovery under OOM anyways... */
/* Mechanics visible here: the request segment at index
 * DLM_INTENT_REC_OFF + 4 is enlarged with sptlrpc_cli_enlarge_reqbuf().
 * On the failure path an error is logged, OBD_MD_FLEASIZE is cleared from
 * body->valid and body->eadatasize is reset to 0, so the body no longer
 * advertises an EA.
 * NOTE(review): the size argument of the enlarge call and the condition
 * guarding the failure path are not visible in this excerpt; the CERROR
 * arguments suggest the new size is body->eadatasize -- confirm. */
219 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
220 struct mdt_body *body)
224 /* FIXME: remove this explicit offset. */
225 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
228 CERROR("Can't enlarge segment %d size to %d\n",
229 DLM_INTENT_REC_OFF + 4, body->eadatasize);
230 body->valid &= ~OBD_MD_FLEASIZE;
231 body->eadatasize = 0;
/* Build the LDLM intent request for an IT_OPEN operation.
 *
 * Steps visible in this body:
 *  - it->it_create_mode has its S_IFMT bits replaced by S_IFREG;
 *  - when op_data->op_fid2 is sane, unused locks on it are collected into
 *    the local "cancels" list via mdc_resource_get_unused(); for CREATE or
 *    join-file opens the same is done for op_fid1 with
 *    MDS_INODELOCK_UPDATE; "count" accumulates the number collected;
 *  - a request of format RQF_LDLM_INTENT_OPEN is allocated; on the failure
 *    path the collected locks are released with ldlm_lock_list_put() and
 *    ERR_PTR(-ENOMEM) is returned;
 *  - client-side capsule sizes are set: RMF_CAPA1 and RMF_CAPA2 (both from
 *    op_capa1), RMF_NAME (op_namelen + 1) and RMF_EADATA (the larger of
 *    lmmsize and cl_default_mds_easize);
 *  - ldlm_prep_enqueue_req() attaches the cancel list; the request is
 *    freed if that fails;
 *  - the intent opcode, the optional join record and the open record
 *    (mdc_open_pack) are packed, and the reply length is computed with
 *    ptlrpc_request_set_replen().  For a remote client the RMF_ACL reply
 *    field is sized for a struct mdt_remote_perm.
 *
 * NOTE(review): several lines (the remaining parameters, the lock mode
 * selection, the statement under rq_lock, the final return) are not
 * visible in this excerpt; presumably the packed request is returned. */
235 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
236 struct lookup_intent *it,
237 struct md_op_data *op_data,
238 void *lmm, int lmmsize,
241 struct ptlrpc_request *req;
242 struct obd_device *obddev = class_exp2obd(exp);
243 struct ldlm_intent *lit;
244 int joinfile = !!((it->it_create_mode & M_JOIN_FILE) &&
246 CFS_LIST_HEAD(cancels);
252 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
254 /* XXX: openlock is not cancelled for cross-refs. */
255 /* If inode is known, cancel conflicting OPEN locks. */
256 if (fid_is_sane(&op_data->op_fid2)) {
257 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
260 else if (it->it_flags & FMODE_EXEC)
265 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
270 /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
271 if (it->it_op & IT_CREAT || joinfile)
275 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
277 MDS_INODELOCK_UPDATE);
279 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
280 &RQF_LDLM_INTENT_OPEN);
282 ldlm_lock_list_put(&cancels, l_bl_ast, count);
283 RETURN(ERR_PTR(-ENOMEM));
286 /* parent capability */
287 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
288 /* child capability, reserve the size according to parent capa, it will
289 * be filled after we get the reply */
290 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
292 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
293 op_data->op_namelen + 1);
294 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
295 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
297 req_capsule_set_size(&req->rq_pill, &RMF_REC_JOINFILE,
301 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
303 ptlrpc_request_free(req);
308 __u64 head_size = *(__u64 *)op_data->op_data;
309 mdc_join_pack(req, op_data, head_size);
312 spin_lock(&req->rq_lock);
314 spin_unlock(&req->rq_lock);
316 /* pack the intent */
317 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
318 lit->opc = (__u64)it->it_op;
320 /* pack the intended request */
321 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
324 /* for remote client, fetch remote perm for current user */
325 if (client_is_remote(exp))
326 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
327 sizeof(struct mdt_remote_perm));
328 ptlrpc_request_set_replen(req);
/* Build the LDLM intent request for an IT_UNLINK operation.
 *
 * Allocates a request of format RQF_LDLM_INTENT_UNLINK (ERR_PTR(-ENOMEM)
 * on the allocation failure path), sizes RMF_CAPA1 from op_capa1 and
 * RMF_NAME to op_namelen + 1, prepares the enqueue with no cancel list
 * (the request is freed if that fails), then packs the intent opcode and
 * the unlink record.  The reply buffer reserves cl_max_mds_easize for
 * RMF_MDT_MD before the reply length is computed.
 *
 * NOTE(review): the RMF_ACL reply field is sized with
 * cl_max_mds_cookiesize -- it looks like that field is reused to carry
 * unlink cookies; confirm.  The final return is not visible in this
 * excerpt; presumably the packed request is returned. */
332 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
333 struct lookup_intent *it,
334 struct md_op_data *op_data)
336 struct ptlrpc_request *req;
337 struct obd_device *obddev = class_exp2obd(exp);
338 struct ldlm_intent *lit;
342 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
343 &RQF_LDLM_INTENT_UNLINK);
345 RETURN(ERR_PTR(-ENOMEM));
347 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
348 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
349 op_data->op_namelen + 1);
351 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
353 ptlrpc_request_free(req);
357 /* pack the intent */
358 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
359 lit->opc = (__u64)it->it_op;
361 /* pack the intended request */
362 mdc_unlink_pack(req, op_data);
364 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
365 obddev->u.cli.cl_max_mds_easize);
366 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
367 obddev->u.cli.cl_max_mds_cookiesize);
368 ptlrpc_request_set_replen(req);
/* Build the LDLM intent request used for IT_GETATTR / IT_LOOKUP.
 *
 * The attribute mask asks for GETATTR, EA size, mode-EA size, directory
 * EA, MDS capability and MEA, plus OBD_MD_FLRMTPERM for a remote client or
 * OBD_MD_FLACL otherwise.  A request of format RQF_LDLM_INTENT_GETATTR is
 * allocated (ERR_PTR(-ENOMEM) on the allocation failure path), RMF_CAPA1
 * and RMF_NAME are sized, the enqueue is prepared with no cancel list (the
 * request is freed if that fails), and the intent opcode and the getattr
 * record are packed.  The reply reserves cl_max_mds_easize for RMF_MDT_MD
 * and, for a remote client, sizeof(struct mdt_remote_perm) for RMF_ACL.
 *
 * NOTE(review): the final return is not visible in this excerpt;
 * presumably the packed request is returned. */
372 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
373 struct lookup_intent *it,
374 struct md_op_data *op_data)
376 struct ptlrpc_request *req;
377 struct obd_device *obddev = class_exp2obd(exp);
378 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
379 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
380 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
381 (client_is_remote(exp) ?
382 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
383 struct ldlm_intent *lit;
387 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
388 &RQF_LDLM_INTENT_GETATTR);
390 RETURN(ERR_PTR(-ENOMEM));
392 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
393 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
394 op_data->op_namelen + 1);
396 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
398 ptlrpc_request_free(req);
402 /* pack the intent */
403 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
404 lit->opc = (__u64)it->it_op;
406 /* pack the intended request */
407 mdc_getattr_pack(req, valid, it->it_flags, op_data);
409 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
410 obddev->u.cli.cl_max_mds_easize);
411 if (client_is_remote(exp))
412 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
413 sizeof(struct mdt_remote_perm));
414 ptlrpc_request_set_replen(req);
/* Build a plain LDLM enqueue request (no intent payload).
 *
 * Allocates a request of format RQF_LDLM_ENQUEUE (ERR_PTR(-ENOMEM) on the
 * allocation failure path), prepares it with ldlm_prep_enqueue_req() and
 * no cancel list (the request is freed if that fails), then computes the
 * reply length.
 *
 * NOTE(review): the final return is not visible in this excerpt;
 * presumably the prepared request is returned. */
418 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
420 struct ptlrpc_request *req;
424 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
426 RETURN(ERR_PTR(-ENOMEM));
428 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
430 ptlrpc_request_free(req);
434 ptlrpc_request_set_replen(req);
/* Post-process the reply of an intent enqueue and fill in the intent.
 *
 * Steps visible in this body:
 *  - if the request has a transno or is marked for replay, the saved lock
 *    request gets LDLM_FL_INTENT_ONLY added to its flags;
 *  - for rc == ELDLM_LOCK_ABORTED the lock handle is zeroed; otherwise the
 *    granted lock is looked up and, when its l_req_mode differs from
 *    einfo->ei_mode, the reference is moved to the new mode and
 *    einfo->ei_mode is updated;
 *  - disposition and status are copied from the DLM reply
 *    (lock_policy_res1 / lock_policy_res2) into it->d.lustre, together
 *    with the lock mode, the lock handle cookie and the request pointer;
 *  - the replay flag is dropped through mdc_clear_replay_flag() when the
 *    status is negative, or when an IT_OPEN did not end in a successful
 *    open;
 *  - for OPEN/UNLINK/LOOKUP/GETATTR intents the mdt_body is fetched from
 *    the reply; a successful open registers replay data via
 *    mdc_set_open_replay_data(); if the body carries an EA it is fetched
 *    and, for a replayable open, copied into the request's RMF_EADATA
 *    buffer (enlarging the request through mdc_realloc_openmsg() when
 *    required); remote permission and capability fields are fetched when
 *    the corresponding body->valid bits are set.
 *
 * NOTE(review): the last parameter (used as "rc" below), the error returns
 * after each failed capsule fetch and the final return are not visible in
 * this excerpt -- confirm before relying on specific error codes. */
438 static int mdc_finish_enqueue(struct obd_export *exp,
439 struct ptlrpc_request *req,
440 struct ldlm_enqueue_info *einfo,
441 struct lookup_intent *it,
442 struct lustre_handle *lockh,
445 struct req_capsule *pill = &req->rq_pill;
446 struct ldlm_request *lockreq;
447 struct ldlm_reply *lockrep;
451 /* Similarly, if we're going to replay this request, we don't want to
452 * actually get a lock, just perform the intent. */
453 if (req->rq_transno || req->rq_replay) {
454 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
455 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
458 if (rc == ELDLM_LOCK_ABORTED) {
460 memset(lockh, 0, sizeof(*lockh));
462 } else { /* rc = 0 */
463 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
466 /* If the server gave us back a different lock mode, we should
467 * fix up our variables. */
468 if (lock->l_req_mode != einfo->ei_mode) {
469 ldlm_lock_addref(lockh, lock->l_req_mode);
470 ldlm_lock_decref(lockh, einfo->ei_mode);
471 einfo->ei_mode = lock->l_req_mode;
476 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
477 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
479 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
480 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
481 it->d.lustre.it_lock_mode = einfo->ei_mode;
482 it->d.lustre.it_lock_handle = lockh->cookie;
483 it->d.lustre.it_data = req;
485 if (it->d.lustre.it_status < 0 && req->rq_replay)
486 mdc_clear_replay_flag(req, it->d.lustre.it_status);
488 /* If we're doing an IT_OPEN which did not result in an actual
489 * successful open, then we need to remove the bit which saves
490 * this request for unconditional replay.
492 * It's important that we do this first! Otherwise we might exit the
493 * function without doing so, and try to replay a failed create
495 if (it->it_op & IT_OPEN && req->rq_replay &&
496 (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
497 mdc_clear_replay_flag(req, it->d.lustre.it_status);
499 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
500 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
502 /* We know what to expect, so we do any byte flipping required here */
503 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
504 struct mdt_body *body;
506 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
508 CERROR ("Can't swab mdt_body\n");
512 if (it_disposition(it, DISP_OPEN_OPEN) &&
513 !it_open_error(DISP_OPEN_OPEN, it)) {
515 * If this is a successful OPEN request, we need to set
516 * replay handler and data early, so that if replay
517 * happens immediately after swabbing below, new reply
518 * is swabbed by that handler correctly.
520 mdc_set_open_replay_data(NULL, NULL, req);
523 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
527 * The eadata is opaque; just check that it is there.
528 * Eventually, obd_unpackmd() will check the contents.
530 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
536 * We save the reply LOV EA in case we have to replay a
537 * create for recovery. If we didn't allocate a large
538 * enough request buffer above we need to reallocate it
539 * here to hold the actual LOV EA.
541 * To not save LOV EA if request is not going to replay
542 * (for example error one).
544 if ((it->it_op & IT_OPEN) && req->rq_replay) {
546 if (req_capsule_get_size(pill, &RMF_EADATA,
549 mdc_realloc_openmsg(req, body);
550 req_capsule_set_size(pill, &RMF_EADATA,
554 lmm = req_capsule_client_get(pill, &RMF_EADATA);
556 memcpy(lmm, eadata, body->eadatasize);
560 if (body->valid & OBD_MD_FLRMTPERM) {
561 struct mdt_remote_perm *perm;
563 LASSERT(client_is_remote(exp));
564 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
565 lustre_swab_mdt_remote_perm);
569 if (body->valid & OBD_MD_FLMDSCAPA) {
570 struct lustre_capa *capa, *p;
572 capa = req_capsule_server_get(pill, &RMF_CAPA1);
576 if (it->it_op & IT_OPEN) {
577 /* client fid capa will be checked in replay */
578 p = req_capsule_client_get(pill, &RMF_CAPA2);
583 if (body->valid & OBD_MD_FLOSSCAPA) {
584 struct lustre_capa *capa;
586 capa = req_capsule_server_get(pill, &RMF_CAPA2);
/* Enqueue a lock on the resource named by op_data->op_fid1, optionally
 * carrying an intent.
 *
 * Steps visible in this body:
 *  - with an intent, the lock type must be LDLM_IBITS (LASSERTF);
 *    LDLM_FL_HAS_INTENT is added to the flags and the policy bits become
 *    MDS_INODELOCK_UPDATE for UNLINK/GETATTR/READDIR (the default in the
 *    initializer is MDS_INODELOCK_LOOKUP);
 *  - the flock case takes its policy from lmm (lmmsize must be 0) and sets
 *    res_id.name[3] to LDLM_FLOCK;
 *  - otherwise the request is built by the packer matching it->it_op:
 *    mdc_intent_open_pack, mdc_intent_unlink_pack, mdc_intent_getattr_pack
 *    or ldlm_enqueue_pack; for IT_OPEN the policy bits are set to UPDATE,
 *    einfo->ei_cbdata is reset to NULL and M_JOIN_FILE is cleared from
 *    it_create_mode; a packer failure is returned as PTR_ERR(req);
 *  - the enqueue itself (ldlm_cli_enqueue) is bracketed by
 *    mdc_get_rpc_lock/mdc_enter_request and
 *    mdc_exit_request/mdc_put_rpc_lock;
 *  - on an enqueue error the error is logged, the replay flag is cleared
 *    and the request is released with ptlrpc_req_finished(); otherwise the
 *    reply is handed to mdc_finish_enqueue().
 *
 * NOTE(review): the conditions guarding several of these steps (notably
 * the no-intent / flock tests) and all return statements except the
 * PTR_ERR one are not visible in this excerpt -- confirm. */
595 /* We always reserve enough space in the reply packet for a stripe MD, because
596 * we don't know in advance the file type. */
597 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
598 struct lookup_intent *it, struct md_op_data *op_data,
599 struct lustre_handle *lockh, void *lmm, int lmmsize,
600 struct ptlrpc_request **reqp, int extra_lock_flags)
602 struct obd_device *obddev = class_exp2obd(exp);
603 struct ptlrpc_request *req = NULL;
604 struct req_capsule *pill;
605 int flags = extra_lock_flags;
607 struct ldlm_res_id res_id;
608 ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
611 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
614 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
617 flags |= LDLM_FL_HAS_INTENT;
618 if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
619 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
625 /* The only way right now is FLOCK, in this case we hide flock
626 policy as lmm, but lmmsize is 0 */
627 LASSERT(lmm && lmmsize == 0);
628 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
630 policy = *(ldlm_policy_data_t *)lmm;
631 res_id.name[3] = LDLM_FLOCK;
632 } else if (it->it_op & IT_OPEN) {
633 int joinfile = !!((it->it_create_mode & M_JOIN_FILE) &&
636 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
639 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
640 einfo->ei_cbdata = NULL;
643 it->it_create_mode &= ~M_JOIN_FILE;
644 } else if (it->it_op & IT_UNLINK)
645 req = mdc_intent_unlink_pack(exp, it, op_data);
646 else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
647 req = mdc_intent_getattr_pack(exp, it, op_data);
648 else if (it->it_op == IT_READDIR)
649 req = ldlm_enqueue_pack(exp);
656 RETURN(PTR_ERR(req));
657 pill = &req->rq_pill;
659 /* It is important to obtain rpc_lock first (if applicable), so that
660 * threads that are serialised with rpc_lock are not polluting our
661 * rpcs in flight counter. We do not do flock request limiting, though*/
663 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
664 mdc_enter_request(&obddev->u.cli);
666 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
672 mdc_exit_request(&obddev->u.cli);
673 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
676 /* For flock requests we immediatelly return without further
677 delay and let caller deal with the rest, since rest of
678 this function metadata processing makes no sense for flock
684 CERROR("ldlm_cli_enqueue: %d\n", rc);
685 mdc_clear_replay_flag(req, rc);
686 ptlrpc_req_finished(req);
689 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Interpret the result of an executed intent and settle on the lock to
 * hand back to the caller.
 *
 * Steps visible in this body:
 *  - if the server never set DISP_IT_EXECD, it_status (asserted non-zero)
 *    is returned immediately;
 *  - it_open_error() is consulted for DISP_IT_EXECD and later for
 *    DISP_LOOKUP_EXECD;
 *  - when revalidating (op_fid2 sane, M_CHECK_STALE set in it_create_mode,
 *    op is not IT_GETATTR) DISP_ENQ_COMPLETE is set and the fid in the
 *    reply body is compared with op_fid2 and op_fid3; a mismatch is
 *    logged as stale data;
 *  - an extra request reference is taken, and the corresponding
 *    DISP_ENQ_*_REF bit set, once each for a successful create and a
 *    successful open;
 *  - the new lock's resource name must match the reply fid (LASSERTF); if
 *    ldlm_lock_match() then finds a lock with the same policy, the new
 *    lock is dropped with ldlm_lock_decref_and_cancel() and lockh and
 *    it_lock_handle are switched to the matched lock.
 *
 * NOTE(review): the checks on rc after each it_open_error() call, the
 * value returned for stale data, and the final return are not visible in
 * this excerpt -- confirm. */
694 static int mdc_finish_intent_lock(struct obd_export *exp,
695 struct ptlrpc_request *request,
696 struct md_op_data *op_data,
697 struct lookup_intent *it,
698 struct lustre_handle *lockh)
700 struct lustre_handle old_lock;
701 struct mdt_body *mdt_body;
702 struct ldlm_lock *lock;
706 LASSERT(request != NULL);
707 LASSERT(request != LP_POISON);
708 LASSERT(request->rq_repmsg != LP_POISON);
710 if (!it_disposition(it, DISP_IT_EXECD)) {
711 /* The server failed before it even started executing the
712 * intent, i.e. because it couldn't unpack the request. */
713 LASSERT(it->d.lustre.it_status != 0);
714 RETURN(it->d.lustre.it_status);
716 rc = it_open_error(DISP_IT_EXECD, it);
720 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
721 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
723 /* If we were revalidating a fid/name pair, mark the intent in
724 * case we fail and get called again from lookup */
725 if (fid_is_sane(&op_data->op_fid2) &&
726 it->it_create_mode & M_CHECK_STALE &&
727 it->it_op != IT_GETATTR) {
728 it_set_disposition(it, DISP_ENQ_COMPLETE);
730 /* Also: did we find the same inode? */
731 /* The server can return one of two fids:
732 * op_fid2 - newly allocated fid - if the file is created.
733 * op_fid3 - existing fid - if the file is only opened.
734 * op_fid3 is saved in lmv_intent_open */
/* NOTE(review): the CDEBUG below prints op_fid2 twice although the check
 * compares against both op_fid2 and op_fid3 -- the second PFID() argument
 * presumably should be op_fid3; confirm. */
735 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
736 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
737 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
738 "\n", PFID(&op_data->op_fid2),
739 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
744 rc = it_open_error(DISP_LOOKUP_EXECD, it);
748 /* keep requests around for the multiple phases of the call
749 * this shows the DISP_XX must guarantee we make it into the call
751 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
752 it_disposition(it, DISP_OPEN_CREATE) &&
753 !it_open_error(DISP_OPEN_CREATE, it)) {
754 it_set_disposition(it, DISP_ENQ_CREATE_REF);
755 ptlrpc_request_addref(request); /* balanced in ll_create_node */
757 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
758 it_disposition(it, DISP_OPEN_OPEN) &&
759 !it_open_error(DISP_OPEN_OPEN, it)) {
760 it_set_disposition(it, DISP_ENQ_OPEN_REF);
761 ptlrpc_request_addref(request); /* balanced in ll_file_open */
762 /* BUG 11546 - eviction in the middle of open rpc processing */
763 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
766 if (it->it_op & IT_CREAT) {
767 /* XXX this belongs in ll_create_it */
768 } else if (it->it_op == IT_OPEN) {
769 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
771 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
774 /* If we already have a matching lock, then cancel the new
775 * one. We have to set the data here instead of in
776 * mdc_enqueue, because we need to use the child's inode as
777 * the l_ast_data to match, and that's not available until
778 * intent_finish has performed the iget().) */
779 lock = ldlm_handle2lock(lockh);
781 ldlm_policy_data_t policy = lock->l_policy_data;
782 LDLM_DEBUG(lock, "matching against this");
784 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
785 &lock->l_resource->lr_name),
786 "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
787 (unsigned long)lock->l_resource->lr_name.name[0],
788 (unsigned long)lock->l_resource->lr_name.name[1],
789 (unsigned long)lock->l_resource->lr_name.name[2],
790 (unsigned long)fid_seq(&mdt_body->fid1),
791 (unsigned long)fid_oid(&mdt_body->fid1),
792 (unsigned long)fid_ver(&mdt_body->fid1));
795 memcpy(&old_lock, lockh, sizeof(*lockh));
796 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
797 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
798 ldlm_lock_decref_and_cancel(lockh,
799 it->d.lustre.it_lock_mode);
800 memcpy(lockh, &old_lock, sizeof(old_lock));
801 it->d.lustre.it_lock_handle = lockh->cookie;
804 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
805 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
806 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
811 * This long block is all about fixing up the lock and request state
812 * so that it is correct as of the moment _before_ the operation was
813 * applied; that way, the VFS will think that everything is normal and
814 * call Lustre's regular VFS methods.
816 * If we're performing a creation, that means that unless the creation
817 * failed with EEXIST, we should fake up a negative dentry.
819 * For everything else, we want to lookup to succeed.
821 * One additional note: if CREATE or OPEN succeeded, we add an extra
822 * reference to the request because we need to keep it around until
823 * ll_create/ll_open gets called.
825 * The server will return to us, in it_disposition, an indication of
826 * exactly what d.lustre.it_status refers to.
828 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
829 * otherwise if DISP_OPEN_CREATE is set, then it status is the
830 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
831 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
834 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/* Entry point for intent locking: reuse an existing lock where possible,
 * otherwise enqueue the intent, then finish intent processing.
 *
 * Steps visible in this body:
 *  - for LOOKUP/GETATTR with a sane op_fid2, an existing IBITS lock is
 *    searched with mdc_lock_match() (UPDATE bit for GETATTR, LOOKUP bit
 *    otherwise; modes CR|CW|PR|PW) and its handle and mode are recorded in
 *    the intent;
 *  - if DISP_ENQ_COMPLETE is not set, a FID is allocated for IT_CREAT when
 *    op_fid2 is not sane, and the intent is sent through mdc_enqueue()
 *    with an LDLM_IBITS enqueue info built from it_to_lock_mode(it),
 *    cb_blocking and ldlm_completion_ast;
 *  - if DISP_ENQ_COMPLETE is set and this is not a stale check, the bit is
 *    cleared so that the request saved by the earlier call is reused;
 *  - *reqp receives it->d.lustre.it_data and mdc_finish_intent_lock()
 *    produces the result code.
 *
 * NOTE(review): lookup_flags is not referenced in the visible lines, and
 * the early returns after the lock match, the FID allocation and the
 * enqueue, as well as the final return, are not visible in this excerpt
 * -- confirm. */
837 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
838 void *lmm, int lmmsize, struct lookup_intent *it,
839 int lookup_flags, struct ptlrpc_request **reqp,
840 ldlm_blocking_callback cb_blocking,
841 int extra_lock_flags)
843 struct lustre_handle lockh;
848 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
849 ", intent: %s flags %#o\n", op_data->op_namelen,
850 op_data->op_name, PFID(&op_data->op_fid2),
851 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
855 if (fid_is_sane(&op_data->op_fid2) &&
856 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
857 /* We could just return 1 immediately, but since we should only
858 * be called in revalidate_it if we already have a lock, let's
860 ldlm_policy_data_t policy;
863 /* As not all attributes are kept under update lock, e.g.
864 owner/group/acls are under lookup lock, we need both
865 ibits for GETATTR. */
867 /* For CMD, UPDATE lock and LOOKUP lock can not be got
868 * at the same for cross-object, so we can not match
869 * the 2 lock at the same time FIXME: but how to handle
870 * the above situation */
871 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
872 MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
874 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED,
875 &op_data->op_fid2, LDLM_IBITS, &policy,
876 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
878 it->d.lustre.it_lock_handle = lockh.cookie;
879 it->d.lustre.it_lock_mode = mode;
882 /* Only return failure if it was not GETATTR by cfid
883 (from inode_revalidate) */
884 if (mode || op_data->op_namelen != 0)
888 /* lookup_it may be called only after revalidate_it has run, because
889 * revalidate_it cannot return errors, only zero. Returning zero causes
890 * this call to lookup, which *can* return an error.
892 * We only want to execute the request associated with the intent one
893 * time, however, so don't send the request again. Instead, skip past
894 * this and use the request from revalidate. In this case, revalidate
895 * never dropped its reference, so the refcounts are all OK */
896 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
897 struct ldlm_enqueue_info einfo =
898 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
899 ldlm_completion_ast, NULL, NULL, NULL };
901 /* For case if upper layer did not alloc fid, do it now. */
902 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
903 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
905 CERROR("Can't alloc new fid, rc %d\n", rc);
909 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
910 lmm, lmmsize, NULL, extra_lock_flags);
913 } else if (!fid_is_sane(&op_data->op_fid2) ||
914 !(it->it_create_mode & M_CHECK_STALE)) {
915 /* DISP_ENQ_COMPLETE set means there is extra reference on
916 * request referenced from this intent, saved for subsequent
917 * lookup. This path is executed when we proceed to this
918 * lookup, so we clear DISP_ENQ_COMPLETE */
919 it_clear_disposition(it, DISP_ENQ_COMPLETE);
921 *reqp = it->d.lustre.it_data;
922 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/* Reply interpreter for requests queued by mdc_intent_getattr_async().
 *
 * The export, the md_enqueue_info and the ldlm_enqueue_info are recovered
 * from req->rq_async_args.pointer_arg[0..2].  The in-flight slot is
 * released with mdc_exit_request(), the enqueue is completed with
 * ldlm_cli_enqueue_fini() (an error is logged and the replay flag
 * cleared), then mdc_finish_enqueue() and mdc_finish_intent_lock() run
 * against minfo->mi_lockh and minfo->mi_data.  The resulting rc is
 * delivered to the caller through minfo->mi_cb(req, minfo, rc).
 *
 * NOTE(review): the assignment of "it", the effect of the
 * OBD_FAIL_MDC_GETATTR_ENQUEUE check, the jumps taken on error and the
 * return value are not visible in this excerpt -- confirm. */
926 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
927 struct ptlrpc_request *req,
928 void *unused, int rc)
930 struct obd_export *exp = req->rq_async_args.pointer_arg[0];
931 struct md_enqueue_info *minfo = req->rq_async_args.pointer_arg[1];
932 struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
933 struct lookup_intent *it;
934 struct lustre_handle *lockh;
935 struct obd_device *obddev;
936 int flags = LDLM_FL_HAS_INTENT;
940 lockh = &minfo->mi_lockh;
942 obddev = class_exp2obd(exp);
944 mdc_exit_request(&obddev->u.cli);
945 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
948 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
949 &flags, NULL, 0, NULL, lockh, rc);
951 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
952 mdc_clear_replay_flag(req, rc);
956 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
960 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
965 minfo->mi_cb(req, minfo, rc);
/* Send a getattr intent asynchronously.
 *
 * The operation data and the intent are taken from minfo (mi_data, mi_it).
 * The request is packed with mdc_intent_getattr_pack() against the
 * resource named by op_fid1 and enqueued through ldlm_cli_enqueue() with
 * the LOOKUP inodebit policy, LDLM_FL_HAS_INTENT and minfo->mi_lockh; the
 * last argument is 1.  The in-flight slot taken with mdc_enter_request()
 * is given back with mdc_exit_request() on the failure path.  Finally exp,
 * minfo and einfo are stored in rq_async_args.pointer_arg[0..2],
 * mdc_intent_getattr_async_interpret is installed as the reply interpreter
 * and the request is handed to ptlrpcd_add_req() with PSCOPE_OTHER.
 *
 * NOTE(review): the check of the packer result, the condition around the
 * failure path and the return value are not visible in this excerpt --
 * confirm. */
969 int mdc_intent_getattr_async(struct obd_export *exp,
970 struct md_enqueue_info *minfo,
971 struct ldlm_enqueue_info *einfo)
973 struct md_op_data *op_data = &minfo->mi_data;
974 struct lookup_intent *it = &minfo->mi_it;
975 struct ptlrpc_request *req;
976 struct obd_device *obddev = class_exp2obd(exp);
977 struct ldlm_res_id res_id;
978 ldlm_policy_data_t policy = {
979 .l_inodebits = { MDS_INODELOCK_LOOKUP }
982 int flags = LDLM_FL_HAS_INTENT;
985 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
986 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
987 ldlm_it2str(it->it_op), it->it_flags);
989 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
990 req = mdc_intent_getattr_pack(exp, it, op_data);
994 mdc_enter_request(&obddev->u.cli);
995 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
996 0, NULL, &minfo->mi_lockh, 1);
998 mdc_exit_request(&obddev->u.cli);
1002 req->rq_async_args.pointer_arg[0] = exp;
1003 req->rq_async_args.pointer_arg[1] = minfo;
1004 req->rq_async_args.pointer_arg[2] = einfo;
1005 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1006 ptlrpcd_add_req(req, PSCOPE_OTHER);
1011 int mdc_revalidate_lock(struct obd_export *exp,
1012 struct lookup_intent *it,
1015 /* We could just return 1 immediately, but since we should only
1016 * be called in revalidate_it if we already have a lock, let's
1018 struct ldlm_res_id res_id;
1019 struct lustre_handle lockh;
1020 ldlm_policy_data_t policy;
1024 fid_build_reg_res_name(fid, &res_id);
1025 /* As not all attributes are kept under update lock, e.g.
1026 owner/group/acls are under lookup lock, we need both
1027 ibits for GETATTR. */
1028 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
1029 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
1030 MDS_INODELOCK_LOOKUP;
1032 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1033 LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
1034 &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1036 it->d.lustre.it_lock_handle = lockh.cookie;
1037 it->d.lustre.it_lock_mode = mode;