1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_MDC
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
48 # include <liblustre.h>
51 #include <lustre_acl.h>
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 /* fid_res_name_eq() */
55 #include <lustre_fid.h>
56 #include <lprocfs_status.h>
57 #include "mdc_internal.h"
/*
 * Return the bits of flag that are set in the intent disposition mask
 * (it->d.lustre.it_disposition). The result is non-zero when at least one
 * of the requested bits is set.
 */
59 int it_disposition(struct lookup_intent *it, int flag)
61 return it->d.lustre.it_disposition & flag;
63 EXPORT_SYMBOL(it_disposition);
/* OR the bits of flag into the intent disposition mask. */
65 void it_set_disposition(struct lookup_intent *it, int flag)
67 it->d.lustre.it_disposition |= flag;
69 EXPORT_SYMBOL(it_set_disposition);
/* Clear the bits of flag in the intent disposition mask. */
71 void it_clear_disposition(struct lookup_intent *it, int flag)
73 it->d.lustre.it_disposition &= ~flag;
75 EXPORT_SYMBOL(it_clear_disposition);
/*
 * Return the intent status that applies to the given phase.
 *
 * The disposition bits are tested in the order DISP_OPEN_OPEN,
 * DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD. For a bit that is
 * set, it->d.lustre.it_status is returned when phase is greater than or
 * equal to that disposition value.
 *
 * NOTE(review): the else branches and the final return are not visible in
 * this listing; presumably they return 0 - confirm against the full file.
 */
77 int it_open_error(int phase, struct lookup_intent *it)
79 if (it_disposition(it, DISP_OPEN_OPEN)) {
80 if (phase >= DISP_OPEN_OPEN)
81 return it->d.lustre.it_status;
86 if (it_disposition(it, DISP_OPEN_CREATE)) {
87 if (phase >= DISP_OPEN_CREATE)
88 return it->d.lustre.it_status;
93 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
94 if (phase >= DISP_LOOKUP_EXECD)
95 return it->d.lustre.it_status;
100 if (it_disposition(it, DISP_IT_EXECD)) {
101 if (phase >= DISP_IT_EXECD)
102 return it->d.lustre.it_status;
/* Reached only when no branch above returned: log the unexpected state. */
106 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
107 it->d.lustre.it_status);
111 EXPORT_SYMBOL(it_open_error);
/*
 * Record data as the l_ast_data of the lock named by the handle cookie
 * lockh, and store the inodebits of the lock policy through bits.
 *
 * If the lock already carries a different l_ast_data, both values are
 * treated as struct inode pointers and the old inode is asserted to have
 * I_FREEING set in i_state.
 *
 * NOTE(review): the end of the parameter list, the statements between the
 * declaration and the handle lookup, and everything after the unlock
 * (including the return) are not visible in this listing.
 */
113 /* this must be called on a lockh that is known to have a referenced lock */
114 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
117 struct ldlm_lock *lock;
128 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
130 LASSERT(lock != NULL);
/* l_ast_data is inspected and updated between lock_res_and_lock() and
 * unlock_res_and_lock(). */
131 lock_res_and_lock(lock);
133 if (lock->l_ast_data && lock->l_ast_data != data) {
134 struct inode *new_inode = data;
135 struct inode *old_inode = lock->l_ast_data;
136 LASSERTF(old_inode->i_state & I_FREEING,
137 "Found existing inode %p/%lu/%u state %lu in lock: "
138 "setting data to %p/%lu/%u\n", old_inode,
139 old_inode->i_ino, old_inode->i_generation,
141 new_inode, new_inode->i_ino, new_inode->i_generation);
144 lock->l_ast_data = data;
146 *bits = lock->l_policy_data.l_inodebits.bits;
148 unlock_res_and_lock(lock);
/*
 * Match an existing lock on the resource derived from fid.
 *
 * Builds the resource name with fid_build_reg_res_name() and forwards
 * flags, type, policy, mode and lockh to ldlm_lock_match() on the namespace
 * of the obd device behind exp. The result is stored in rc.
 *
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this listing; presumably rc is returned - confirm.
 */
154 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
155 const struct lu_fid *fid, ldlm_type_t type,
156 ldlm_policy_data_t *policy, ldlm_mode_t mode,
157 struct lustre_handle *lockh)
159 struct ldlm_res_id res_id;
163 fid_build_reg_res_name(fid, &res_id);
164 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
165 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel unused locks on the resource derived from fid.
 *
 * Builds the resource name with fid_build_reg_res_name() and passes policy,
 * mode, flags and opaque to ldlm_cli_cancel_unused_resource() on the
 * namespace of the obd device behind exp. The result is stored in rc.
 *
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this listing; presumably rc is returned - confirm.
 */
169 int mdc_cancel_unused(struct obd_export *exp,
170 const struct lu_fid *fid,
171 ldlm_policy_data_t *policy,
172 ldlm_mode_t mode, int flags, void *opaque)
174 struct ldlm_res_id res_id;
175 struct obd_device *obd = class_exp2obd(exp);
180 fid_build_reg_res_name(fid, &res_id);
181 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
182 policy, mode, flags, opaque);
/*
 * Iterate over the resource derived from fid with ldlm_resource_iterate()
 * on the namespace of the obd device behind exp.
 *
 * NOTE(review): the remaining arguments of the iterate call and the return
 * statement are not visible in this listing; presumably the iterator it and
 * its data argument are forwarded - confirm.
 */
186 int mdc_change_cbdata(struct obd_export *exp,
187 const struct lu_fid *fid,
188 ldlm_iterator_t it, void *data)
190 struct ldlm_res_id res_id;
193 fid_build_reg_res_name(fid, &res_id);
194 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
/*
 * Helper used on error paths so that a failed request is not kept for
 * replay.
 *
 * When req->rq_replay is set, rq_lock is taken and released. When rc is
 * non-zero and the request has a non-zero transno, the condition is
 * reported at D_ERROR.
 *
 * NOTE(review): the statement executed under rq_lock is not visible in this
 * listing; presumably it clears rq_replay - confirm.
 */
201 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
203 /* Don't hold error requests for replay. */
204 if (req->rq_replay) {
205 cfs_spin_lock(&req->rq_lock);
207 cfs_spin_unlock(&req->rq_lock);
209 if (rc && req->rq_transno != 0) {
210 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
215 /* Save a large LOV EA into the request buffer so that it is available
216 * for replay. We don't do this in the initial request because the
217 * original request doesn't need this buffer (at most it sends just the
218 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
219 * buffer and may also be difficult to allocate and save a very large
220 * request buffer for each open. (bug 5707)
222 * OOM here may cause recovery failure if lmm is needed (only for the
223 * original open if the MDS crashed just when this client also OOM'd)
224 * but this is incredibly unlikely, and questionable whether the client
225 * could do MDS recovery under OOM anyways... */
/*
 * Implementation summary: enlarges request segment DLM_INTENT_REC_OFF + 4
 * through sptlrpc_cli_enlarge_reqbuf(). The visible failure handling logs
 * the segment index and body->eadatasize, clears OBD_MD_FLEASIZE from
 * body->valid and zeroes body->eadatasize.
 *
 * NOTE(review): the size argument of the enlarge call and the condition
 * guarding the error handling are not visible in this listing.
 */
226 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
227 struct mdt_body *body)
231 /* FIXME: remove this explicit offset. */
232 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
235 CERROR("Can't enlarge segment %d size to %d\n",
236 DLM_INTENT_REC_OFF + 4, body->eadatasize);
237 body->valid &= ~OBD_MD_FLEASIZE;
238 body->eadatasize = 0;
/*
 * Allocate and pack an RQF_LDLM_INTENT_OPEN enqueue request.
 *
 * Visible steps:
 *  - force the file-type bits of it->it_create_mode to S_IFREG;
 *  - collect unused locks into the local cancels list with
 *    mdc_resource_get_unused(): on the child (op_fid2) when that fid is
 *    sane, and on the parent (op_fid1) for the UPDATE inodebit;
 *  - allocate the request, size the two capability fields, the name field
 *    (op_namelen + 1) and the EA data field (the larger of lmmsize and
 *    cl_default_mds_easize), then call ldlm_prep_enqueue_req() with the
 *    collected cancels;
 *  - under rq_lock, copy imp_replayable of the import into rq_replay;
 *  - store the intent opcode and pack the open body with mdc_open_pack();
 *  - for a remote client, size the RMF_ACL reply field for one
 *    struct mdt_remote_perm; finally set the reply length.
 *
 * The visible error handling after the allocation releases the collected
 * locks with ldlm_lock_list_put() and returns ERR_PTR(-ENOMEM); after
 * ldlm_prep_enqueue_req() it frees the request.
 *
 * NOTE(review): the last parameter, the lock modes chosen for the cancels,
 * the conditions guarding the error paths and the final return are not
 * visible in this listing.
 */
242 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
243 struct lookup_intent *it,
244 struct md_op_data *op_data,
245 void *lmm, int lmmsize,
248 struct ptlrpc_request *req;
249 struct obd_device *obddev = class_exp2obd(exp);
250 struct ldlm_intent *lit;
251 CFS_LIST_HEAD(cancels);
257 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
259 /* XXX: openlock is not cancelled for cross-refs. */
260 /* If inode is known, cancel conflicting OPEN locks. */
261 if (fid_is_sane(&op_data->op_fid2)) {
262 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
265 else if (it->it_flags & FMODE_EXEC)
270 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
275 /* If CREATE, cancel parent's UPDATE lock. */
276 if (it->it_op & IT_CREAT)
280 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
282 MDS_INODELOCK_UPDATE);
284 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
285 &RQF_LDLM_INTENT_OPEN);
287 ldlm_lock_list_put(&cancels, l_bl_ast, count);
288 RETURN(ERR_PTR(-ENOMEM));
291 /* parent capability */
292 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
293 /* child capability, reserve the size according to parent capa, it will
294 * be filled after we get the reply */
295 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
297 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
298 op_data->op_namelen + 1);
299 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
300 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
302 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
304 ptlrpc_request_free(req);
308 cfs_spin_lock(&req->rq_lock);
309 req->rq_replay = req->rq_import->imp_replayable;
310 cfs_spin_unlock(&req->rq_lock);
312 /* pack the intent */
313 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
314 lit->opc = (__u64)it->it_op;
316 /* pack the intended request */
317 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
320 /* for remote client, fetch remote perm for current user */
321 if (client_is_remote(exp))
322 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
323 sizeof(struct mdt_remote_perm));
324 ptlrpc_request_set_replen(req);
/*
 * Allocate and pack an RQF_LDLM_INTENT_UNLINK enqueue request.
 *
 * Sizes the parent capability and the name field (op_namelen + 1), prepares
 * the enqueue with no cancel list, stores the intent opcode, packs the
 * unlink body with mdc_unlink_pack(), sizes the RMF_MDT_MD and RMF_ACL
 * reply fields and sets the reply length.
 *
 * The visible error handling returns ERR_PTR(-ENOMEM) after the allocation
 * and frees the request after ldlm_prep_enqueue_req().
 *
 * NOTE(review): the conditions guarding the error paths and the final
 * return are not visible in this listing.
 */
328 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
329 struct lookup_intent *it,
330 struct md_op_data *op_data)
332 struct ptlrpc_request *req;
333 struct obd_device *obddev = class_exp2obd(exp);
334 struct ldlm_intent *lit;
338 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
339 &RQF_LDLM_INTENT_UNLINK);
341 RETURN(ERR_PTR(-ENOMEM));
343 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
344 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
345 op_data->op_namelen + 1);
347 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
349 ptlrpc_request_free(req);
353 /* pack the intent */
354 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
355 lit->opc = (__u64)it->it_op;
357 /* pack the intended request */
358 mdc_unlink_pack(req, op_data);
360 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
361 obddev->u.cli.cl_max_mds_easize);
/* NOTE(review): the RMF_ACL reply field is sized with cl_max_mds_cookiesize
 * here; presumably this field carries the unlink cookies in this request
 * format - confirm against the RQF_LDLM_INTENT_UNLINK layout. */
362 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
363 obddev->u.cli.cl_max_mds_cookiesize);
364 ptlrpc_request_set_replen(req);
/*
 * Allocate and pack an RQF_LDLM_INTENT_GETATTR enqueue request.
 *
 * The valid mask passed to mdc_getattr_pack() combines OBD_MD_FLGETATTR,
 * OBD_MD_FLEASIZE, OBD_MD_FLMODEASIZE, OBD_MD_FLDIREA, OBD_MD_FLMDSCAPA and
 * OBD_MD_MEA, plus OBD_MD_FLRMTPERM for a remote client or OBD_MD_FLACL
 * otherwise.
 *
 * Sizes the parent capability and the name field (op_namelen + 1), prepares
 * the enqueue with no cancel list, stores the intent opcode, packs the
 * getattr body, sizes the RMF_MDT_MD reply field to cl_max_mds_easize and,
 * for a remote client, the RMF_ACL reply field to one
 * struct mdt_remote_perm, then sets the reply length.
 *
 * NOTE(review): the conditions guarding the error paths and the final
 * return are not visible in this listing.
 */
368 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
369 struct lookup_intent *it,
370 struct md_op_data *op_data)
372 struct ptlrpc_request *req;
373 struct obd_device *obddev = class_exp2obd(exp);
374 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
375 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
376 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
377 (client_is_remote(exp) ?
378 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
379 struct ldlm_intent *lit;
383 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
384 &RQF_LDLM_INTENT_GETATTR);
386 RETURN(ERR_PTR(-ENOMEM));
388 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
389 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
390 op_data->op_namelen + 1);
392 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
394 ptlrpc_request_free(req);
398 /* pack the intent */
399 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
400 lit->opc = (__u64)it->it_op;
402 /* pack the intended request */
403 mdc_getattr_pack(req, valid, it->it_flags, op_data);
405 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
406 obddev->u.cli.cl_max_mds_easize);
407 if (client_is_remote(exp))
408 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
409 sizeof(struct mdt_remote_perm));
410 ptlrpc_request_set_replen(req);
/*
 * Allocate a plain RQF_LDLM_ENQUEUE request (no intent is packed), prepare
 * it with ldlm_prep_enqueue_req() without a cancel list and set the reply
 * length.
 *
 * The visible error handling returns ERR_PTR(-ENOMEM) after the allocation
 * and frees the request after ldlm_prep_enqueue_req().
 *
 * NOTE(review): the conditions guarding the error paths and the final
 * return are not visible in this listing.
 */
414 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
416 struct ptlrpc_request *req;
420 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
422 RETURN(ERR_PTR(-ENOMEM));
424 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
426 ptlrpc_request_free(req);
430 ptlrpc_request_set_replen(req);
/*
 * Post-process the reply of an intent enqueue and fill in the intent.
 *
 * Visible steps:
 *  - when the request has a transno or is marked for replay, set
 *    LDLM_FL_INTENT_ONLY in the client-side RMF_DLM_REQ buffer;
 *  - when rc is ELDLM_LOCK_ABORTED the lock handle is zeroed; otherwise, if
 *    l_req_mode of the lock differs from einfo->ei_mode, a reference is
 *    taken in the new mode, the old one is dropped and einfo->ei_mode is
 *    updated;
 *  - copy lock_policy_res1 and lock_policy_res2 of the reply into the
 *    intent disposition and status, and record the lock mode, the handle
 *    cookie and the request pointer in it->d.lustre;
 *  - call mdc_clear_replay_flag() when the status is negative, and for
 *    IT_OPEN when DISP_OPEN_OPEN is not set or the status is non-zero;
 *  - for OPEN, UNLINK, LOOKUP and GETATTR intents, fetch the mdt_body and,
 *    depending on body->valid, the EA data, the remote permission and the
 *    capability reply fields; for a replayable open the EA data is copied
 *    into the client RMF_EADATA field of the request.
 *
 * NOTE(review): the last parameter (used as rc in the body), most
 * conditions, error returns and the final return are not visible in this
 * listing, and some of the original comments below have lost their opening
 * or closing lines.
 */
434 static int mdc_finish_enqueue(struct obd_export *exp,
435 struct ptlrpc_request *req,
436 struct ldlm_enqueue_info *einfo,
437 struct lookup_intent *it,
438 struct lustre_handle *lockh,
441 struct req_capsule *pill = &req->rq_pill;
442 struct ldlm_request *lockreq;
443 struct ldlm_reply *lockrep;
447 /* Similarly, if we're going to replay this request, we don't want to
448 * actually get a lock, just perform the intent. */
449 if (req->rq_transno || req->rq_replay) {
450 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
451 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
454 if (rc == ELDLM_LOCK_ABORTED) {
456 memset(lockh, 0, sizeof(*lockh));
458 } else { /* rc = 0 */
459 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
462 /* If the server gave us back a different lock mode, we should
463 * fix up our variables. */
464 if (lock->l_req_mode != einfo->ei_mode) {
465 ldlm_lock_addref(lockh, lock->l_req_mode);
466 ldlm_lock_decref(lockh, einfo->ei_mode);
467 einfo->ei_mode = lock->l_req_mode;
472 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
473 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
475 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
476 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
477 it->d.lustre.it_lock_mode = einfo->ei_mode;
478 it->d.lustre.it_lock_handle = lockh->cookie;
479 it->d.lustre.it_data = req;
481 if (it->d.lustre.it_status < 0 && req->rq_replay)
482 mdc_clear_replay_flag(req, it->d.lustre.it_status);
484 /* If we're doing an IT_OPEN which did not result in an actual
485 * successful open, then we need to remove the bit which saves
486 * this request for unconditional replay.
488 * It's important that we do this first! Otherwise we might exit the
489 * function without doing so, and try to replay a failed create
491 if (it->it_op & IT_OPEN && req->rq_replay &&
492 (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
493 mdc_clear_replay_flag(req, it->d.lustre.it_status);
495 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
496 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
498 /* We know what to expect, so we do any byte flipping required here */
499 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
500 struct mdt_body *body;
502 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
504 CERROR ("Can't swab mdt_body\n");
508 if (it_disposition(it, DISP_OPEN_OPEN) &&
509 !it_open_error(DISP_OPEN_OPEN, it)) {
511 * If this is a successful OPEN request, we need to set
512 * replay handler and data early, so that if replay
513 * happens immediately after swabbing below, new reply
514 * is swabbed by that handler correctly.
516 mdc_set_open_replay_data(NULL, NULL, req);
519 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
522 mdc_update_max_ea_from_body(exp, body);
525 * The eadata is opaque; just check that it is there.
526 * Eventually, obd_unpackmd() will check the contents.
528 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
534 * We save the reply LOV EA in case we have to replay a
535 * create for recovery. If we didn't allocate a large
536 * enough request buffer above we need to reallocate it
537 * here to hold the actual LOV EA.
539 * To not save LOV EA if request is not going to replay
540 * (for example error one).
542 if ((it->it_op & IT_OPEN) && req->rq_replay) {
544 if (req_capsule_get_size(pill, &RMF_EADATA,
547 mdc_realloc_openmsg(req, body);
549 req_capsule_shrink(pill, &RMF_EADATA,
553 req_capsule_set_size(pill, &RMF_EADATA,
557 lmm = req_capsule_client_get(pill, &RMF_EADATA);
559 memcpy(lmm, eadata, body->eadatasize);
563 if (body->valid & OBD_MD_FLRMTPERM) {
564 struct mdt_remote_perm *perm;
566 LASSERT(client_is_remote(exp));
567 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
568 lustre_swab_mdt_remote_perm);
572 if (body->valid & OBD_MD_FLMDSCAPA) {
573 struct lustre_capa *capa, *p;
575 capa = req_capsule_server_get(pill, &RMF_CAPA1);
579 if (it->it_op & IT_OPEN) {
580 /* client fid capa will be checked in replay */
581 p = req_capsule_client_get(pill, &RMF_CAPA2);
586 if (body->valid & OBD_MD_FLOSSCAPA) {
587 struct lustre_capa *capa;
589 capa = req_capsule_server_get(pill, &RMF_CAPA2);
598 /* We always reserve enough space in the reply packet for a stripe MD, because
599 * we don't know in advance the file type. */
/*
 * Build and send one lock enqueue for an intent or for a flock request.
 *
 * The resource name is derived from op_data->op_fid1. The inodebits policy
 * starts as LOOKUP and is switched to UPDATE for UNLINK, GETATTR, READDIR
 * and OPEN intents. The request is packed by the helper matching the
 * intent opcode (open, unlink, getattr/lookup, or a plain enqueue for
 * IT_READDIR). In the flock branch the policy is taken from lmm (with
 * lmmsize required to be 0) and res_id.name[3] is set to LDLM_FLOCK.
 *
 * ldlm_cli_enqueue() is called between mdc_get_rpc_lock() /
 * mdc_enter_request() and mdc_exit_request() / mdc_put_rpc_lock(). The
 * visible failure handling logs the error, calls mdc_clear_replay_flag()
 * and drops the request; otherwise the reply is handed to
 * mdc_finish_enqueue().
 *
 * NOTE(review): the branch conditions (including, presumably, the test for
 * a missing intent that selects the flock branch), the remaining enqueue
 * arguments and the return statements are not visible in this listing.
 */
600 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
601 struct lookup_intent *it, struct md_op_data *op_data,
602 struct lustre_handle *lockh, void *lmm, int lmmsize,
603 struct ptlrpc_request **reqp, int extra_lock_flags)
605 struct obd_device *obddev = class_exp2obd(exp);
606 struct ptlrpc_request *req = NULL;
607 struct req_capsule *pill;
608 int flags = extra_lock_flags;
610 struct ldlm_res_id res_id;
611 static const ldlm_policy_data_t lookup_policy =
612 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
613 static const ldlm_policy_data_t update_policy =
614 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
615 ldlm_policy_data_t const *policy = &lookup_policy;
618 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
621 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
624 flags |= LDLM_FL_HAS_INTENT;
625 if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
626 policy = &update_policy;
632 /* The only way right now is FLOCK, in this case we hide flock
633 policy as lmm, but lmmsize is 0 */
634 LASSERT(lmm && lmmsize == 0);
635 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
637 policy = (ldlm_policy_data_t *)lmm;
638 res_id.name[3] = LDLM_FLOCK;
639 } else if (it->it_op & IT_OPEN) {
640 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
642 policy = &update_policy;
643 einfo->ei_cbdata = NULL;
645 } else if (it->it_op & IT_UNLINK)
646 req = mdc_intent_unlink_pack(exp, it, op_data);
647 else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
648 req = mdc_intent_getattr_pack(exp, it, op_data);
649 else if (it->it_op == IT_READDIR)
650 req = ldlm_enqueue_pack(exp);
657 RETURN(PTR_ERR(req));
658 pill = &req->rq_pill;
660 /* It is important to obtain rpc_lock first (if applicable), so that
661 * threads that are serialised with rpc_lock are not polluting our
662 * rpcs in flight counter. We do not do flock request limiting, though*/
664 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
665 mdc_enter_request(&obddev->u.cli);
668 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
674 mdc_exit_request(&obddev->u.cli);
675 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
678 /* For flock requests we immediatelly return without further
679 delay and let caller deal with the rest, since rest of
680 this function metadata processing makes no sense for flock
686 CERROR("ldlm_cli_enqueue: %d\n", rc);
687 mdc_clear_replay_flag(req, rc);
688 ptlrpc_req_finished(req);
691 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/*
 * Finish processing of an intent reply once the enqueue has completed.
 *
 * Visible steps:
 *  - if the server never executed the intent (DISP_IT_EXECD not set), the
 *    non-zero intent status is returned;
 *  - check the intent status for the DISP_IT_EXECD and DISP_LOOKUP_EXECD
 *    phases with it_open_error();
 *  - when revalidating a sane op_fid2 with M_CHECK_STALE set and the intent
 *    is not IT_GETATTR, set DISP_ENQ_COMPLETE and compare the fid returned
 *    in the mdt_body with op_fid2 and op_fid3 to detect stale data;
 *  - take an extra request reference for a successful create
 *    (DISP_ENQ_CREATE_REF) and for a successful open (DISP_ENQ_OPEN_REF);
 *  - if the handle resolves to a lock, assert that its resource name
 *    matches the returned fid, then look for an already granted lock with
 *    the same policy; when one is found the new lock is dropped and
 *    cancelled and the handle and intent are switched to the matched lock.
 *
 * NOTE(review): several conditions, the error returns and the final return
 * are not visible in this listing.
 */
696 static int mdc_finish_intent_lock(struct obd_export *exp,
697 struct ptlrpc_request *request,
698 struct md_op_data *op_data,
699 struct lookup_intent *it,
700 struct lustre_handle *lockh)
702 struct lustre_handle old_lock;
703 struct mdt_body *mdt_body;
704 struct ldlm_lock *lock;
708 LASSERT(request != NULL);
709 LASSERT(request != LP_POISON);
710 LASSERT(request->rq_repmsg != LP_POISON);
712 if (!it_disposition(it, DISP_IT_EXECD)) {
713 /* The server failed before it even started executing the
714 * intent, i.e. because it couldn't unpack the request. */
715 LASSERT(it->d.lustre.it_status != 0);
716 RETURN(it->d.lustre.it_status);
718 rc = it_open_error(DISP_IT_EXECD, it);
722 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
723 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
725 /* If we were revalidating a fid/name pair, mark the intent in
726 * case we fail and get called again from lookup */
727 if (fid_is_sane(&op_data->op_fid2) &&
728 it->it_create_mode & M_CHECK_STALE &&
729 it->it_op != IT_GETATTR) {
730 it_set_disposition(it, DISP_ENQ_COMPLETE);
732 /* Also: did we find the same inode? */
733 /* server can return one of two fids:
734 * op_fid2 - new allocated fid - if file is created.
735 * op_fid3 - existent fid - if file only open.
736 * op_fid3 is saved in lmv_intent_open */
737 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
738 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
/* NOTE(review): op_fid2 is printed twice below although the test above
 * compares against op_fid2 and op_fid3; presumably the second PFID was
 * meant to be op_fid3 - confirm before changing the message arguments. */
739 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
740 "\n", PFID(&op_data->op_fid2),
741 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
746 rc = it_open_error(DISP_LOOKUP_EXECD, it);
750 /* keep requests around for the multiple phases of the call
751 * this shows the DISP_XX must guarantee we make it into the call
753 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
754 it_disposition(it, DISP_OPEN_CREATE) &&
755 !it_open_error(DISP_OPEN_CREATE, it)) {
756 it_set_disposition(it, DISP_ENQ_CREATE_REF);
757 ptlrpc_request_addref(request); /* balanced in ll_create_node */
759 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
760 it_disposition(it, DISP_OPEN_OPEN) &&
761 !it_open_error(DISP_OPEN_OPEN, it)) {
762 it_set_disposition(it, DISP_ENQ_OPEN_REF);
763 ptlrpc_request_addref(request); /* balanced in ll_file_open */
764 /* BUG 11546 - eviction in the middle of open rpc processing */
765 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
768 if (it->it_op & IT_CREAT) {
769 /* XXX this belongs in ll_create_it */
770 } else if (it->it_op == IT_OPEN) {
771 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
773 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
776 /* If we already have a matching lock, then cancel the new
777 * one. We have to set the data here instead of in
778 * mdc_enqueue, because we need to use the child's inode as
779 * the l_ast_data to match, and that's not available until
780 * intent_finish has performed the iget().) */
781 lock = ldlm_handle2lock(lockh);
783 ldlm_policy_data_t policy = lock->l_policy_data;
784 LDLM_DEBUG(lock, "matching against this");
786 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
787 &lock->l_resource->lr_name),
788 "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
789 (unsigned long)lock->l_resource->lr_name.name[0],
790 (unsigned long)lock->l_resource->lr_name.name[1],
791 (unsigned long)lock->l_resource->lr_name.name[2],
792 (unsigned long)fid_seq(&mdt_body->fid1),
793 (unsigned long)fid_oid(&mdt_body->fid1),
794 (unsigned long)fid_ver(&mdt_body->fid1));
/* Match against a copy of the handle so that lockh is only overwritten
 * when an existing lock is actually found. */
797 memcpy(&old_lock, lockh, sizeof(*lockh));
798 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
799 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
800 ldlm_lock_decref_and_cancel(lockh,
801 it->d.lustre.it_lock_mode);
802 memcpy(lockh, &old_lock, sizeof(old_lock));
803 it->d.lustre.it_lock_handle = lockh->cookie;
806 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
807 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
808 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
813 * This long block is all about fixing up the lock and request state
814 * so that it is correct as of the moment _before_ the operation was
815 * applied; that way, the VFS will think that everything is normal and
816 * call Lustre's regular VFS methods.
818 * If we're performing a creation, that means that unless the creation
819 * failed with EEXIST, we should fake up a negative dentry.
821 * For everything else, we want to lookup to succeed.
823 * One additional note: if CREATE or OPEN succeeded, we add an extra
824 * reference to the request because we need to keep it around until
825 * ll_create/ll_open gets called.
827 * The server will return to us, in it_disposition, an indication of
828 * exactly what d.lustre.it_status refers to.
830 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
831 * otherwise if DISP_OPEN_CREATE is set, then it status is the
832 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
833 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
836 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/*
 * Entry point for intent locking: obtain (or reuse) a lock for the intent
 * and finish the intent processing.
 *
 * Visible steps:
 *  - for LOOKUP and GETATTR intents on a sane op_fid2, first try to match
 *    an already granted inodebits lock (UPDATE bit for IT_GETATTR, LOOKUP
 *    bit otherwise) in any of the CR, CW, PR, PW modes, and record the
 *    matched handle and mode in the intent;
 *  - when DISP_ENQ_COMPLETE is not set, allocate a fid for IT_CREAT if
 *    op_fid2 is not sane and send the request through mdc_enqueue();
 *  - otherwise, when op_fid2 is not sane or M_CHECK_STALE is not set,
 *    clear DISP_ENQ_COMPLETE so the request saved by an earlier call is
 *    reused;
 *  - return the saved request through reqp and run
 *    mdc_finish_intent_lock().
 *
 * NOTE(review): the local declarations of rc and mode, the early returns
 * and the final return are not visible in this listing; lookup_flags is
 * not referenced by any visible line.
 */
839 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
840 void *lmm, int lmmsize, struct lookup_intent *it,
841 int lookup_flags, struct ptlrpc_request **reqp,
842 ldlm_blocking_callback cb_blocking,
843 int extra_lock_flags)
845 struct lustre_handle lockh;
850 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
851 ", intent: %s flags %#o\n", op_data->op_namelen,
852 op_data->op_name, PFID(&op_data->op_fid2),
853 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
857 if (fid_is_sane(&op_data->op_fid2) &&
858 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
859 /* We could just return 1 immediately, but since we should only
860 * be called in revalidate_it if we already have a lock, let's
862 ldlm_policy_data_t policy;
865 /* As not all attributes are kept under update lock, e.g.
866 owner/group/acls are under lookup lock, we need both
867 ibits for GETATTR. */
869 /* For CMD, UPDATE lock and LOOKUP lock can not be got
870 * at the same for cross-object, so we can not match
871 * the 2 lock at the same time FIXME: but how to handle
872 * the above situation */
873 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
874 MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
876 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED,
877 &op_data->op_fid2, LDLM_IBITS, &policy,
878 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
880 it->d.lustre.it_lock_handle = lockh.cookie;
881 it->d.lustre.it_lock_mode = mode;
884 /* Only return failure if it was not GETATTR by cfid
885 (from inode_revalidate) */
886 if (mode || op_data->op_namelen != 0)
890 /* lookup_it may be called only after revalidate_it has run, because
891 * revalidate_it cannot return errors, only zero. Returning zero causes
892 * this call to lookup, which *can* return an error.
894 * We only want to execute the request associated with the intent one
895 * time, however, so don't send the request again. Instead, skip past
896 * this and use the request from revalidate. In this case, revalidate
897 * never dropped its reference, so the refcounts are all OK */
898 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
899 struct ldlm_enqueue_info einfo =
900 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
901 ldlm_completion_ast, NULL, NULL, NULL };
903 /* For case if upper layer did not alloc fid, do it now. */
904 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
905 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
907 CERROR("Can't alloc new fid, rc %d\n", rc);
911 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
912 lmm, lmmsize, NULL, extra_lock_flags);
915 } else if (!fid_is_sane(&op_data->op_fid2) ||
916 !(it->it_create_mode & M_CHECK_STALE)) {
917 /* DISP_ENQ_COMPLETE set means there is extra reference on
918 * request referenced from this intent, saved for subsequent
919 * lookup. This path is executed when we proceed to this
920 * lookup, so we clear DISP_ENQ_COMPLETE */
921 it_clear_disposition(it, DISP_ENQ_COMPLETE);
/* Hand the request stored in the intent back to the caller before the
 * intent is finished. */
923 *reqp = it->d.lustre.it_data;
924 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Reply interpreter installed by mdc_intent_getattr_async().
 *
 * Reads exp, minfo and einfo back from req->rq_async_args.pointer_arg[0..2],
 * calls mdc_exit_request() on the client obd, completes the enqueue with
 * ldlm_cli_enqueue_fini(), then runs mdc_finish_enqueue() and
 * mdc_finish_intent_lock() on the intent and finally invokes
 * minfo->mi_cb(req, minfo, rc). When ldlm_cli_enqueue_fini() fails, the
 * visible handling logs the error and calls mdc_clear_replay_flag().
 *
 * NOTE(review): the assignment of the local it (presumably the intent
 * embedded in minfo), the effect of the OBD_FAIL_MDC_GETATTR_ENQUEUE check,
 * the conditions between the steps and the return are not visible in this
 * listing. The env and unused parameters are not referenced by any visible
 * line.
 */
928 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
929 struct ptlrpc_request *req,
930 void *unused, int rc)
932 struct obd_export *exp = req->rq_async_args.pointer_arg[0];
933 struct md_enqueue_info *minfo = req->rq_async_args.pointer_arg[1];
934 struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
935 struct lookup_intent *it;
936 struct lustre_handle *lockh;
937 struct obd_device *obddev;
938 int flags = LDLM_FL_HAS_INTENT;
942 lockh = &minfo->mi_lockh;
944 obddev = class_exp2obd(exp);
946 mdc_exit_request(&obddev->u.cli);
947 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
950 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
951 &flags, NULL, 0, lockh, rc);
953 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
954 mdc_clear_replay_flag(req, rc);
958 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
962 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
967 minfo->mi_cb(req, minfo, rc);
/*
 * Start a getattr intent enqueue whose reply is processed by
 * mdc_intent_getattr_async_interpret().
 *
 * Uses the op_data and intent embedded in minfo: builds the resource name
 * from op_fid1, packs the request with mdc_intent_getattr_pack(), calls
 * mdc_enter_request() and issues ldlm_cli_enqueue() with a LOOKUP
 * inodebits policy, the handle stored in minfo->mi_lockh and a last
 * argument of 1. exp, minfo and einfo are then stored in
 * req->rq_async_args.pointer_arg[0..2], the reply interpreter is installed
 * and the request is passed to ptlrpcd_add_req() with PSCOPE_OTHER.
 *
 * NOTE(review): the error checks after the pack and enqueue calls (the
 * visible mdc_exit_request() call presumably belongs to the enqueue failure
 * path) and the return statements are not visible in this listing.
 */
971 int mdc_intent_getattr_async(struct obd_export *exp,
972 struct md_enqueue_info *minfo,
973 struct ldlm_enqueue_info *einfo)
975 struct md_op_data *op_data = &minfo->mi_data;
976 struct lookup_intent *it = &minfo->mi_it;
977 struct ptlrpc_request *req;
978 struct obd_device *obddev = class_exp2obd(exp);
979 struct ldlm_res_id res_id;
980 ldlm_policy_data_t policy = {
981 .l_inodebits = { MDS_INODELOCK_LOOKUP }
984 int flags = LDLM_FL_HAS_INTENT;
987 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
988 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
989 ldlm_it2str(it->it_op), it->it_flags);
991 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
992 req = mdc_intent_getattr_pack(exp, it, op_data);
996 mdc_enter_request(&obddev->u.cli);
997 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
998 0, &minfo->mi_lockh, 1);
1000 mdc_exit_request(&obddev->u.cli);
1004 req->rq_async_args.pointer_arg[0] = exp;
1005 req->rq_async_args.pointer_arg[1] = minfo;
1006 req->rq_async_args.pointer_arg[2] = einfo;
1007 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1008 ptlrpcd_add_req(req, PSCOPE_OTHER);
1013 int mdc_revalidate_lock(struct obd_export *exp,
1014 struct lookup_intent *it,
1017 /* We could just return 1 immediately, but since we should only
1018 * be called in revalidate_it if we already have a lock, let's
1020 struct ldlm_res_id res_id;
1021 struct lustre_handle lockh;
1022 ldlm_policy_data_t policy;
1026 fid_build_reg_res_name(fid, &res_id);
1027 /* As not all attributes are kept under update lock, e.g.
1028 owner/group/acls are under lookup lock, we need both
1029 ibits for GETATTR. */
1030 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
1031 MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
1033 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1034 LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
1035 &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1037 it->d.lustre.it_lock_handle = lockh.cookie;
1038 it->d.lustre.it_lock_mode = mode;