1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
6 * This file is part of the Lustre file system, http://www.lustre.org
7 * Lustre is a trademark of Cluster File Systems, Inc.
9 * You may have signed or agreed to another license before downloading
10 * this software. If so, you are bound by the terms and conditions
11 * of that agreement, and the following does not apply to you. See the
12 * LICENSE file included with this distribution for more information.
14 * If you did not agree to a different license, then this copy of Lustre
15 * is open source software; you can redistribute it and/or modify it
16 * under the terms of version 2 of the GNU General Public License as
17 * published by the Free Software Foundation.
19 * In either case, Lustre is distributed in the hope that it will be
20 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 * license text for more details.
26 # define EXPORT_SYMTAB
28 #define DEBUG_SUBSYSTEM S_MDC
31 # include <linux/module.h>
32 # include <linux/pagemap.h>
33 # include <linux/miscdevice.h>
34 # include <linux/init.h>
36 # include <liblustre.h>
39 #include <obd_class.h>
40 #include <lustre_dlm.h>
41 #include <lprocfs_status.h>
42 #include "mdc_internal.h"
/* Test disposition bits on an intent.
 *
 * Returns it->d.lustre.it_disposition masked with @flag, i.e. the raw
 * masked value (non-zero when any requested bit is set), not a
 * normalised 0/1. */
44 int it_disposition(struct lookup_intent *it, int flag)
46 return it->d.lustre.it_disposition & flag;
48 EXPORT_SYMBOL(it_disposition);
/* Set disposition bits on an intent: ORs @flag into
 * it->d.lustre.it_disposition, leaving all other bits untouched. */
50 void it_set_disposition(struct lookup_intent *it, int flag)
52 it->d.lustre.it_disposition |= flag;
54 EXPORT_SYMBOL(it_set_disposition);
/* Clear disposition bits on an intent: removes every bit of @flag from
 * it->d.lustre.it_disposition, leaving all other bits untouched. */
56 void it_clear_disposition(struct lookup_intent *it, int flag)
58 it->d.lustre.it_disposition &= ~flag;
60 EXPORT_SYMBOL(it_clear_disposition);
/* Choose a lock mode from the operation bits in it->it_op.
 *
 * IT_CREAT is tested first; otherwise any of IT_READDIR, IT_GETATTR,
 * IT_OPEN or IT_LOOKUP selects the second branch.
 *
 * NOTE(review): the value returned by each branch, and whatever happens
 * when neither test matches, is not visible in this listing - confirm
 * against the complete file. */
62 static int it_to_lock_mode(struct lookup_intent *it)
64 /* CREAT needs to be tested before open (both could be set) */
65 if (it->it_op & IT_CREAT)
67 else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP))
/* Report the intent status for a given phase of an open.
 *
 * The dispositions are examined in the order DISP_OPEN_OPEN,
 * DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD.  For the first
 * one that is set on the intent, it->d.lustre.it_status is returned if
 * @phase is at least that disposition.
 *
 * If none of the dispositions is set, the disposition mask and status
 * are logged with CERROR.
 *
 * NOTE(review): the branches taken when @phase is lower than the
 * matched disposition, and the statements following the CERROR, are
 * not visible in this listing. */
74 int it_open_error(int phase, struct lookup_intent *it)
/* Status refers to the open itself. */
76 if (it_disposition(it, DISP_OPEN_OPEN)) {
77 if (phase >= DISP_OPEN_OPEN)
78 return it->d.lustre.it_status;
/* Status refers to the create. */
83 if (it_disposition(it, DISP_OPEN_CREATE)) {
84 if (phase >= DISP_OPEN_CREATE)
85 return it->d.lustre.it_status;
/* Status refers to the lookup. */
90 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
91 if (phase >= DISP_LOOKUP_EXECD)
92 return it->d.lustre.it_status;
/* Status refers to the execution of the intent as a whole. */
97 if (it_disposition(it, DISP_IT_EXECD)) {
98 if (phase >= DISP_IT_EXECD)
99 return it->d.lustre.it_status;
/* No known disposition was set: log what the intent carries. */
103 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
104 it->d.lustre.it_status);
108 EXPORT_SYMBOL(it_open_error);
110 /* this must be called on a lockh that is known to have a referenced lock */
/* Attach @data to the lock behind the handle @l as its l_ast_data.
 *
 * @l     pointer that is reinterpreted as a struct lustre_handle
 * @data  new l_ast_data value; it is treated as a struct inode when the
 *        lock already carries a different value
 *
 * The handle must resolve to a lock (asserted).  The assignment is done
 * between lock_res_and_lock() and unlock_res_and_lock(). */
111 void mdc_set_lock_data(__u64 *l, void *data)
113 struct ldlm_lock *lock;
114 struct lustre_handle *lockh = (struct lustre_handle *)l;
122 lock = ldlm_handle2lock(lockh);
124 LASSERT(lock != NULL);
125 lock_res_and_lock(lock);
/* Replacing existing, different data is only tolerated when the old
 * inode has I_FREEING set in i_state; anything else trips the
 * assertion, which prints both inodes. */
127 if (lock->l_ast_data && lock->l_ast_data != data) {
128 struct inode *new_inode = data;
129 struct inode *old_inode = lock->l_ast_data;
130 LASSERTF(old_inode->i_state & I_FREEING,
131 "Found existing inode %p/%lu/%u state %lu in lock: "
132 "setting data to %p/%lu/%u\n", old_inode,
133 old_inode->i_ino, old_inode->i_generation,
135 new_inode, new_inode->i_ino, new_inode->i_generation);
138 lock->l_ast_data = data;
139 unlock_res_and_lock(lock);
144 EXPORT_SYMBOL(mdc_set_lock_data);
/* Run an iterator over the lock resource that corresponds to @fid.
 *
 * The resource name is built from fid->id (name[0]) and fid->generation
 * (name[1]); the remaining name slots stay zero.  The namespace is the
 * one of the obd device behind @exp.
 *
 * NOTE(review): the trailing arguments of ldlm_resource_iterate() and
 * the return statement are not visible in this listing; @it and @data
 * are presumably handed to the iterator - confirm. */
146 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid,
147 ldlm_iterator_t it, void *data)
149 struct ldlm_res_id res_id = { .name = {0} };
152 res_id.name[0] = fid->id;
153 res_id.name[1] = fid->generation;
155 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
/* Helper used on error paths for requests that were marked for replay.
 *
 * @req  request to adjust
 * @rc   error code being handled; only used for the diagnostic below
 *
 * When req->rq_replay is set, req->rq_lock is taken and released around
 * an update.  NOTE(review): the statement executed under the lock is
 * not visible in this listing; presumably it clears rq_replay - confirm.
 *
 * Independently of that, a non-zero @rc combined with a non-zero
 * rq_transno is reported through DEBUG_REQ at D_ERROR level. */
162 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
164 /* Don't hold error requests for replay. */
165 if (req->rq_replay) {
166 spin_lock(&req->rq_lock);
168 spin_unlock(&req->rq_lock);
170 if (rc && req->rq_transno != 0) {
171 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
/* Integer rounding helper for request sizing.
 *
 * mdc_enqueue() uses the difference round_up(rc) - rc as the amount of
 * padding to add to the LOV EA buffer of an open request.
 *
 * NOTE(review): the body is not visible in this listing.  The comment at
 * the call site speaks of growing the request to the next power-of-two
 * size, so this presumably rounds @val up to a power of two - confirm. */
176 static int round_up(int val)
186 /* Save a large LOV EA into the request buffer so that it is available
187 * for replay. We don't do this in the initial request because the
188 * original request doesn't need this buffer (at most it sends just the
189 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
190 * buffer and may also be difficult to allocate and save a very large
191 * request buffer for each open. (bug 5707)
193 * OOM here may cause recovery failure if lmm is needed (only for the
194 * original open if the MDS crashed just when this client also OOM'd)
195 * but this is incredibly unlikely, and questionable whether the client
196 * could do MDS recovery under OOM anyways... */
/* @req   request whose rq_reqmsg is replaced by a larger copy
 * @body  reply body; body->eadatasize gives the size the EA buffer must
 *        grow to
 * @size  buffer length table of the request (six entries are used); the
 *        entry at DLM_INTENT_REC_OFF + 2 is overwritten with the new EA
 *        size */
197 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
198 struct mds_body *body, int size[6])
200 int new_size, old_size;
201 struct lustre_msg *new_msg;
/* Measure the message before and after enlarging the EA buffer. */
204 old_size = lustre_msg_size(lustre_request_magic(req), 6, size);
206 size[DLM_INTENT_REC_OFF + 2] = body->eadatasize;
207 new_size = lustre_msg_size(lustre_request_magic(req), 6, size);
208 OBD_ALLOC(new_msg, new_size);
209 if (new_msg != NULL) {
210 struct lustre_msg *old_msg = req->rq_reqmsg;
212 DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u",
/* Copy the old message, then record the new length of the EA buffer
 * in the copy. */
214 memcpy(new_msg, old_msg, old_size);
215 lustre_msg_set_buflen(new_msg, DLM_INTENT_REC_OFF + 2,
/* Switch the request over to the new message under rq_lock, then
 * release the old buffer. */
218 spin_lock(&req->rq_lock);
219 req->rq_reqmsg = new_msg;
220 req->rq_reqlen = new_size;
221 spin_unlock(&req->rq_lock);
223 OBD_FREE(old_msg, old_size);
/* NOTE(review): the line separating this from the success path is not
 * visible in this listing; presumably the two statements below are the
 * allocation-failure path, dropping the EA from the reply body so that
 * the caller does not try to save it - confirm. */
225 body->valid &= ~OBD_MD_FLEASIZE;
226 body->eadatasize = 0;
230 /* We always reserve enough space in the reply packet for a stripe MD, because
231 * we don't know in advance the file type. */
/* Build and send an intent lock enqueue, then validate the reply.
 *
 * @exp               export; yields the obd device and import
 * @einfo             enqueue info; ei_type must be LDLM_IBITS (asserted)
 *                    and ei_mode may be rewritten to the mode the lock
 *                    actually carries
 * @it                intent; it_op selects the request layout and its
 *                    d.lustre fields receive disposition, status, lock
 *                    mode and the request
 * @op_data           fid1 names the locked resource; fid2, name and
 *                    namelen are used while packing
 * @lockh             lock handle; zeroed when the enqueue result is
 *                    ELDLM_LOCK_ABORTED
 * @lmm, @lmmsize     LOV EA for an open; @lmmsize is a lower bound for
 *                    the EA buffer reserved in the request
 * @extra_lock_flags  ORed with LDLM_FL_HAS_INTENT to form the lock flags
 *
 * The request is packed differently for IT_OPEN, IT_UNLINK,
 * IT_GETATTR/IT_LOOKUP and IT_READDIR.  Each of the first three appends
 * one extra reply buffer on top of the four common ones.
 *
 * NOTE(review): this listing is missing lines (local declarations,
 * error returns, trailing call arguments, closing braces), so the
 * return values and parts of the control flow cannot be confirmed from
 * here. */
232 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
233 struct lookup_intent *it, struct mdc_op_data *op_data,
234 struct lustre_handle *lockh, void *lmm, int lmmsize,
235 int extra_lock_flags)
237 struct ptlrpc_request *req;
238 struct obd_device *obddev = class_exp2obd(exp);
239 struct ldlm_res_id res_id =
240 { .name = {op_data->fid1.id, op_data->fid1.generation} };
241 ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
242 struct ldlm_request *lockreq;
243 struct ldlm_intent *lit;
244 struct ldlm_reply *lockrep;
245 int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
246 [DLM_LOCKREQ_OFF] = sizeof(*lockreq),
247 [DLM_INTENT_IT_OFF] = sizeof(*lit),
249 int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
250 [DLM_LOCKREPLY_OFF] = sizeof(*lockrep),
251 [DLM_REPLY_REC_OFF] = sizeof(struct mds_body),
252 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
253 cl_max_mds_easize, 0 };
254 int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
255 int repbufcnt = 4, rc;
259 LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
260 // LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu",
261 // ldlm_it2str(it->it_op), it_name, it_inode->i_ino);
/* OPEN: create record + name + LOV EA buffer, optionally a join
 * record; unused conflicting locks are collected for cancellation. */
263 if (it->it_op & IT_OPEN) {
264 CFS_LIST_HEAD(cancels);
268 it->it_create_mode |= S_IFREG;
270 size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create);
271 size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
272 /* As an optimization, we allocate an RPC request buffer for
273 * at least a default-sized LOV EA even if we aren't sending
274 * one. We grow the whole request to the next power-of-two
275 * size since we get that much from a slab allocation anyways.
276 * This avoids an allocation below in the common case where
277 * we need to save a default-sized LOV EA for open replay. */
278 size[DLM_INTENT_REC_OFF + 2] = max(lmmsize,
279 obddev->u.cli.cl_default_mds_easize);
280 rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6,
/* Add the rounding slack to the EA buffer, capped at the maximum EA
 * size known for this client. */
283 size[DLM_INTENT_REC_OFF + 2] =
284 min(size[DLM_INTENT_REC_OFF+2]+round_up(rc)-rc,
285 obddev->u.cli.cl_max_mds_easize);
287 /* If inode is known, cancel conflicting OPEN locks. */
288 if (op_data->fid2.id) {
289 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
292 else if (it->it_flags & FMODE_EXEC)
297 count = mdc_resource_get_unused(exp, &op_data->fid2,
302 /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
303 if (it->it_op & IT_CREAT || it->it_flags & O_JOIN_FILE)
307 count += mdc_resource_get_unused(exp, &op_data->fid1, &cancels,
308 mode, MDS_INODELOCK_UPDATE);
309 if (it->it_flags & O_JOIN_FILE) {
310 /* join is like an unlink of the tail */
311 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
312 size[DLM_INTENT_REC_OFF + 3] =
313 sizeof(struct mds_rec_join);
314 req = ldlm_prep_enqueue_req(exp, 7, size, &cancels,
316 mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, op_data,
317 (*(__u64 *)op_data->data));
319 req = ldlm_prep_enqueue_req(exp, 6, size, &cancels,
326 spin_lock(&req->rq_lock);
328 spin_unlock(&req->rq_lock);
330 /* pack the intent */
331 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
333 lit->opc = (__u64)it->it_op;
335 /* pack the intended request */
336 mdc_open_pack(req, DLM_INTENT_REC_OFF, op_data,
337 it->it_create_mode, 0, it->it_flags,
340 repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
/* UNLINK: unlink record + name; the extra reply buffer is sized for
 * the largest cookie blob known for this client. */
341 } else if (it->it_op & IT_UNLINK) {
342 size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink);
343 size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
344 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
345 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
349 /* pack the intent */
350 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
352 lit->opc = (__u64)it->it_op;
354 /* pack the intended request */
355 mdc_unlink_pack(req, DLM_INTENT_REC_OFF, op_data);
357 repsize[repbufcnt++] = obddev->u.cli.cl_max_mds_cookiesize;
/* GETATTR / LOOKUP: mds_body + name; GETATTR asks for the UPDATE
 * inodebit instead of the default LOOKUP bit. */
358 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
359 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
360 OBD_MD_FLACL | OBD_MD_FLMODEASIZE |
362 size[DLM_INTENT_REC_OFF] = sizeof(struct mds_body);
363 size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
365 if (it->it_op & IT_GETATTR)
366 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
368 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
372 /* pack the intent */
373 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
375 lit->opc = (__u64)it->it_op;
377 /* pack the intended request */
378 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid,
379 it->it_flags, op_data);
381 repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
/* READDIR: plain two-buffer enqueue with the UPDATE inodebit; note
 * the exact comparison (==) rather than a bit test. */
382 } else if (it->it_op == IT_READDIR) {
383 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
384 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
394 /* get ready for the reply */
395 ptlrpc_req_set_repsize(req, repbufcnt, repsize);
397 /* It is important to obtain rpc_lock first (if applicable), so that
398 * threads that are serialised with rpc_lock are not polluting our
399 * rpcs in flight counter */
400 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
401 mdc_enter_request(&obddev->u.cli);
402 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
404 mdc_exit_request(&obddev->u.cli);
405 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
407 /* Similarly, if we're going to replay this request, we don't want to
408 * actually get a lock, just perform the intent. */
409 if (req->rq_transno || req->rq_replay) {
410 lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
412 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
/* Three outcomes: lock aborted (handle is zeroed), failure (the
 * request is dropped), or a lock was granted. */
415 if (rc == ELDLM_LOCK_ABORTED) {
417 memset(lockh, 0, sizeof(*lockh));
419 } else if (rc != 0) {
420 CERROR("ldlm_cli_enqueue: %d\n", rc);
421 LASSERTF(rc < 0, "rc %d\n", rc);
422 mdc_clear_replay_flag(req, rc);
423 ptlrpc_req_finished(req);
425 } else { /* rc = 0 */
426 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
429 /* If the server gave us back a different lock mode, we should
430 * fix up our variables. */
431 if (lock->l_req_mode != einfo->ei_mode) {
432 ldlm_lock_addref(lockh, lock->l_req_mode);
433 ldlm_lock_decref(lockh, einfo->ei_mode);
434 einfo->ei_mode = lock->l_req_mode;
439 lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
441 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
442 LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF); /* swabbed by ldlm_cli_enqueue() */
/* Publish the server verdict on the intent: disposition and status
 * come from the two policy result words of the lock reply. */
444 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
445 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
446 it->d.lustre.it_lock_mode = einfo->ei_mode;
447 it->d.lustre.it_data = req;
449 if (it->d.lustre.it_status < 0 && req->rq_replay)
450 mdc_clear_replay_flag(req, it->d.lustre.it_status);
452 /* If we're doing an IT_OPEN which did not result in an actual
453 * successful open, then we need to remove the bit which saves
454 * this request for unconditional replay.
456 * It's important that we do this first! Otherwise we might exit the
457 * function without doing so, and try to replay a failed create
459 if (it->it_op & IT_OPEN && req->rq_replay &&
460 (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
461 mdc_clear_replay_flag(req, it->d.lustre.it_status);
463 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
464 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
466 /* We know what to expect, so we do any byte flipping required here */
467 LASSERT(repbufcnt == 5 || repbufcnt == 2);
468 if (repbufcnt == 5) {
469 struct mds_body *body;
471 body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
472 lustre_swab_mds_body);
474 CERROR ("Can't swab mds_body\n");
478 /* If this is a successful OPEN request, we need to set
479 replay handler and data early, so that if replay happens
480 immediately after swabbing below, new reply is swabbed
481 by that handler correctly */
482 if (it_disposition(it, DISP_OPEN_OPEN) &&
483 !it_open_error(DISP_OPEN_OPEN, it))
484 mdc_set_open_replay_data(NULL, req);
486 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
487 /* The eadata is opaque; just check that it is there.
488 * Eventually, obd_unpackmd() will check the contents */
489 eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
490 body->eadatasize, NULL);
491 if (eadata == NULL) {
492 CERROR ("Missing/short eadata\n");
/* The server may advertise larger EA / cookie limits; the cached
 * client-side maxima only ever grow. */
495 if (body->valid & OBD_MD_FLMODEASIZE) {
496 if (obddev->u.cli.cl_max_mds_easize <
498 obddev->u.cli.cl_max_mds_easize =
500 CDEBUG(D_INFO, "maxeasize become %d\n",
503 if (obddev->u.cli.cl_max_mds_cookiesize <
504 body->max_cookiesize) {
505 obddev->u.cli.cl_max_mds_cookiesize =
506 body->max_cookiesize;
507 CDEBUG(D_INFO, "cookiesize become %d\n",
508 body->max_cookiesize);
511 /* We save the reply LOV EA in case we have to replay
512 * a create for recovery. If we didn't allocate a
513 * large enough request buffer above we need to
514 * reallocate it here to hold the actual LOV EA. */
515 if (it->it_op & IT_OPEN) {
516 int offset = DLM_INTENT_REC_OFF + 2;
518 if (lustre_msg_buflen(req->rq_reqmsg, offset) <
520 mdc_realloc_openmsg(req, body, size);
522 lmm = lustre_msg_buf(req->rq_reqmsg, offset,
525 memcpy(lmm, eadata, body->eadatasize);
532 EXPORT_SYMBOL(mdc_enqueue);
535 * This long block is all about fixing up the lock and request state
536 * so that it is correct as of the moment _before_ the operation was
537 * applied; that way, the VFS will think that everything is normal and
538 * call Lustre's regular VFS methods.
540 * If we're performing a creation, that means that unless the creation
541 * failed with EEXIST, we should fake up a negative dentry.
543 * For everything else, we want to lookup to succeed.
545 * One additional note: if CREATE or OPEN succeeded, we add an extra
546 * reference to the request because we need to keep it around until
547 * ll_create/ll_open gets called.
549 * The server will return to us, in it_disposition, an indication of
550 * exactly what d.lustre.it_status refers to.
552 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
553 * otherwise if DISP_OPEN_CREATE is set, then it status is the
554 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
555 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
 * was successful.
558 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
 * child lookup.
/* Obtain, or reuse, the lock and request state for an intent.
 *
 * @exp               export used for lock matching and for the enqueue
 * @op_data           fid1 / fid2 and name of the object operated on
 * @lmm, @lmmsize     handed through to mdc_enqueue()
 * @it                intent; its d.lustre fields are read and updated
 * @lookup_flags      not referenced in the lines visible here
 * @reqp              receives the request stored in it->d.lustre.it_data
 * @cb_blocking       blocking callback placed in the enqueue info
 * @extra_lock_flags  handed through to mdc_enqueue()
 *
 * Visible stages:
 *  - for LOOKUP / GETATTR on a known fid2, try to match a lock that is
 *    already held;
 *  - unless DISP_ENQ_COMPLETE is set on the intent, send the enqueue;
 *  - check the intent status for the IT_EXECD and LOOKUP_EXECD phases;
 *  - take extra request references for a successful create and open;
 *  - if an older matching lock exists, cancel the new one and keep the
 *    older handle.
 *
 * NOTE(review): this listing is missing lines (returns, conditions,
 * closing braces), so the exact return values cannot be confirmed from
 * here. */
561 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
562 void *lmm, int lmmsize, struct lookup_intent *it,
563 int lookup_flags, struct ptlrpc_request **reqp,
564 ldlm_blocking_callback cb_blocking, int extra_lock_flags)
566 struct lustre_handle lockh;
567 struct ptlrpc_request *request;
569 struct mds_body *mds_body;
570 struct lustre_handle old_lock;
571 struct ldlm_lock *lock;
575 CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
576 op_data->namelen, op_data->name, op_data->fid1.id,
577 ldlm_it2str(it->it_op), it->it_flags);
579 if (op_data->fid2.id &&
580 (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
581 /* We could just return 1 immediately, but since we should only
582 * be called in revalidate_it if we already have a lock, let's
584 struct ldlm_res_id res_id = {.name ={op_data->fid2.id,
585 op_data->fid2.generation}};
586 struct lustre_handle lockh;
587 ldlm_policy_data_t policy;
590 /* As not all attributes are kept under update lock, e.g.
591 owner/group/acls are under lookup lock, we need both
592 ibits for GETATTR. */
593 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
594 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
595 MDS_INODELOCK_LOOKUP;
/* Three match attempts, for modes CR, CW and PR.  NOTE(review): the
 * conditions between them are not visible in this listing; presumably
 * each later attempt runs only when the previous one found nothing. */
597 rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
598 LDLM_FL_BLOCK_GRANTED, &res_id,
599 LDLM_IBITS, &policy, LCK_CR, &lockh);
602 rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
603 LDLM_FL_BLOCK_GRANTED, &res_id,
604 LDLM_IBITS, &policy,LCK_CW,&lockh);
608 rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
609 LDLM_FL_BLOCK_GRANTED, &res_id,
610 LDLM_IBITS, &policy,LCK_PR,&lockh);
/* Record the matched handle and mode on the intent. */
613 memcpy(&it->d.lustre.it_lock_handle, &lockh,
615 it->d.lustre.it_lock_mode = mode;
618 /* Only return failure if it was not GETATTR by cfid
619 (from inode_revalidate) */
620 if (rc || op_data->namelen != 0)
624 /* lookup_it may be called only after revalidate_it has run, because
625 * revalidate_it cannot return errors, only zero. Returning zero causes
626 * this call to lookup, which *can* return an error.
628 * We only want to execute the request associated with the intent one
629 * time, however, so don't send the request again. Instead, skip past
630 * this and use the request from revalidate. In this case, revalidate
631 * never dropped its reference, so the refcounts are all OK */
632 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
633 struct ldlm_enqueue_info einfo =
634 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
635 ldlm_completion_ast, NULL, NULL };
637 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
638 lmm, lmmsize, extra_lock_flags);
641 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
642 } else if (!op_data->fid2.id) {
643 /* DISP_ENQ_COMPLETE set means there is extra reference on
644 * request referenced from this intent, saved for subsequent
645 * lookup. This path is executed when we proceed to this
646 * lookup, so we clear DISP_ENQ_COMPLETE */
647 it_clear_disposition(it, DISP_ENQ_COMPLETE);
/* Hand the saved request to the caller and sanity-check it. */
649 request = *reqp = it->d.lustre.it_data;
650 LASSERT(request != NULL);
651 LASSERT(request != LP_POISON);
652 LASSERT(request->rq_repmsg != LP_POISON);
654 if (!it_disposition(it, DISP_IT_EXECD)) {
655 /* The server failed before it even started executing the
656 * intent, i.e. because it couldn't unpack the request. */
657 LASSERT(it->d.lustre.it_status != 0);
658 RETURN(it->d.lustre.it_status);
660 rc = it_open_error(DISP_IT_EXECD, it);
664 mds_body = lustre_msg_buf(request->rq_repmsg, DLM_REPLY_REC_OFF,
666 LASSERT(mds_body != NULL); /* mdc_enqueue checked */
667 LASSERT_REPSWABBED(request, 1); /* mdc_enqueue swabbed */
669 /* If we were revalidating a fid/name pair, mark the intent in
670 * case we fail and get called again from lookup */
671 if (op_data->fid2.id && (it->it_op != IT_GETATTR)) {
672 it_set_disposition(it, DISP_ENQ_COMPLETE);
673 /* Also: did we find the same inode? */
674 if (memcmp(&op_data->fid2, &mds_body->fid1,
675 sizeof(op_data->fid2)))
679 rc = it_open_error(DISP_LOOKUP_EXECD, it);
683 /* keep requests around for the multiple phases of the call
684 * this shows the DISP_XX must guarantee we make it into the call
686 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
687 it_disposition(it, DISP_OPEN_CREATE) &&
688 !it_open_error(DISP_OPEN_CREATE, it)) {
689 it_set_disposition(it, DISP_ENQ_CREATE_REF);
690 ptlrpc_request_addref(request); /* balanced in ll_create_node */
/* Same pattern for a successful open: one extra reference, taken at
 * most once thanks to the DISP_ENQ_OPEN_REF marker. */
692 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
693 it_disposition(it, DISP_OPEN_OPEN) &&
694 !it_open_error(DISP_OPEN_OPEN, it)) {
695 it_set_disposition(it, DISP_ENQ_OPEN_REF);
696 ptlrpc_request_addref(request); /* balanced in ll_file_open */
697 /* BUG 11546 - eviction in the middle of open rpc processing */
698 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
/* Sanity checks on the combination of operation and disposition. */
701 if (it->it_op & IT_CREAT) {
702 /* XXX this belongs in ll_create_it */
703 } else if (it->it_op == IT_OPEN) {
704 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
706 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
709 /* If we already have a matching lock, then cancel the new
710 * one. We have to set the data here instead of in
711 * mdc_enqueue, because we need to use the child's inode as
712 * the l_ast_data to match, and that's not available until
713 * intent_finish has performed the iget().) */
714 lock = ldlm_handle2lock(&lockh);
716 ldlm_policy_data_t policy = lock->l_policy_data;
717 LDLM_DEBUG(lock, "matching against this");
/* old_lock starts as a copy of the new handle; on a successful match
 * the new lock is dropped and cancelled and the intent is switched to
 * the handle returned in old_lock. */
719 memcpy(&old_lock, &lockh, sizeof(lockh));
720 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
721 LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
722 ldlm_lock_decref_and_cancel(&lockh,
723 it->d.lustre.it_lock_mode);
724 memcpy(&lockh, &old_lock, sizeof(old_lock));
725 memcpy(&it->d.lustre.it_lock_handle, &lockh,
729 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
730 op_data->namelen, op_data->name, ldlm_it2str(it->it_op),
731 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
735 EXPORT_SYMBOL(mdc_intent_lock);