1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
6 * This file is part of the Lustre file system, http://www.lustre.org
7 * Lustre is a trademark of Cluster File Systems, Inc.
9 * You may have signed or agreed to another license before downloading
10 * this software. If so, you are bound by the terms and conditions
11 * of that agreement, and the following does not apply to you. See the
12 * LICENSE file included with this distribution for more information.
14 * If you did not agree to a different license, then this copy of Lustre
15 * is open source software; you can redistribute it and/or modify it
16 * under the terms of version 2 of the GNU General Public License as
17 * published by the Free Software Foundation.
19 * In either case, Lustre is distributed in the hope that it will be
20 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 * license text for more details.
26 # define EXPORT_SYMTAB
28 #define DEBUG_SUBSYSTEM S_MDC
31 # include <linux/module.h>
32 # include <linux/pagemap.h>
33 # include <linux/miscdevice.h>
34 # include <linux/init.h>
36 # include <liblustre.h>
39 #include <obd_class.h>
40 #include <lustre_dlm.h>
41 #include <lprocfs_status.h>
42 #include "mdc_internal.h"
/* Test bits in an intent's disposition mask.
 *
 * Returns the bitwise AND of it->d.lustre.it_disposition and flag, i.e. a
 * non-zero value iff at least one of the bits in flag is currently set. */
44 int it_disposition(struct lookup_intent *it, int flag)
46 return it->d.lustre.it_disposition & flag;
48 EXPORT_SYMBOL(it_disposition);
/* Set the bits given in flag in the intent's disposition mask
 * (it->d.lustre.it_disposition); bits already set are left untouched. */
50 void it_set_disposition(struct lookup_intent *it, int flag)
52 it->d.lustre.it_disposition |= flag;
54 EXPORT_SYMBOL(it_set_disposition);
/* Clear the bits given in flag from the intent's disposition mask
 * (it->d.lustre.it_disposition); all other bits are preserved. */
56 void it_clear_disposition(struct lookup_intent *it, int flag)
58 it->d.lustre.it_disposition &= ~flag;
60 EXPORT_SYMBOL(it_clear_disposition);
/* Choose the lock mode to request for an intent, based on it->it_op.
 * The result is used as the mode member of the ldlm_enqueue_info built in
 * mdc_intent_lock().
 *
 * NOTE(review): the value returned by each branch, and what happens when
 * it_op matches neither test, is not visible in this view of the file --
 * confirm against the full source. */
62 static int it_to_lock_mode(struct lookup_intent *it)
64 /* CREAT needs to be tested before open (both could be set) */
65 if (it->it_op & IT_CREAT)
67 else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP))
/* Report the intent's status as it applies to a given phase.
 *
 * The disposition bits are examined in the order DISP_OPEN_OPEN,
 * DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD.  For a bit that is
 * set, it->d.lustre.it_status is returned when phase >= that bit.
 *
 * phase: one of the DISP_* values, compared numerically against the bits.
 * it:    the intent whose disposition and status are inspected.
 *
 * NOTE(review): what is returned when a bit is set but phase is lower than
 * it (presumably 0), and the final return after the CERROR below, are not
 * visible in this view -- confirm against the full source. */
74 int it_open_error(int phase, struct lookup_intent *it)
76 if (it_disposition(it, DISP_OPEN_OPEN)) {
77 if (phase >= DISP_OPEN_OPEN)
78 return it->d.lustre.it_status;
83 if (it_disposition(it, DISP_OPEN_CREATE)) {
84 if (phase >= DISP_OPEN_CREATE)
85 return it->d.lustre.it_status;
90 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
91 if (phase >= DISP_LOOKUP_EXECD)
92 return it->d.lustre.it_status;
97 if (it_disposition(it, DISP_IT_EXECD)) {
98 if (phase >= DISP_IT_EXECD)
99 return it->d.lustre.it_status;
/* None of the four disposition bits was set: log the raw disposition and
 * status so the unexpected state can be diagnosed. */
103 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
104 it->d.lustre.it_status);
108 EXPORT_SYMBOL(it_open_error);
/* Attach caller data to the lock behind a handle by storing it in
 * lock->l_ast_data, under lock_res_and_lock().
 *
 * l:    pointer to the lock handle, passed as a __u64 * and reinterpreted
 *       here as a struct lustre_handle *.
 * data: value to store; when the lock already carries different data both
 *       values are treated as struct inode pointers for the sanity check.
 *
 * NOTE(review): some short lines (between the declarations and the handle
 * lookup, and after the unlock) are not visible in this view. */
110 /* this must be called on a lockh that is known to have a referenced lock */
111 void mdc_set_lock_data(__u64 *l, void *data)
113 struct ldlm_lock *lock;
114 struct lustre_handle *lockh = (struct lustre_handle *)l;
122 lock = ldlm_handle2lock(lockh);
124 LASSERT(lock != NULL);
125 lock_res_and_lock(lock);
/* Replacing data that is already set with a different value is only
 * tolerated when the old inode has I_FREEING set; otherwise the assertion
 * fires and prints both inodes. */
127 if (lock->l_ast_data && lock->l_ast_data != data) {
128 struct inode *new_inode = data;
129 struct inode *old_inode = lock->l_ast_data;
130 LASSERTF(old_inode->i_state & I_FREEING,
131 "Found existing inode %p/%lu/%u state %lu in lock: "
132 "setting data to %p/%lu/%u\n", old_inode,
133 old_inode->i_ino, old_inode->i_generation,
135 new_inode, new_inode->i_ino, new_inode->i_generation);
138 lock->l_ast_data = data;
139 unlock_res_and_lock(lock);
144 EXPORT_SYMBOL(mdc_set_lock_data);
/* Run an iterator over the locks of the resource named by a fid.
 *
 * The resource id is built from fid->id (name[0]) and fid->generation
 * (name[1]); the remaining name slots stay zero.  The iteration is done by
 * ldlm_resource_iterate() in the namespace of the export's obd device.
 *
 * NOTE(review): the trailing arguments of the ldlm_resource_iterate() call
 * (presumably it and data) and the return statement are not visible in
 * this view -- confirm against the full source. */
146 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid,
147 ldlm_iterator_t it, void *data)
149 struct ldlm_res_id res_id = { .name = {0} };
152 res_id.name[0] = fid->id;
153 res_id.name[1] = fid->generation;
155 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
/* Stop a failed request from being kept for replay.
 *
 * req: the request to update; req->rq_lock is taken while it is modified.
 * rc:  the error associated with the request; a non-zero rc together with
 *      a non-zero rq_transno is reported through DEBUG_REQ(D_ERROR).
 *
 * NOTE(review): the statement executed between spin_lock() and
 * spin_unlock() (presumably clearing rq_replay) and anything following the
 * DEBUG_REQ are not visible in this view -- confirm. */
162 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
164 /* Don't hold error requests for replay. */
165 if (req->rq_replay) {
166 spin_lock(&req->rq_lock);
168 spin_unlock(&req->rq_lock);
170 if (rc && req->rq_transno != 0) {
171 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
/* Round val up to a larger size.
 * NOTE(review): the body is not visible in this view.  The caller in
 * mdc_enqueue() uses round_up(rc) - rc as slack when growing the request
 * "to the next power-of-two size", so this presumably returns the next
 * power of two >= val -- TODO confirm against the full source. */
176 static int round_up(int val)
186 /* Save a large LOV EA into the request buffer so that it is available
187 * for replay. We don't do this in the initial request because the
188 * original request doesn't need this buffer (at most it sends just the
189 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
190 * buffer and may also be difficult to allocate and save a very large
191 * request buffer for each open. (bug 5707)
193 * OOM here may cause recovery failure if lmm is needed (only for the
194 * original open if the MDS crashed just when this client also OOM'd)
195 * but this is incredibly unlikely, and questionable whether the client
196 * could do MDS recovery under OOM anyways... */
/* req:  request whose rq_reqmsg is replaced by a larger copy.
 * body: reply body; body->eadatasize gives the new size of the EA buffer.
 * size: the request's buffer-size array (6 buffers are used here); slot
 *       DLM_INTENT_REC_OFF + 2 is overwritten with the new EA size.
 *
 * NOTE(review): a few short lines (braces, an else, call continuations)
 * are not visible in this view. */
197 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
198 struct mds_body *body, int size[6])
200 int new_size, old_size;
201 struct lustre_msg *new_msg;
/* Measure the message before and after enlarging the EA slot. */
204 old_size = lustre_msg_size(lustre_request_magic(req), 6, size);
206 size[DLM_INTENT_REC_OFF + 2] = body->eadatasize;
207 new_size = lustre_msg_size(lustre_request_magic(req), 6, size);
208 OBD_ALLOC(new_msg, new_size);
209 if (new_msg != NULL) {
210 struct lustre_msg *old_msg = req->rq_reqmsg;
212 DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u",
/* Copy the old message into the new allocation and update the recorded
 * length of the EA buffer in the copy. */
214 memcpy(new_msg, old_msg, old_size);
215 lustre_msg_set_buflen(new_msg, DLM_INTENT_REC_OFF + 2,
/* Switch the request over to the new message under rq_lock, then release
 * the old buffer. */
218 spin_lock(&req->rq_lock);
219 req->rq_reqmsg = new_msg;
220 req->rq_reqlen = new_size;
221 spin_unlock(&req->rq_lock);
223 OBD_FREE(old_msg, old_size);
/* NOTE(review): presumably the allocation-failure path (the enclosing
 * else is not visible here): the EASIZE flag is cleared and eadatasize is
 * zeroed in the reply body. */
225 body->valid &= ~OBD_MD_FLEASIZE;
226 body->eadatasize = 0;
/* Build and send an intent lock enqueue for the resource named by
 * op_data->fid1, then record the server's answer in the intent.
 *
 * The request layout is chosen from it->it_op: IT_OPEN, IT_UNLINK,
 * IT_GETATTR/IT_LOOKUP and IT_READDIR each have their own branch below.
 * After ldlm_cli_enqueue() the disposition and status found in the lock
 * reply are stored in it->d.lustre, and the request is saved in
 * it->d.lustre.it_data.
 *
 * exp:              export used to reach the obd device and its import.
 * einfo:            enqueue parameters; ei_type must be LDLM_IBITS
 *                   (asserted); ei_mode is rewritten when the lock's
 *                   l_req_mode differs from it.
 * it:               the intent being executed.
 * op_data:          fid1 names the resource; a non-zero fid2.id marks a
 *                   known inode whose conflicting OPEN locks are
 *                   collected for cancel; namelen sizes the name buffer.
 * lockh:            handle of the enqueued lock; zeroed when the enqueue
 *                   returns ELDLM_LOCK_ABORTED.
 * lmm, lmmsize:     LOV EA supplied by the caller for opens; lmmsize is
 *                   a lower bound for the EA buffer in the request.
 * extra_lock_flags: OR'd with LDLM_FL_HAS_INTENT to form the lock flags.
 *
 * NOTE(review): many short lines of this function (braces, error checks,
 * return statements, call continuations) are not visible in this view, so
 * the return value contract cannot be stated from here; the comments
 * below describe only what is shown. */
230 /* We always reserve enough space in the reply packet for a stripe MD, because
231 * we don't know in advance the file type. */
232 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
233 struct lookup_intent *it, struct mdc_op_data *op_data,
234 struct lustre_handle *lockh, void *lmm, int lmmsize,
235 int extra_lock_flags)
237 struct ptlrpc_request *req;
238 struct obd_device *obddev = class_exp2obd(exp);
239 struct ldlm_res_id res_id =
240 { .name = {op_data->fid1.id, op_data->fid1.generation} };
241 ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
242 struct ldlm_request *lockreq;
243 struct ldlm_intent *lit;
244 struct ldlm_reply *lockrep;
/* Per-buffer sizes of the request message; the slots from
 * DLM_INTENT_REC_OFF onwards are filled in per operation below. */
245 int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
246 [DLM_LOCKREQ_OFF] = sizeof(*lockreq),
247 [DLM_INTENT_IT_OFF] = sizeof(*lit),
/* Per-buffer sizes expected in the reply; repbufcnt counts how many of
 * these slots are in use. */
249 int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
250 [DLM_LOCKREPLY_OFF] = sizeof(*lockrep),
251 [DLM_REPLY_REC_OFF] = sizeof(struct mds_body),
252 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
253 cl_max_mds_easize, 0 };
254 int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
255 int repbufcnt = 4, rc;
259 LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
260 // LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu",
261 // ldlm_it2str(it->it_op), it_name, it_inode->i_ino);
/* IT_OPEN: the request carries a create record, the file name and room
 * for a LOV EA. */
263 if (it->it_op & IT_OPEN) {
264 CFS_LIST_HEAD(cancels);
268 it->it_create_mode |= S_IFREG;
270 size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create);
271 size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
272 /* As an optimization, we allocate an RPC request buffer for
273 * at least a default-sized LOV EA even if we aren't sending
274 * one. We grow the whole request to the next power-of-two
275 * size since we get that much from a slab allocation anyways.
276 * This avoids an allocation below in the common case where
277 * we need to save a default-sized LOV EA for open replay. */
278 size[DLM_INTENT_REC_OFF + 2] = max(lmmsize,
279 obddev->u.cli.cl_default_mds_easize);
280 rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6,
/* Give the round-up slack to the EA buffer, capped at cl_max_mds_easize. */
283 size[DLM_INTENT_REC_OFF + 2] =
284 min(size[DLM_INTENT_REC_OFF+2]+round_up(rc)-rc,
285 obddev->u.cli.cl_max_mds_easize);
287 /* If inode is known, cancel conflicting OPEN locks. */
288 if (op_data->fid2.id) {
289 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
292 else if (it->it_flags & FMODE_EXEC)
297 count = mdc_resource_get_unused(exp, &op_data->fid2,
302 /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
303 if (it->it_op & IT_CREAT || it->it_flags & O_JOIN_FILE)
307 count += mdc_resource_get_unused(exp, &op_data->fid1, &cancels,
308 mode, MDS_INODELOCK_UPDATE);
/* A join adds a seventh request buffer holding a mds_rec_join record;
 * a plain open is prepared with six buffers. */
309 if (it->it_flags & O_JOIN_FILE) {
310 /* join is like an unlink of the tail */
311 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
312 size[DLM_INTENT_REC_OFF + 3] =
313 sizeof(struct mds_rec_join);
314 req = ldlm_prep_enqueue_req(exp, 7, size, &cancels,
316 mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, op_data,
317 (*(__u64 *)op_data->data));
319 req = ldlm_prep_enqueue_req(exp, 6, size, &cancels,
326 spin_lock(&req->rq_lock);
328 spin_unlock(&req->rq_lock);
330 /* pack the intent */
331 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
333 lit->opc = (__u64)it->it_op;
335 /* pack the intended request */
336 mdc_open_pack(req, DLM_INTENT_REC_OFF, op_data,
337 it->it_create_mode, 0, it->it_flags,
/* Fifth reply buffer: room for a POSIX ACL. */
340 repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
341 } else if (it->it_op & IT_UNLINK) {
342 size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink);
343 size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
344 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
345 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
349 /* pack the intent */
350 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
352 lit->opc = (__u64)it->it_op;
354 /* pack the intended request */
355 mdc_unlink_pack(req, DLM_INTENT_REC_OFF, op_data);
/* Fifth reply buffer: sized by the client's current cl_max_mds_cookiesize. */
357 repsize[repbufcnt++] = obddev->u.cli.cl_max_mds_cookiesize;
358 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
359 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
360 OBD_MD_FLACL | OBD_MD_FLMODEASIZE |
362 size[DLM_INTENT_REC_OFF] = sizeof(struct mds_body);
363 size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
365 if (it->it_op & IT_GETATTR)
366 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
368 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
372 /* pack the intent */
373 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
375 lit->opc = (__u64)it->it_op;
377 /* pack the intended request */
378 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid,
379 it->it_flags, op_data);
/* Fifth reply buffer: room for a POSIX ACL. */
381 repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
/* IT_READDIR: only the first two request buffers (ptlrpc body and lock
 * request) are sent. */
382 } else if (it->it_op == IT_READDIR) {
383 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
384 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
394 /* get ready for the reply */
395 ptlrpc_req_set_repsize(req, repbufcnt, repsize);
397 /* It is important to obtain rpc_lock first (if applicable), so that
398 * threads that are serialised with rpc_lock are not polluting our
399 * rpcs in flight counter */
400 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
401 mdc_enter_request(&obddev->u.cli);
402 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
404 mdc_exit_request(&obddev->u.cli);
405 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
407 /* Similarly, if we're going to replay this request, we don't want to
408 * actually get a lock, just perform the intent. */
409 if (req->rq_transno || req->rq_replay) {
410 lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
412 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
415 /* This can go when we're sure that this can never happen */
416 LASSERT(rc != -ENOENT);
/* ELDLM_LOCK_ABORTED: the caller's handle is zeroed.  Any other non-zero
 * rc must be negative; the replay flag is cleared and the request is
 * released.  NOTE(review): the remaining statements of these branches are
 * not visible in this view. */
417 if (rc == ELDLM_LOCK_ABORTED) {
419 memset(lockh, 0, sizeof(*lockh));
421 } else if (rc != 0) {
422 CERROR("ldlm_cli_enqueue: %d\n", rc);
423 LASSERTF(rc < 0, "rc %d\n", rc);
424 mdc_clear_replay_flag(req, rc);
425 ptlrpc_req_finished(req);
427 } else { /* rc = 0 */
428 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
431 /* If the server gave us back a different lock mode, we should
432 * fix up our variables. */
433 if (lock->l_req_mode != einfo->ei_mode) {
434 ldlm_lock_addref(lockh, lock->l_req_mode);
435 ldlm_lock_decref(lockh, einfo->ei_mode);
436 einfo->ei_mode = lock->l_req_mode;
441 lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
443 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
444 LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF); /* swabbed by ldlm_cli_enqueue() */
/* Copy the intent result out of the lock reply into the intent, and hand
 * the request itself to the caller through it_data. */
446 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
447 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
448 it->d.lustre.it_lock_mode = einfo->ei_mode;
449 it->d.lustre.it_data = req;
451 if (it->d.lustre.it_status < 0 && req->rq_replay)
452 mdc_clear_replay_flag(req, it->d.lustre.it_status);
454 /* If we're doing an IT_OPEN which did not result in an actual
455 * successful open, then we need to remove the bit which saves
456 * this request for unconditional replay.
458 * It's important that we do this first! Otherwise we might exit the
459 * function without doing so, and try to replay a failed create
461 if (it->it_op & IT_OPEN && req->rq_replay &&
462 (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
463 mdc_clear_replay_flag(req, it->d.lustre.it_status);
465 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
466 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
468 /* We know what to expect, so we do any byte flipping required here */
469 LASSERT(repbufcnt == 5 || repbufcnt == 2);
/* With five reply buffers the reply carries an mds_body at
 * DLM_REPLY_REC_OFF, optionally followed by EA data. */
470 if (repbufcnt == 5) {
471 struct mds_body *body;
473 body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
474 lustre_swab_mds_body);
476 CERROR ("Can't swab mds_body\n");
480 /* If this is a successful OPEN request, we need to set
481 replay handler and data early, so that if replay happens
482 immediately after swabbing below, new reply is swabbed
483 by that handler correctly */
484 if (it_disposition(it, DISP_OPEN_OPEN) &&
485 !it_open_error(DISP_OPEN_OPEN, it))
486 mdc_set_open_replay_data(NULL, req);
488 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
489 /* The eadata is opaque; just check that it is there.
490 * Eventually, obd_unpackmd() will check the contents */
491 eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
492 body->eadatasize, NULL);
493 if (eadata == NULL) {
494 CERROR ("Missing/short eadata\n");
/* Raise the cached maximum EA and cookie sizes when the reply body
 * advertises larger values. */
497 if (body->valid & OBD_MD_FLMODEASIZE) {
498 if (obddev->u.cli.cl_max_mds_easize <
500 obddev->u.cli.cl_max_mds_easize =
502 CDEBUG(D_INFO, "maxeasize become %d\n",
505 if (obddev->u.cli.cl_max_mds_cookiesize <
506 body->max_cookiesize) {
507 obddev->u.cli.cl_max_mds_cookiesize =
508 body->max_cookiesize;
509 CDEBUG(D_INFO, "cookiesize become %d\n",
510 body->max_cookiesize);
513 /* We save the reply LOV EA in case we have to replay
514 * a create for recovery. If we didn't allocate a
515 * large enough request buffer above we need to
516 * reallocate it here to hold the actual LOV EA. */
517 if (it->it_op & IT_OPEN) {
518 int offset = DLM_INTENT_REC_OFF + 2;
520 if (lustre_msg_buflen(req->rq_reqmsg, offset) <
522 mdc_realloc_openmsg(req, body, size);
/* Note that the lmm parameter is reused here as a pointer into the
 * request's EA buffer, which then receives a copy of the reply EA. */
524 lmm = lustre_msg_buf(req->rq_reqmsg, offset,
527 memcpy(lmm, eadata, body->eadatasize);
534 EXPORT_SYMBOL(mdc_enqueue);
537 * This long block is all about fixing up the lock and request state
538 * so that it is correct as of the moment _before_ the operation was
539 * applied; that way, the VFS will think that everything is normal and
540 * call Lustre's regular VFS methods.
542 * If we're performing a creation, that means that unless the creation
543 * failed with EEXIST, we should fake up a negative dentry.
545 * For everything else, we want the lookup to succeed.
547 * One additional note: if CREATE or OPEN succeeded, we add an extra
548 * reference to the request because we need to keep it around until
549 * ll_create/ll_open gets called.
551 * The server will return to us, in it_disposition, an indication of
552 * exactly what d.lustre.it_status refers to.
554 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
555 * otherwise if DISP_OPEN_CREATE is set, then it status is the
556 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
557 * DISP_LOOKUP_POS will be set, indicating whether the child lookup was successful.
560 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the child lookup.
/* Obtain, or re-use, the lock for an intent and leave the intent and
 * request in the state the caller expects.
 *
 * Steps visible in this function:
 *  - for IT_LOOKUP/IT_GETATTR with a known child fid (op_data->fid2), try
 *    to match an already granted inodebits lock instead of enqueueing;
 *  - unless DISP_ENQ_COMPLETE is set, call mdc_enqueue() and store the
 *    resulting handle in it->d.lustre.it_lock_handle;
 *  - take the request from it->d.lustre.it_data, return it through *reqp
 *    and check the status for the IT_EXECD and LOOKUP_EXECD phases;
 *  - take an extra request reference for a successful create and for a
 *    successful open (tracked by DISP_ENQ_CREATE_REF / DISP_ENQ_OPEN_REF);
 *  - if another matching lock is found, cancel the new one and use the
 *    matching handle instead.
 *
 * exp:              export to operate on.
 * op_data:          fid1/fid2 and name of the object being looked up.
 * lmm, lmmsize:     passed through to mdc_enqueue().
 * it:               the intent; its d.lustre fields are read and updated.
 * lookup_flags:     not referenced in the lines visible here.
 * reqp:             receives the request associated with the intent.
 * cb_blocking:      blocking callback placed in the ldlm_enqueue_info.
 * extra_lock_flags: passed through to mdc_enqueue().
 *
 * NOTE(review): many short lines (braces, rc checks, return statements,
 * some declarations) are not visible in this view; the comments below
 * describe only what is shown. */
563 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
564 void *lmm, int lmmsize, struct lookup_intent *it,
565 int lookup_flags, struct ptlrpc_request **reqp,
566 ldlm_blocking_callback cb_blocking, int extra_lock_flags)
568 struct lustre_handle lockh;
569 struct ptlrpc_request *request;
571 struct mds_body *mds_body;
572 struct lustre_handle old_lock;
573 struct ldlm_lock *lock;
577 CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
578 op_data->namelen, op_data->name, op_data->fid1.id,
579 ldlm_it2str(it->it_op), it->it_flags);
/* Revalidation fast path: with a known child fid and a LOOKUP/GETATTR
 * intent, look for an existing lock on the child resource.  Three
 * ldlm_lock_match() calls are made, for LCK_CR, LCK_CW and LCK_PR
 * (presumably each one only if the previous found nothing -- the
 * conditions between them are not visible here).
 * NOTE(review): the lockh declared inside this block shadows the
 * function-level lockh declared above. */
581 if (op_data->fid2.id &&
582 (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
583 /* We could just return 1 immediately, but since we should only
584 * be called in revalidate_it if we already have a lock, let's
586 struct ldlm_res_id res_id = {.name ={op_data->fid2.id,
587 op_data->fid2.generation}};
588 struct lustre_handle lockh;
589 ldlm_policy_data_t policy;
592 /* As not all attributes are kept under update lock, e.g.
593 owner/group/acls are under lookup lock, we need both
594 ibits for GETATTR. */
595 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
596 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
597 MDS_INODELOCK_LOOKUP;
599 rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
600 LDLM_FL_BLOCK_GRANTED, &res_id,
601 LDLM_IBITS, &policy, LCK_CR, &lockh);
604 rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
605 LDLM_FL_BLOCK_GRANTED, &res_id,
606 LDLM_IBITS, &policy,LCK_CW,&lockh);
610 rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
611 LDLM_FL_BLOCK_GRANTED, &res_id,
612 LDLM_IBITS, &policy,LCK_PR,&lockh);
/* Record the handle and mode in the intent. */
615 memcpy(&it->d.lustre.it_lock_handle, &lockh,
617 it->d.lustre.it_lock_mode = mode;
620 /* Only return failure if it was not GETATTR by cfid
621 (from inode_revalidate) */
622 if (rc || op_data->namelen != 0)
626 /* lookup_it may be called only after revalidate_it has run, because
627 * revalidate_it cannot return errors, only zero. Returning zero causes
628 * this call to lookup, which *can* return an error.
630 * We only want to execute the request associated with the intent one
631 * time, however, so don't send the request again. Instead, skip past
632 * this and use the request from revalidate. In this case, revalidate
633 * never dropped its reference, so the refcounts are all OK */
634 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
635 struct ldlm_enqueue_info einfo =
636 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
637 ldlm_completion_ast, NULL, NULL };
639 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
640 lmm, lmmsize, extra_lock_flags);
643 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
644 } else if (!op_data->fid2.id) {
645 /* DISP_ENQ_COMPLETE set means there is extra reference on
646 * request referenced from this intent, saved for subsequent
647 * lookup. This path is executed when we proceed to this
648 * lookup, so we clear DISP_ENQ_COMPLETE */
649 it_clear_disposition(it, DISP_ENQ_COMPLETE);
/* The request was stored in it_data by mdc_enqueue(); hand it to the
 * caller and sanity-check that it has not been freed (LP_POISON). */
651 request = *reqp = it->d.lustre.it_data;
652 LASSERT(request != NULL);
653 LASSERT(request != LP_POISON);
654 LASSERT(request->rq_repmsg != LP_POISON);
656 if (!it_disposition(it, DISP_IT_EXECD)) {
657 /* The server failed before it even started executing the
658 * intent, i.e. because it couldn't unpack the request. */
659 LASSERT(it->d.lustre.it_status != 0);
660 RETURN(it->d.lustre.it_status);
662 rc = it_open_error(DISP_IT_EXECD, it);
666 mds_body = lustre_msg_buf(request->rq_repmsg, DLM_REPLY_REC_OFF,
668 LASSERT(mds_body != NULL); /* mdc_enqueue checked */
669 LASSERT_REPSWABBED(request, 1); /* mdc_enqueue swabbed */
671 /* If we were revalidating a fid/name pair, mark the intent in
672 * case we fail and get called again from lookup */
673 if (op_data->fid2.id && (it->it_op != IT_GETATTR)) {
674 it_set_disposition(it, DISP_ENQ_COMPLETE);
675 /* Also: did we find the same inode? */
676 if (memcmp(&op_data->fid2, &mds_body->fid1,
677 sizeof(op_data->fid2)))
681 rc = it_open_error(DISP_LOOKUP_EXECD, it);
685 /* keep requests around for the multiple phases of the call
686 * this shows the DISP_XX must guarantee we make it into the call
688 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
689 it_disposition(it, DISP_OPEN_CREATE) &&
690 !it_open_error(DISP_OPEN_CREATE, it)) {
691 it_set_disposition(it, DISP_ENQ_CREATE_REF);
692 ptlrpc_request_addref(request); /* balanced in ll_create_node */
694 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
695 it_disposition(it, DISP_OPEN_OPEN) &&
696 !it_open_error(DISP_OPEN_OPEN, it)) {
697 it_set_disposition(it, DISP_ENQ_OPEN_REF);
698 ptlrpc_request_addref(request); /* balanced in ll_file_open */
699 /* BUG 11546 - eviction in the middle of open rpc processing */
700 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
703 if (it->it_op & IT_CREAT) {
704 /* XXX this belongs in ll_create_it */
705 } else if (it->it_op == IT_OPEN) {
706 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
708 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
/* NOTE(review): the function-level lockh is only visibly written by
 * mdc_enqueue() above; on the DISP_ENQ_COMPLETE path no assignment to it
 * is visible in this view -- confirm it is initialised before the handle
 * lookup below. */
711 /* If we already have a matching lock, then cancel the new
712 * one. We have to set the data here instead of in
713 * mdc_enqueue, because we need to use the child's inode as
714 * the l_ast_data to match, and that's not available until
715 * intent_finish has performed the iget().) */
716 lock = ldlm_handle2lock(&lockh);
718 ldlm_policy_data_t policy = lock->l_policy_data;
719 LDLM_DEBUG(lock, "matching against this");
721 memcpy(&old_lock, &lockh, sizeof(lockh));
722 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
723 LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
724 ldlm_lock_decref_and_cancel(&lockh,
725 it->d.lustre.it_lock_mode);
726 memcpy(&lockh, &old_lock, sizeof(old_lock));
727 memcpy(&it->d.lustre.it_lock_handle, &lockh,
731 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
732 op_data->namelen, op_data->name, ldlm_it2str(it->it_op),
733 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
737 EXPORT_SYMBOL(mdc_intent_lock);