1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_MDC
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
48 # include <liblustre.h>
51 #include <lustre_acl.h>
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 /* fid_res_name_eq() */
55 #include <lustre_fid.h>
56 #include <lprocfs_status.h>
57 #include "mdc_internal.h"
/* Test disposition bits of an intent.
 * Returns the bitwise AND of the intent disposition mask with @flag,
 * i.e. non-zero when at least one requested bit is set. */
59 int it_disposition(struct lookup_intent *it, int flag)
61 return it->d.lustre.it_disposition & flag;
63 EXPORT_SYMBOL(it_disposition);
/* Set the bits given in @flag in the intent disposition mask; bits that
 * are already set are left untouched. */
65 void it_set_disposition(struct lookup_intent *it, int flag)
67 it->d.lustre.it_disposition |= flag;
69 EXPORT_SYMBOL(it_set_disposition);
/* Clear the bits given in @flag from the intent disposition mask; all
 * other bits are preserved. */
71 void it_clear_disposition(struct lookup_intent *it, int flag)
73 it->d.lustre.it_disposition &= ~flag;
75 EXPORT_SYMBOL(it_clear_disposition);
/* Return the intent status as it applies to the processing stage @phase.
 *
 * The disposition bits are examined in the fixed order DISP_OPEN_OPEN,
 * DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD. For a bit that is
 * set, d.lustre.it_status is returned when @phase compares greater than
 * or equal to that bit.
 *
 * The CERROR at the end logs the raw disposition and status; it is only
 * reached when none of the branches above has returned. */
77 int it_open_error(int phase, struct lookup_intent *it)
79 if (it_disposition(it, DISP_OPEN_OPEN)) {
80 if (phase >= DISP_OPEN_OPEN)
81 return it->d.lustre.it_status;
86 if (it_disposition(it, DISP_OPEN_CREATE)) {
87 if (phase >= DISP_OPEN_CREATE)
88 return it->d.lustre.it_status;
93 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
94 if (phase >= DISP_LOOKUP_EXECD)
95 return it->d.lustre.it_status;
100 if (it_disposition(it, DISP_IT_EXECD)) {
101 if (phase >= DISP_IT_EXECD)
102 return it->d.lustre.it_status;
106 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
107 it->d.lustre.it_status);
111 EXPORT_SYMBOL(it_open_error);
/* Attach @data to the lock referenced by @lockh as its l_ast_data.
 *
 * @lockh is reinterpreted as a struct lustre_handle and resolved with
 * ldlm_handle2lock(); the result is asserted to be non-NULL. The update
 * of l_ast_data and the read of the inodebits are done between
 * lock_res_and_lock() and unlock_res_and_lock().
 *
 * NOTE(review): the reference taken by ldlm_handle2lock() has to be
 * dropped before returning; confirm that the matching put is present. */
113 /* this must be called on a lockh that is known to have a referenced lock */
114 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
117 struct ldlm_lock *lock;
128 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
130 LASSERT(lock != NULL);
131 lock_res_and_lock(lock);
/* The lock already carries different ast data: both old and new values
 * are treated as inodes and the old one must be in I_FREEING state,
 * otherwise the assertion below fires. */
133 if (lock->l_ast_data && lock->l_ast_data != data) {
134 struct inode *new_inode = data;
135 struct inode *old_inode = lock->l_ast_data;
136 LASSERTF(old_inode->i_state & I_FREEING,
137 "Found existing inode %p/%lu/%u state %lu in lock: "
138 "setting data to %p/%lu/%u\n", old_inode,
139 old_inode->i_ino, old_inode->i_generation,
141 new_inode, new_inode->i_ino, new_inode->i_generation);
144 lock->l_ast_data = data;
/* Hand the inodebits covered by this lock back through bits. */
146 *bits = lock->l_policy_data.l_inodebits.bits;
148 unlock_res_and_lock(lock);
/* Look for an existing lock on the resource named by @fid.
 *
 * The resource name is built with fid_build_reg_res_name() and the
 * search is delegated to ldlm_lock_match() on the namespace of the obd
 * device behind @exp, passing @flags, @type, @policy, @mode and @lockh
 * through unchanged (the last argument is fixed to 0). The result of
 * ldlm_lock_match() is stored in rc. */
154 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
155 const struct lu_fid *fid, ldlm_type_t type,
156 ldlm_policy_data_t *policy, ldlm_mode_t mode,
157 struct lustre_handle *lockh)
159 struct ldlm_res_id res_id;
163 fid_build_reg_res_name(fid, &res_id);
164 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
165 &res_id, type, policy, mode, lockh, 0);
/* Cancel unused locks on the resource named by @fid.
 *
 * Builds the resource name from @fid and forwards @policy, mode, @flags
 * and opaque to ldlm_cli_cancel_unused_resource() on the namespace of
 * the obd device behind @exp; its result is stored in rc. */
169 int mdc_cancel_unused(struct obd_export *exp,
170 const struct lu_fid *fid,
171 ldlm_policy_data_t *policy,
173 ldlm_cancel_flags_t flags,
176 struct ldlm_res_id res_id;
177 struct obd_device *obd = class_exp2obd(exp);
182 fid_build_reg_res_name(fid, &res_id);
183 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
184 policy, mode, flags, opaque);
/* Run the iterator @it with @data over the locks of the resource named
 * by @fid, in the namespace of the obd device behind @exp.
 *
 * Unlike mdc_find_cbdata(), the value returned by
 * ldlm_resource_iterate() is not assigned to anything here. */
188 int mdc_change_cbdata(struct obd_export *exp,
189 const struct lu_fid *fid,
190 ldlm_iterator_t it, void *data)
192 struct ldlm_res_id res_id;
195 fid_build_reg_res_name(fid, &res_id);
196 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
203 /* find any ldlm lock of the inode in mdc
/* Run the iterator @it with @data over the locks of the resource named
 * by @fid and translate the iteration outcome for the caller.
 *
 * The result of ldlm_resource_iterate() is compared against
 * LDLM_ITER_STOP and LDLM_ITER_CONTINUE to select the return value. */
207 int mdc_find_cbdata(struct obd_export *exp,
208 const struct lu_fid *fid,
209 ldlm_iterator_t it, void *data)
211 struct ldlm_res_id res_id;
/* NOTE(review): this cast drops the const qualifier from fid, while the
 * sibling functions pass a const fid to fid_build_reg_res_name()
 * directly; confirm whether the cast is still required. */
215 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
216 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
218 if (rc == LDLM_ITER_STOP)
220 else if (rc == LDLM_ITER_CONTINUE)
/* Stop a request from being kept for replay after an error.
 *
 * When rq_replay is set, the request spinlock rq_lock is taken around
 * the update of the replay state. Independently of that, a non-zero @rc
 * combined with a non-zero rq_transno is reported at D_ERROR level,
 * since a transaction number is not expected on a failed request. */
225 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
227 /* Don't hold error requests for replay. */
228 if (req->rq_replay) {
229 cfs_spin_lock(&req->rq_lock);
231 cfs_spin_unlock(&req->rq_lock);
/* A transno together with an error code is suspicious: log it. */
233 if (rc && req->rq_transno != 0) {
234 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
239 /* Save a large LOV EA into the request buffer so that it is available
240 * for replay. We don't do this in the initial request because the
241 * original request doesn't need this buffer (at most it sends just the
242 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
243 * buffer and may also be difficult to allocate and save a very large
244 * request buffer for each open. (bug 5707)
246 * OOM here may cause recovery failure if lmm is needed (only for the
247 * original open if the MDS crashed just when this client also OOM'd)
248 * but this is incredibly unlikely, and questionable whether the client
249 * could do MDS recovery under OOM anyways... */
250 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
251 struct mdt_body *body)
255 /* FIXME: remove this explicit offset. */
/* The segment that gets enlarged is addressed as DLM_INTENT_REC_OFF + 4;
 * the same expression is reused in the error message below. */
256 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
/* Failure handling (presumably guarded by a test of rc): report the
 * segment and the requested size, then clear OBD_MD_FLEASIZE and zero
 * eadatasize in the reply body so that it no longer advertises an EA. */
259 CERROR("Can't enlarge segment %d size to %d\n",
260 DLM_INTENT_REC_OFF + 4, body->eadatasize);
261 body->valid &= ~OBD_MD_FLEASIZE;
262 body->eadatasize = 0;
/* Build an RQF_LDLM_INTENT_OPEN enqueue request for intent @it.
 *
 * Steps performed by the visible code:
 *  - force the file type bits of it_create_mode to S_IFREG;
 *  - collect unused conflicting locks on the child (op_fid2, when sane)
 *    and, for IT_CREAT, on the parent (op_fid1, UPDATE bit) into the
 *    local cancels list;
 *  - allocate the request and size its client-side fields (capabilities,
 *    name, EA data);
 *  - call ldlm_prep_enqueue_req() with the collected cancels;
 *  - copy the replayable state of the import into rq_replay;
 *  - pack the intent opcode and the open body;
 *  - reserve reply space and compute the reply length.
 *
 * ERR_PTR(-ENOMEM) is returned after the cancel list has been released
 * (see below); the request is freed when ldlm_prep_enqueue_req() fails. */
266 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
267 struct lookup_intent *it,
268 struct md_op_data *op_data,
269 void *lmm, int lmmsize,
272 struct ptlrpc_request *req;
273 struct obd_device *obddev = class_exp2obd(exp);
274 struct ldlm_intent *lit;
275 CFS_LIST_HEAD(cancels);
/* Keep everything except the S_IFMT type bits and mark the target as a
 * regular file. */
281 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
283 /* XXX: openlock is not cancelled for cross-refs. */
284 /* If inode is known, cancel conflicting OPEN locks. */
285 if (fid_is_sane(&op_data->op_fid2)) {
286 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
289 else if (it->it_flags & FMODE_EXEC)
294 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
299 /* If CREATE, cancel parent's UPDATE lock. */
300 if (it->it_op & IT_CREAT)
304 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
306 MDS_INODELOCK_UPDATE);
308 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
309 &RQF_LDLM_INTENT_OPEN);
/* Allocation failure path (presumably guarded by a NULL check on req):
 * the locks gathered for cancellation are released before bailing out. */
311 ldlm_lock_list_put(&cancels, l_bl_ast, count);
312 RETURN(ERR_PTR(-ENOMEM));
315 /* parent capability */
316 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
317 /* child capability, reserve the size according to parent capa, it will
318 * be filled after we get the reply */
319 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
/* The name field gets one byte more than op_namelen. */
321 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
322 op_data->op_namelen + 1);
/* The client EA buffer is the larger of the caller supplied lmmsize and
 * the default MDS EA size of this client. */
323 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
324 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
326 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
328 ptlrpc_request_free(req);
/* rq_replay mirrors imp_replayable of the import and is written under
 * rq_lock. */
332 cfs_spin_lock(&req->rq_lock);
333 req->rq_replay = req->rq_import->imp_replayable;
334 cfs_spin_unlock(&req->rq_lock);
336 /* pack the intent */
337 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
338 lit->opc = (__u64)it->it_op;
340 /* pack the intended request */
341 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
344 /* for remote client, fetch remote perm for current user */
345 if (client_is_remote(exp))
346 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
347 sizeof(struct mdt_remote_perm));
348 ptlrpc_request_set_replen(req);
/* Build an RQF_LDLM_INTENT_UNLINK enqueue request for intent @it.
 *
 * The request is allocated, its capability and name fields are sized,
 * ldlm_prep_enqueue_req() is called without any cancel list, the intent
 * opcode and the unlink body are packed, and finally the server-side
 * reply buffers are sized before the reply length is computed.
 *
 * ERR_PTR(-ENOMEM) is returned on the allocation failure path; the
 * request is freed when ldlm_prep_enqueue_req() fails. */
352 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
353 struct lookup_intent *it,
354 struct md_op_data *op_data)
356 struct ptlrpc_request *req;
357 struct obd_device *obddev = class_exp2obd(exp);
358 struct ldlm_intent *lit;
362 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
363 &RQF_LDLM_INTENT_UNLINK);
365 RETURN(ERR_PTR(-ENOMEM));
367 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
368 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
369 op_data->op_namelen + 1);
371 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
373 ptlrpc_request_free(req);
377 /* pack the intent */
378 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
379 lit->opc = (__u64)it->it_op;
381 /* pack the intended request */
382 mdc_unlink_pack(req, op_data);
/* Reply buffers: the MD field is sized for the largest MDS EA.
 * NOTE(review): the RMF_ACL field is sized with cl_max_mds_cookiesize
 * here, whereas the other pack helpers size it for mdt_remote_perm;
 * confirm that this field is the intended slot for cookie data. */
384 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
385 obddev->u.cli.cl_max_mds_easize);
386 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
387 obddev->u.cli.cl_max_mds_cookiesize);
388 ptlrpc_request_set_replen(req);
/* Build an RQF_LDLM_INTENT_GETATTR enqueue request for intent @it.
 *
 * The set of attributes asked from the server is fixed in the local
 * valid mask; the only variable part is the permission flag, which is
 * OBD_MD_FLRMTPERM for a remote client and OBD_MD_FLACL otherwise.
 *
 * ERR_PTR(-ENOMEM) is returned on the allocation failure path; the
 * request is freed when ldlm_prep_enqueue_req() fails. */
392 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
393 struct lookup_intent *it,
394 struct md_op_data *op_data)
396 struct ptlrpc_request *req;
397 struct obd_device *obddev = class_exp2obd(exp);
398 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
399 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
400 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
401 (client_is_remote(exp) ?
402 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
403 struct ldlm_intent *lit;
407 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
408 &RQF_LDLM_INTENT_GETATTR);
410 RETURN(ERR_PTR(-ENOMEM));
412 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
413 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
414 op_data->op_namelen + 1);
416 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
418 ptlrpc_request_free(req);
422 /* pack the intent */
423 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
424 lit->opc = (__u64)it->it_op;
426 /* pack the intended request */
427 mdc_getattr_pack(req, valid, it->it_flags, op_data,
428 obddev->u.cli.cl_max_mds_easize);
/* Reply buffers: room for the largest MDS EA, plus a remote permission
 * record when the client is remote. */
430 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
431 obddev->u.cli.cl_max_mds_easize);
432 if (client_is_remote(exp))
433 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
434 sizeof(struct mdt_remote_perm));
435 ptlrpc_request_set_replen(req);
/* Build a plain RQF_LDLM_ENQUEUE request without an intent payload.
 *
 * The request is allocated, prepared with ldlm_prep_enqueue_req() using
 * no cancel list, and its reply length is computed. ERR_PTR(-ENOMEM) is
 * returned on the allocation failure path; the request is freed when
 * ldlm_prep_enqueue_req() fails. */
439 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
441 struct ptlrpc_request *req;
445 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
447 RETURN(ERR_PTR(-ENOMEM));
449 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
451 ptlrpc_request_free(req);
455 ptlrpc_request_set_replen(req);
/* Post-process the reply of an intent enqueue.
 *
 * Visible responsibilities:
 *  - flag the saved lock request as LDLM_FL_INTENT_ONLY when the request
 *    has a transno or is kept for replay;
 *  - zero @lockh when the enqueue was aborted, otherwise adjust the lock
 *    references and einfo->ei_mode if the lock carries a different
 *    requested mode;
 *  - copy disposition, status, lock mode, lock handle and the request
 *    pointer into the intent;
 *  - drop the replay flag for failed operations and for opens that did
 *    not actually succeed;
 *  - for OPEN, UNLINK, LOOKUP and GETATTR intents, fetch the reply body
 *    and the optional EA, remote permission and capability buffers, and
 *    save the reply EA into the request for open replay. */
459 static int mdc_finish_enqueue(struct obd_export *exp,
460 struct ptlrpc_request *req,
461 struct ldlm_enqueue_info *einfo,
462 struct lookup_intent *it,
463 struct lustre_handle *lockh,
466 struct req_capsule *pill = &req->rq_pill;
467 struct ldlm_request *lockreq;
468 struct ldlm_reply *lockrep;
472 /* Similarly, if we're going to replay this request, we don't want to
473 * actually get a lock, just perform the intent. */
474 if (req->rq_transno || req->rq_replay) {
475 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
476 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
/* An aborted enqueue leaves no lock behind, so the handle is wiped. */
479 if (rc == ELDLM_LOCK_ABORTED) {
481 memset(lockh, 0, sizeof(*lockh));
483 } else { /* rc = 0 */
484 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
487 /* If the server gave us back a different lock mode, we should
488 * fix up our variables. */
489 if (lock->l_req_mode != einfo->ei_mode) {
490 ldlm_lock_addref(lockh, lock->l_req_mode);
491 ldlm_lock_decref(lockh, einfo->ei_mode);
492 einfo->ei_mode = lock->l_req_mode;
497 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
498 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* The two policy result words of the reply carry the disposition mask
 * and the status of the intent; both are narrowed to int. */
500 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
501 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
502 it->d.lustre.it_lock_mode = einfo->ei_mode;
503 it->d.lustre.it_lock_handle = lockh->cookie;
504 it->d.lustre.it_data = req;
506 if (it->d.lustre.it_status < 0 && req->rq_replay)
507 mdc_clear_replay_flag(req, it->d.lustre.it_status);
509 /* If we're doing an IT_OPEN which did not result in an actual
510 * successful open, then we need to remove the bit which saves
511 * this request for unconditional replay.
513 * It's important that we do this first! Otherwise we might exit the
514 * function without doing so, and try to replay a failed create
516 if (it->it_op & IT_OPEN && req->rq_replay &&
517 (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
518 mdc_clear_replay_flag(req, it->d.lustre.it_status);
520 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
521 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
523 /* We know what to expect, so we do any byte flipping required here */
524 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
525 struct mdt_body *body;
527 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
529 CERROR ("Can't swab mdt_body\n");
533 if (it_disposition(it, DISP_OPEN_OPEN) &&
534 !it_open_error(DISP_OPEN_OPEN, it)) {
536 * If this is a successful OPEN request, we need to set
537 * replay handler and data early, so that if replay
538 * happens immediately after swabbing below, new reply
539 * is swabbed by that handler correctly.
541 mdc_set_open_replay_data(NULL, NULL, req);
544 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
547 mdc_update_max_ea_from_body(exp, body);
550 * The eadata is opaque; just check that it is there.
551 * Eventually, obd_unpackmd() will check the contents.
553 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
559 * We save the reply LOV EA in case we have to replay a
560 * create for recovery. If we didn't allocate a large
561 * enough request buffer above we need to reallocate it
562 * here to hold the actual LOV EA.
564 * To not save LOV EA if request is not going to replay
565 * (for example error one).
567 if ((it->it_op & IT_OPEN) && req->rq_replay) {
569 if (req_capsule_get_size(pill, &RMF_EADATA,
572 mdc_realloc_openmsg(req, body);
574 req_capsule_shrink(pill, &RMF_EADATA,
578 req_capsule_set_size(pill, &RMF_EADATA,
582 lmm = req_capsule_client_get(pill, &RMF_EADATA);
584 memcpy(lmm, eadata, body->eadatasize);
/* Remote permission data is only expected from a remote client. */
588 if (body->valid & OBD_MD_FLRMTPERM) {
589 struct mdt_remote_perm *perm;
591 LASSERT(client_is_remote(exp));
592 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
593 lustre_swab_mdt_remote_perm);
597 if (body->valid & OBD_MD_FLMDSCAPA) {
598 struct lustre_capa *capa, *p;
600 capa = req_capsule_server_get(pill, &RMF_CAPA1);
604 if (it->it_op & IT_OPEN) {
605 /* client fid capa will be checked in replay */
606 p = req_capsule_client_get(pill, &RMF_CAPA2);
611 if (body->valid & OBD_MD_FLOSSCAPA) {
612 struct lustre_capa *capa;
614 capa = req_capsule_server_get(pill, &RMF_CAPA2);
/* Enqueue a lock, optionally carrying an intent, on the resource named
 * by op_data->op_fid1.
 *
 * The request is built by one of the pack helpers selected from
 * it->it_op (open, unlink, getattr/lookup, readdir); the flock case
 * passes its policy through @lmm with @lmmsize equal to 0. The lock
 * policy defaults to the LOOKUP inodebit and is switched to UPDATE for
 * unlink, getattr, readdir and open intents. After ldlm_cli_enqueue()
 * the reply is post-processed by mdc_finish_enqueue(). */
623 /* We always reserve enough space in the reply packet for a stripe MD, because
624 * we don't know in advance the file type. */
625 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
626 struct lookup_intent *it, struct md_op_data *op_data,
627 struct lustre_handle *lockh, void *lmm, int lmmsize,
628 struct ptlrpc_request **reqp, int extra_lock_flags)
630 struct obd_device *obddev = class_exp2obd(exp);
631 struct ptlrpc_request *req = NULL;
632 struct req_capsule *pill;
633 int flags = extra_lock_flags;
635 struct ldlm_res_id res_id;
636 static const ldlm_policy_data_t lookup_policy =
637 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
638 static const ldlm_policy_data_t update_policy =
639 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
640 ldlm_policy_data_t const *policy = &lookup_policy;
/* An intent may only be combined with an inodebits lock. */
643 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
646 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
649 flags |= LDLM_FL_HAS_INTENT;
650 if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
651 policy = &update_policy;
657 /* The only way right now is FLOCK, in this case we hide flock
658 policy as lmm, but lmmsize is 0 */
659 LASSERT(lmm && lmmsize == 0);
660 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
662 policy = (ldlm_policy_data_t *)lmm;
663 res_id.name[3] = LDLM_FLOCK;
664 } else if (it->it_op & IT_OPEN) {
665 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
667 policy = &update_policy;
668 einfo->ei_cbdata = NULL;
670 } else if (it->it_op & IT_UNLINK)
671 req = mdc_intent_unlink_pack(exp, it, op_data);
672 else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
673 req = mdc_intent_getattr_pack(exp, it, op_data);
674 else if (it->it_op == IT_READDIR)
675 req = ldlm_enqueue_pack(exp);
/* The pack helpers report failure through ERR_PTR(); the error code is
 * extracted with PTR_ERR() below. */
682 RETURN(PTR_ERR(req));
683 pill = &req->rq_pill;
685 /* It is important to obtain rpc_lock first (if applicable), so that
686 * threads that are serialised with rpc_lock are not polluting our
687 * rpcs in flight counter. We do not do flock request limiting, though*/
689 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
690 mdc_enter_request(&obddev->u.cli);
693 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
/* Release in the reverse order of acquisition. */
699 mdc_exit_request(&obddev->u.cli);
700 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
703 /* For flock requests we immediatelly return without further
704 delay and let caller deal with the rest, since rest of
705 this function metadata processing makes no sense for flock
711 CERROR("ldlm_cli_enqueue: %d\n", rc);
712 mdc_clear_replay_flag(req, rc);
713 ptlrpc_req_finished(req);
716 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Final stage of intent processing, after the reply has been unpacked.
 *
 * Visible responsibilities:
 *  - return it_status right away when the server never executed the
 *    intent (DISP_IT_EXECD not set);
 *  - when revalidating a known fid with M_CHECK_STALE, mark the intent
 *    with DISP_ENQ_COMPLETE and compare the fid returned in the reply
 *    body against op_fid2 and op_fid3;
 *  - take an extra request reference for a successful create and for a
 *    successful open, recording that in DISP_ENQ_CREATE_REF and
 *    DISP_ENQ_OPEN_REF so that it happens once only;
 *  - if an equivalent lock is already held, cancel the new one and hand
 *    the old handle back through @lockh and the intent. */
721 static int mdc_finish_intent_lock(struct obd_export *exp,
722 struct ptlrpc_request *request,
723 struct md_op_data *op_data,
724 struct lookup_intent *it,
725 struct lustre_handle *lockh)
727 struct lustre_handle old_lock;
728 struct mdt_body *mdt_body;
729 struct ldlm_lock *lock;
733 LASSERT(request != NULL);
734 LASSERT(request != LP_POISON);
735 LASSERT(request->rq_repmsg != LP_POISON);
737 if (!it_disposition(it, DISP_IT_EXECD)) {
738 /* The server failed before it even started executing the
739 * intent, i.e. because it couldn't unpack the request. */
740 LASSERT(it->d.lustre.it_status != 0);
741 RETURN(it->d.lustre.it_status);
743 rc = it_open_error(DISP_IT_EXECD, it);
747 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
748 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
750 /* If we were revalidating a fid/name pair, mark the intent in
751 * case we fail and get called again from lookup */
752 if (fid_is_sane(&op_data->op_fid2) &&
753 it->it_create_mode & M_CHECK_STALE &&
754 it->it_op != IT_GETATTR) {
755 it_set_disposition(it, DISP_ENQ_COMPLETE);
757 /* Also: did we find the same inode? */
758 /* server can return one of two fids:
759 * op_fid2 - new allocated fid - if file is created.
760 * op_fid3 - existent fid - if file only open.
761 * op_fid3 is saved in lmv_intent_open */
762 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
763 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
/* NOTE(review): the second fid printed below repeats op_fid2; since the
 * test above compares against both op_fid2 and op_fid3, op_fid3 was
 * probably intended for the value in parentheses - confirm. */
764 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
765 "\n", PFID(&op_data->op_fid2),
766 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
771 rc = it_open_error(DISP_LOOKUP_EXECD, it);
775 /* keep requests around for the multiple phases of the call
776 * this shows the DISP_XX must guarantee we make it into the call
778 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
779 it_disposition(it, DISP_OPEN_CREATE) &&
780 !it_open_error(DISP_OPEN_CREATE, it)) {
781 it_set_disposition(it, DISP_ENQ_CREATE_REF);
782 ptlrpc_request_addref(request); /* balanced in ll_create_node */
784 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
785 it_disposition(it, DISP_OPEN_OPEN) &&
786 !it_open_error(DISP_OPEN_OPEN, it)) {
787 it_set_disposition(it, DISP_ENQ_OPEN_REF);
788 ptlrpc_request_addref(request); /* balanced in ll_file_open */
789 /* BUG 11546 - eviction in the middle of open rpc processing */
790 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
/* Sanity checks on the combination of intent operation and disposition. */
793 if (it->it_op & IT_CREAT) {
794 /* XXX this belongs in ll_create_it */
795 } else if (it->it_op == IT_OPEN) {
796 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
798 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
801 /* If we already have a matching lock, then cancel the new
802 * one. We have to set the data here instead of in
803 * mdc_enqueue, because we need to use the child's inode as
804 * the l_ast_data to match, and that's not available until
805 * intent_finish has performed the iget().) */
806 lock = ldlm_handle2lock(lockh);
808 ldlm_policy_data_t policy = lock->l_policy_data;
809 LDLM_DEBUG(lock, "matching against this");
/* The lock must sit on the resource that corresponds to the fid
 * returned in the reply body. */
811 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
812 &lock->l_resource->lr_name),
813 "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
814 (unsigned long)lock->l_resource->lr_name.name[0],
815 (unsigned long)lock->l_resource->lr_name.name[1],
816 (unsigned long)lock->l_resource->lr_name.name[2],
817 (unsigned long)fid_seq(&mdt_body->fid1),
818 (unsigned long)fid_oid(&mdt_body->fid1),
819 (unsigned long)fid_ver(&mdt_body->fid1));
/* old_lock starts out as a copy of the new handle and is passed to
 * ldlm_lock_match(); on a match the new lock is dropped and cancelled
 * and old_lock replaces it in both lockh and the intent. */
822 memcpy(&old_lock, lockh, sizeof(*lockh));
823 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
824 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
825 ldlm_lock_decref_and_cancel(lockh,
826 it->d.lustre.it_lock_mode);
827 memcpy(lockh, &old_lock, sizeof(old_lock));
828 it->d.lustre.it_lock_handle = lockh->cookie;
831 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
832 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
833 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/* Check whether a granted lock that satisfies intent @it is already
 * held on the resource named by fid.
 *
 * The inodebit looked for is MDS_INODELOCK_UPDATE for IT_GETATTR and
 * MDS_INODELOCK_LOOKUP for every other operation. Any of the modes CR,
 * CW, PR and PW is accepted by the match. The handle cookie and the
 * mode returned by ldlm_lock_match() are stored in the intent. */
837 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
840 /* We could just return 1 immediately, but since we should only
841 * be called in revalidate_it if we already have a lock, let's
843 struct ldlm_res_id res_id;
844 struct lustre_handle lockh;
845 ldlm_policy_data_t policy;
849 fid_build_reg_res_name(fid, &res_id);
850 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
851 MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
853 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
854 LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
855 &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
857 it->d.lustre.it_lock_handle = lockh.cookie;
858 it->d.lustre.it_lock_mode = mode;
865 * This long block is all about fixing up the lock and request state
866 * so that it is correct as of the moment _before_ the operation was
867 * applied; that way, the VFS will think that everything is normal and
868 * call Lustre's regular VFS methods.
870 * If we're performing a creation, that means that unless the creation
871 * failed with EEXIST, we should fake up a negative dentry.
873 * For everything else, we want the lookup to succeed.
875 * One additional note: if CREATE or OPEN succeeded, we add an extra
876 * reference to the request because we need to keep it around until
877 * ll_create/ll_open gets called.
879 * The server will return to us, in it_disposition, an indication of
880 * exactly what d.lustre.it_status refers to.
882 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
883 * otherwise if DISP_OPEN_CREATE is set, then d.lustre.it_status is the
884 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
885 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
888 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/* Entry point for intent based locking.
 *
 * Flow of the visible code:
 *  1. for LOOKUP and GETATTR intents with a sane child fid (op_fid2),
 *     try mdc_revalidate_lock() first;
 *  2. unless DISP_ENQ_COMPLETE is already set, build an inodebits
 *     enqueue descriptor, allocate a fid for IT_CREAT when the caller
 *     did not supply one, and send the request through mdc_enqueue();
 *  3. otherwise, when there is no sane child fid or M_CHECK_STALE is not
 *     requested, clear DISP_ENQ_COMPLETE so that the saved request is
 *     consumed by this call;
 *  4. publish the request kept in the intent through @reqp and finish
 *     with mdc_finish_intent_lock(). */
891 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
892 void *lmm, int lmmsize, struct lookup_intent *it,
893 int lookup_flags, struct ptlrpc_request **reqp,
894 ldlm_blocking_callback cb_blocking,
895 int extra_lock_flags)
897 struct lustre_handle lockh;
902 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
903 ", intent: %s flags %#o\n", op_data->op_namelen,
904 op_data->op_name, PFID(&op_data->op_fid2),
905 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
909 if (fid_is_sane(&op_data->op_fid2) &&
910 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
911 /* We could just return 1 immediately, but since we should only
912 * be called in revalidate_it if we already have a lock, let's
914 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2);
915 /* Only return failure if it was not GETATTR by cfid
916 (from inode_revalidate) */
917 if (rc || op_data->op_namelen != 0)
921 /* lookup_it may be called only after revalidate_it has run, because
922 * revalidate_it cannot return errors, only zero. Returning zero causes
923 * this call to lookup, which *can* return an error.
925 * We only want to execute the request associated with the intent one
926 * time, however, so don't send the request again. Instead, skip past
927 * this and use the request from revalidate. In this case, revalidate
928 * never dropped its reference, so the refcounts are all OK */
929 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
/* Positional initializer: lock type, lock mode derived from the intent,
 * blocking callback, completion callback; the remaining members are
 * left NULL. */
930 struct ldlm_enqueue_info einfo =
931 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
932 ldlm_completion_ast, NULL, NULL, NULL };
934 /* For case if upper layer did not alloc fid, do it now. */
935 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
936 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
938 CERROR("Can't alloc new fid, rc %d\n", rc);
942 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
943 lmm, lmmsize, NULL, extra_lock_flags);
946 } else if (!fid_is_sane(&op_data->op_fid2) ||
947 !(it->it_create_mode & M_CHECK_STALE)) {
948 /* DISP_ENQ_COMPLETE set means there is extra reference on
949 * request referenced from this intent, saved for subsequent
950 * lookup. This path is executed when we proceed to this
951 * lookup, so we clear DISP_ENQ_COMPLETE */
952 it_clear_disposition(it, DISP_ENQ_COMPLETE);
/* The request is taken from the intent, where mdc_finish_enqueue()
 * stored it in it_data. */
954 *reqp = it->d.lustre.it_data;
955 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/* Reply interpreter for the asynchronous getattr intent.
 *
 * The export, the md_enqueue_info and the ldlm_enqueue_info are read
 * back from rq_async_args.pointer_arg[0..2], where
 * mdc_intent_getattr_async() stores them. The request slot is released
 * with mdc_exit_request(), the enqueue is completed with
 * ldlm_cli_enqueue_fini(), and the reply then goes through
 * mdc_finish_enqueue() and mdc_finish_intent_lock(). The final rc is
 * delivered to the caller through the minfo->mi_cb callback. */
959 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
960 struct ptlrpc_request *req,
961 void *unused, int rc)
963 struct obd_export *exp = req->rq_async_args.pointer_arg[0];
964 struct md_enqueue_info *minfo = req->rq_async_args.pointer_arg[1];
965 struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
966 struct lookup_intent *it;
967 struct lustre_handle *lockh;
968 struct obd_device *obddev;
969 int flags = LDLM_FL_HAS_INTENT;
973 lockh = &minfo->mi_lockh;
975 obddev = class_exp2obd(exp);
/* Balances the mdc_enter_request() done when the request was sent. */
977 mdc_exit_request(&obddev->u.cli);
/* Fault injection hook for OBD_FAIL_MDC_GETATTR_ENQUEUE. */
978 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
981 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
982 &flags, NULL, 0, lockh, rc);
984 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
985 mdc_clear_replay_flag(req, rc);
989 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
993 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* The caller is always notified with the final rc. */
998 minfo->mi_cb(req, minfo, rc);
/* Send a getattr intent enqueue without waiting for the reply.
 *
 * The operation data and the intent live inside @minfo. The request is
 * built by mdc_intent_getattr_pack() and enqueued with a policy that
 * asks for both the LOOKUP and the UPDATE inodebits; the lock handle is
 * stored in minfo->mi_lockh. On success the request is handed to
 * ptlrpcd with mdc_intent_getattr_async_interpret() as reply handler,
 * which receives @exp, @minfo and @einfo through rq_async_args. */
1002 int mdc_intent_getattr_async(struct obd_export *exp,
1003 struct md_enqueue_info *minfo,
1004 struct ldlm_enqueue_info *einfo)
1006 struct md_op_data *op_data = &minfo->mi_data;
1007 struct lookup_intent *it = &minfo->mi_it;
1008 struct ptlrpc_request *req;
1009 struct obd_device *obddev = class_exp2obd(exp);
1010 struct ldlm_res_id res_id;
1011 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1012 * for statahead currently. Consider CMD in future, such two bits
1013 * maybe managed by different MDS, should be adjusted then. */
1014 ldlm_policy_data_t policy = {
1015 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1016 MDS_INODELOCK_UPDATE }
1019 int flags = LDLM_FL_HAS_INTENT;
1022 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1023 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1024 ldlm_it2str(it->it_op), it->it_flags);
1026 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1027 req = mdc_intent_getattr_pack(exp, it, op_data);
/* Unlike mdc_enqueue(), only the request slot is taken here; no
 * rpc_lock is acquired in the visible code. The last argument of
 * ldlm_cli_enqueue() is 1 here (presumably the async flag - confirm). */
1031 mdc_enter_request(&obddev->u.cli);
1032 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1033 0, &minfo->mi_lockh, 1);
/* Failure path: give the request slot back (presumably guarded by a
 * test of rc). */
1035 mdc_exit_request(&obddev->u.cli);
/* Context for the interpret callback, read back in the same order. */
1039 req->rq_async_args.pointer_arg[0] = exp;
1040 req->rq_async_args.pointer_arg[1] = minfo;
1041 req->rq_async_args.pointer_arg[2] = einfo;
1042 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1043 ptlrpcd_add_req(req, PSCOPE_OTHER);