1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
6 * This file is part of the Lustre file system, http://www.lustre.org
7 * Lustre is a trademark of Cluster File Systems, Inc.
9 * You may have signed or agreed to another license before downloading
10 * this software. If so, you are bound by the terms and conditions
11 * of that agreement, and the following does not apply to you. See the
12 * LICENSE file included with this distribution for more information.
14 * If you did not agree to a different license, then this copy of Lustre
15 * is open source software; you can redistribute it and/or modify it
16 * under the terms of version 2 of the GNU General Public License as
17 * published by the Free Software Foundation.
19 * In either case, Lustre is distributed in the hope that it will be
20 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 * license text for more details.
26 # define EXPORT_SYMTAB
28 #define DEBUG_SUBSYSTEM S_MDC
31 # include <linux/module.h>
32 # include <linux/pagemap.h>
33 # include <linux/miscdevice.h>
34 # include <linux/init.h>
36 # include <liblustre.h>
39 #include <obd_class.h>
40 #include <lustre_dlm.h>
41 #include <lprocfs_status.h>
42 #include "mdc_internal.h"
44 int it_disposition(struct lookup_intent *it, int flag)
46 return it->d.lustre.it_disposition & flag;
48 EXPORT_SYMBOL(it_disposition);
50 void it_set_disposition(struct lookup_intent *it, int flag)
52 it->d.lustre.it_disposition |= flag;
54 EXPORT_SYMBOL(it_set_disposition);
56 void it_clear_disposition(struct lookup_intent *it, int flag)
58 it->d.lustre.it_disposition &= ~flag;
60 EXPORT_SYMBOL(it_clear_disposition);
/* Map an intent operation to the DLM lock mode to request.
 * NOTE(review): the return statements of both branches are not visible
 * in this extract (embedded line numbers jump 65->67->74); presumably
 * CREAT requests a write-ish mode and the read-type intents a read
 * mode -- confirm against the full source. */
62 static int it_to_lock_mode(struct lookup_intent *it)
64         /* CREAT needs to be tested before open (both could be set) */
65 if (it->it_op & IT_CREAT)
67 else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP))
/* Return the server status for the given @phase of an intent open,
 * checked in execution order (OPEN, CREATE, LOOKUP, then generic EXECD).
 * Each guard tests the disposition bit set by the server; if the caller
 * asks about that phase or a later one, the saved it_status is returned.
 * NOTE(review): the "else return 0" arms and the closing braces are not
 * visible in this extract (line-number gaps 78->83, 85->90, ...). */
74 int it_open_error(int phase, struct lookup_intent *it)
76 if (it_disposition(it, DISP_OPEN_OPEN)) {
77 if (phase >= DISP_OPEN_OPEN)
78 return it->d.lustre.it_status;
83 if (it_disposition(it, DISP_OPEN_CREATE)) {
84 if (phase >= DISP_OPEN_CREATE)
85 return it->d.lustre.it_status;
90 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
91 if (phase >= DISP_LOOKUP_EXECD)
92 return it->d.lustre.it_status;
97 if (it_disposition(it, DISP_IT_EXECD)) {
98 if (phase >= DISP_IT_EXECD)
99 return it->d.lustre.it_status;
/* Falling through here means no disposition bit covered @phase --
 * log the raw disposition/status for debugging. */
103 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
104 it->d.lustre.it_status);
108 EXPORT_SYMBOL(it_open_error);
110 /* this must be called on a lockh that is known to have a referenced lock */
/* Attach VFS inode @data to the lock's l_ast_data under the resource
 * lock, asserting that any previously attached inode is being freed
 * (I_FREEING) before it is replaced. */
111 void mdc_set_lock_data(__u64 *l, void *data)
113 struct ldlm_lock *lock;
114 struct lustre_handle *lockh = (struct lustre_handle *)l;
/* Caller guarantees the handle references a live lock, so the lookup
 * cannot fail (see comment above). */
122 lock = ldlm_handle2lock(lockh);
124 LASSERT(lock != NULL);
125 lock_res_and_lock(lock);
126 #ifdef __KERNEL__
127 if (lock->l_ast_data && lock->l_ast_data != data) {
128 struct inode *new_inode = data;
129 struct inode *old_inode = lock->l_ast_data;
/* Replacing a different inode is only legal while the old one is
 * on its way out; anything else indicates a stale l_ast_data. */
130 LASSERTF(old_inode->i_state & I_FREEING,
131 "Found existing inode %p/%lu/%u state %lu in lock: "
132 "setting data to %p/%lu/%u\n", old_inode,
133 old_inode->i_ino, old_inode->i_generation,
135 new_inode, new_inode->i_ino, new_inode->i_generation);
138 lock->l_ast_data = data;
139 unlock_res_and_lock(lock);
/* NOTE(review): the matching LDLM_LOCK_PUT() for ldlm_handle2lock()
 * is not visible in this extract (lines 140-143 missing) -- confirm
 * the reference is dropped in the full source. */
144 EXPORT_SYMBOL(mdc_set_lock_data);
/* Walk all DLM locks on the resource named by @fid and apply iterator
 * @it with @data to each (used to swap l_ast_data when an inode goes
 * away).  The resource name is built from the fid's id/generation. */
146 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid,
147 ldlm_iterator_t it, void *data)
149 struct ldlm_res_id res_id = { .name = {0} };
152 res_id.name[0] = fid->id;
153 res_id.name[1] = fid->generation;
/* NOTE(review): the trailing arguments and the function's return
 * statement are not visible in this extract (lines 156+ missing). */
155 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
162 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
164         /* Don't hold error requests for replay. */
165 if (req->rq_replay) {
166 spin_lock(&req->rq_lock);
/* NOTE(review): the line clearing rq_replay inside the lock is not
 * visible in this extract (line 167 missing). */
168 spin_unlock(&req->rq_lock);
/* A transno on a failed request would normally force replay; flag it
 * loudly since we are discarding it. */
170 if (rc && req->rq_transno != 0) {
171 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
/* round_up(): body not visible in this extract (lines 177-185 missing).
 * Presumably rounds @val up to the next power of two so request buffers
 * match slab allocation sizes (see the caller in mdc_enqueue) -- confirm
 * against the full source. */
176 static int round_up(int val)
186 /* Save a large LOV EA into the request buffer so that it is available
187 * for replay. We don't do this in the initial request because the
188 * original request doesn't need this buffer (at most it sends just the
189 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
190 * buffer and may also be difficult to allocate and save a very large
191 * request buffer for each open. (bug 5707)
193 * OOM here may cause recovery failure if lmm is needed (only for the
194 * original open if the MDS crashed just when this client also OOM'd)
195 * but this is incredibly unlikely, and questionable whether the client
196 * could do MDS recovery under OOM anyways... */
/* Grow the request message so the reply's LOV EA (body->eadatasize
 * bytes) fits in the DLM_INTENT_REC_OFF+2 buffer for open replay; see
 * the block comment above.  On allocation failure, replay of the EA is
 * abandoned by clearing OBD_MD_FLEASIZE instead of failing the open. */
197 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
198 struct mds_body *body, int size[6])
200 int new_size, old_size;
201 struct lustre_msg *new_msg;
/* Compute old/new wire sizes around updating the EA buffer length. */
204 old_size = lustre_msg_size(lustre_request_magic(req), 6, size);
206 size[DLM_INTENT_REC_OFF + 2] = body->eadatasize;
207 new_size = lustre_msg_size(lustre_request_magic(req), 6, size);
208 OBD_ALLOC(new_msg, new_size);
209 if (new_msg != NULL) {
210 struct lustre_msg *old_msg = req->rq_reqmsg;
212 DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u\n",
214 memcpy(new_msg, old_msg, old_size);
215 lustre_msg_set_buflen(new_msg, DLM_INTENT_REC_OFF + 2,
/* Swap the message pointer under rq_lock so concurrent users of the
 * request see a consistent reqmsg/reqlen pair. */
218 spin_lock(&req->rq_lock);
219 req->rq_reqmsg = new_msg;
220 req->rq_reqlen = new_size;
221 spin_unlock(&req->rq_lock);
223 OBD_FREE(old_msg, old_size);
/* OOM fallback: drop the EA from replay rather than failing (lines
 * 224-228 partially missing in this extract). */
225 body->valid &= ~OBD_MD_FLEASIZE;
226 body->eadatasize = 0;
230 /* We always reserve enough space in the reply packet for a stripe MD, because
231 * we don't know in advance the file type. */
/* Build and send an intent-enqueue RPC to the MDS: take (or try to
 * take) an inodebits DLM lock while piggy-backing the metadata
 * operation (open/create, unlink, getattr/lookup, readdir) in the same
 * round trip, then unpack the intent results into @it.
 *
 * NOTE(review): this extract is heavily sampled (many embedded line
 * numbers are skipped), so several error-handling branches, RETURNs and
 * closing braces are not visible; comments below describe only what the
 * visible code shows. */
232 int mdc_enqueue(struct obd_export *exp,
234 struct lookup_intent *it,
236 struct mdc_op_data *data,
237 struct lustre_handle *lockh,
240 ldlm_completion_callback cb_completion,
241 ldlm_blocking_callback cb_blocking,
242 void *cb_data, int extra_lock_flags)
244 struct ptlrpc_request *req;
245 struct obd_device *obddev = class_exp2obd(exp);
246 struct ldlm_res_id res_id =
247 { .name = {data->fid1.id, data->fid1.generation} };
248 ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
249 struct ldlm_request *lockreq;
250 struct ldlm_intent *lit;
251 struct ldlm_reply *lockrep;
/* Request buffers: ptlrpc body + lock request + intent + per-op
 * record buffers filled in below depending on it->it_op. */
252 int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
253 [DLM_LOCKREQ_OFF] = sizeof(*lockreq),
254 [DLM_INTENT_IT_OFF] = sizeof(*lit) };
255 int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
256 [DLM_LOCKREPLY_OFF] = sizeof(*lockrep),
257 [DLM_REPLY_REC_OFF] = sizeof(struct mds_body),
258 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
260 int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
261 int repbufcnt = 4, rc;
265 LASSERTF(lock_type == LDLM_IBITS, "lock type %d\n", lock_type);
266 // LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu",
267 // ldlm_it2str(it->it_op), it_name, it_inode->i_ino);
/* --- per-intent request construction --- */
269 if (it->it_op & IT_OPEN) {
270 it->it_create_mode |= S_IFREG;
272 size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create);
273 size[DLM_INTENT_REC_OFF + 1] = data->namelen + 1;
274         /* As an optimization, we allocate an RPC request buffer for
275 * at least a default-sized LOV EA even if we aren't sending
276 * one. We grow the whole request to the next power-of-two
277 * size since we get that much from a slab allocation anyways.
278 * This avoids an allocation below in the common case where
279 * we need to save a default-sized LOV EA for open replay. */
280 size[DLM_INTENT_REC_OFF + 2] = max(lmmsize,
281 obddev->u.cli.cl_default_mds_easize);
282 rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6,
285 size[DLM_INTENT_REC_OFF + 2] =
286 min(size[DLM_INTENT_REC_OFF+2]+round_up(rc)-rc,
287 obddev->u.cli.cl_max_mds_easize);
289 if (it->it_flags & O_JOIN_FILE) {
290 __u64 head_size = *(__u32*)cb_data;
291 __u32 tsize = *(__u32*)lmm;
293                         /* join is like an unlink of the tail */
294 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
295 size[DLM_INTENT_REC_OFF + 3] =
296 sizeof(struct mds_rec_join);
297 req = ptlrpc_prep_req(class_exp2cliimp(exp),
298 LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
300                         /* when joining file, cb_data and lmm args together
301 * indicate the head file size*/
302 mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, data,
303 (head_size << 32) | tsize);
307 req = ptlrpc_prep_req(class_exp2cliimp(exp),
308 LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
/* NOTE(review): the flag manipulation between lock/unlock is not
 * visible here (line 316 missing). */
315 spin_lock(&req->rq_lock);
317 spin_unlock(&req->rq_lock);
319                 /* pack the intent */
320 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
322 lit->opc = (__u64)it->it_op;
324                 /* pack the intended request */
325 mdc_open_pack(req, DLM_INTENT_REC_OFF, data, it->it_create_mode,
326 0, it->it_flags, lmm, lmmsize);
/* Extra reply buffer for the ACL returned with the open. */
328 repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
329 } else if (it->it_op & IT_UNLINK) {
330 size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink);
331 size[DLM_INTENT_REC_OFF + 1] = data->namelen + 1;
332 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
333 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
334 LDLM_ENQUEUE, 5, size, NULL);
338                 /* pack the intent */
339 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
341 lit->opc = (__u64)it->it_op;
343                 /* pack the intended request */
344 mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
/* Reply carries unlink llog cookies for OST object destruction. */
346 repsize[repbufcnt++] = obddev->u.cli.cl_max_mds_cookiesize;
347 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
348 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
349 OBD_MD_FLACL | OBD_MD_FLMODEASIZE |
351 size[DLM_INTENT_REC_OFF] = sizeof(struct mds_body);
352 size[DLM_INTENT_REC_OFF + 1] = data->namelen + 1;
/* GETATTR needs the UPDATE bit as well; plain LOOKUP keeps the
 * default LOOKUP bit set at declaration time. */
354 if (it->it_op & IT_GETATTR)
355 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
357 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
358 LDLM_ENQUEUE, 5, size, NULL);
362                 /* pack the intent */
363 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
365 lit->opc = (__u64)it->it_op;
367                 /* pack the intended request */
368 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid,
371 repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
372 } else if (it->it_op == IT_READDIR) {
373 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
374 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
375 LDLM_ENQUEUE, 2, size, NULL);
385         /* get ready for the reply */
386 ptlrpc_req_set_repsize(req, repbufcnt, repsize);
388         /* It is important to obtain rpc_lock first (if applicable), so that
389 * threads that are serialised with rpc_lock are not polluting our
390 * rpcs in flight counter */
391 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
392 mdc_enter_request(&obddev->u.cli);
393 rc = ldlm_cli_enqueue(exp, &req, res_id, lock_type, &policy,
394 lock_mode, &flags, cb_blocking, cb_completion,
395 NULL, cb_data, NULL, 0, NULL, lockh, 0);
396 mdc_exit_request(&obddev->u.cli);
397 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
399         /* Similarly, if we're going to replay this request, we don't want to
400 * actually get a lock, just perform the intent. */
401 if (req->rq_transno || req->rq_replay) {
402 lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
404 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
407         /* This can go when we're sure that this can never happen */
408 LASSERT(rc != -ENOENT);
/* ABORTED: the intent ran but no lock was granted -- zero the handle
 * so callers don't try to use it. */
409 if (rc == ELDLM_LOCK_ABORTED) {
411 memset(lockh, 0, sizeof(*lockh));
413 } else if (rc != 0) {
414 CERROR("ldlm_cli_enqueue: %d\n", rc);
415 LASSERTF(rc < 0, "rc %d\n", rc);
416 mdc_clear_replay_flag(req, rc);
417 ptlrpc_req_finished(req);
419 } else { /* rc = 0 */
420 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
423                 /* If the server gave us back a different lock mode, we should
424 * fix up our variables. */
425 if (lock->l_req_mode != lock_mode) {
426 ldlm_lock_addref(lockh, lock->l_req_mode);
427 ldlm_lock_decref(lockh, lock_mode);
428 lock_mode = lock->l_req_mode;
/* --- unpack intent results from the lock reply --- */
433 lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
435 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
436 LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF); /* swabbed by ldlm_cli_enqueue() */
438 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
439 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
440 it->d.lustre.it_lock_mode = lock_mode;
441 it->d.lustre.it_data = req;
443 if (it->d.lustre.it_status < 0 && req->rq_replay)
444 mdc_clear_replay_flag(req, it->d.lustre.it_status);
446         /* If we're doing an IT_OPEN which did not result in an actual
447 * successful open, then we need to remove the bit which saves
448 * this request for unconditional replay.
450 * It's important that we do this first! Otherwise we might exit the
451 * function without doing so, and try to replay a failed create
453 if (it->it_op & IT_OPEN && req->rq_replay &&
454 (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
455 mdc_clear_replay_flag(req, it->d.lustre.it_status);
457 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
458 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
460         /* We know what to expect, so we do any byte flipping required here */
461 LASSERT(repbufcnt == 5 || repbufcnt == 2);
462 if (repbufcnt == 5) {
463 struct mds_body *body;
465 body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
466 lustre_swab_mds_body);
468 CERROR ("Can't swab mds_body\n");
472                 /* If this is a successful OPEN request, we need to set
473 replay handler and data early, so that if replay happens
474 immediately after swabbing below, new reply is swabbed
475 by that handler correctly */
476 if (it_disposition(it, DISP_OPEN_OPEN) &&
477 !it_open_error(DISP_OPEN_OPEN, it))
478 mdc_set_open_replay_data(NULL, req);
480 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
481                         /* The eadata is opaque; just check that it is there.
482 * Eventually, obd_unpackmd() will check the contents */
483 eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
484 body->eadatasize, NULL);
485 if (eadata == NULL) {
486 CERROR ("Missing/short eadata\n");
/* Server may piggy-back its current max EA/cookie sizes; adopt
 * larger values so future requests reserve enough space. */
489 if (body->valid & OBD_MD_FLMODEASIZE) {
490 if (obddev->u.cli.cl_max_mds_easize <
492 obddev->u.cli.cl_max_mds_easize =
494 CDEBUG(D_INFO, "maxeasize become %d\n",
497 if (obddev->u.cli.cl_max_mds_cookiesize <
498 body->max_cookiesize) {
499 obddev->u.cli.cl_max_mds_cookiesize =
500 body->max_cookiesize;
501 CDEBUG(D_INFO, "cookiesize become %d\n",
502 body->max_cookiesize);
505                         /* We save the reply LOV EA in case we have to replay
506 * a create for recovery. If we didn't allocate a
507 * large enough request buffer above we need to
508 * reallocate it here to hold the actual LOV EA. */
509 if (it->it_op & IT_OPEN) {
510 int offset = DLM_INTENT_REC_OFF + 2;
512 if (lustre_msg_buflen(req->rq_reqmsg, offset) <
514 mdc_realloc_openmsg(req, body, size);
516 lmm = lustre_msg_buf(req->rq_reqmsg, offset,
519 memcpy(lmm, eadata, body->eadatasize);
526 EXPORT_SYMBOL(mdc_enqueue);
529 * This long block is all about fixing up the lock and request state
530 * so that it is correct as of the moment _before_ the operation was
531 * applied; that way, the VFS will think that everything is normal and
532 * call Lustre's regular VFS methods.
534 * If we're performing a creation, that means that unless the creation
535 * failed with EEXIST, we should fake up a negative dentry.
537 * For everything else, we want to lookup to succeed.
539 * One additional note: if CREATE or OPEN succeeded, we add an extra
540 * reference to the request because we need to keep it around until
541 * ll_create/ll_open gets called.
543 * The server will return to us, in it_disposition, an indication of
544 * exactly what d.lustre.it_status refers to.
546 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
547 * otherwise if DISP_OPEN_CREATE is set, then it status is the
548 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
549 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
552 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/* Entry point used by llite for intent-based lookups: first try to
 * satisfy a revalidate from an already-cached lock on fid2, otherwise
 * (or additionally) enqueue the intent via mdc_enqueue() and fix up
 * intent state, request references and duplicate locks afterwards.
 * See the long block comment above for the disposition/status contract.
 *
 * NOTE(review): this extract is heavily sampled; several RETURNs,
 * GOTO labels and closing braces are not visible, so comments below
 * describe only the visible code. */
555 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
556 void *lmm, int lmmsize, struct lookup_intent *it,
557 int lookup_flags, struct ptlrpc_request **reqp,
558 ldlm_blocking_callback cb_blocking, int extra_lock_flags)
560 struct lustre_handle lockh;
561 struct ptlrpc_request *request;
563 struct mds_body *mds_body;
564 struct lustre_handle old_lock;
565 struct ldlm_lock *lock;
569 CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
570 op_data->namelen, op_data->name, op_data->fid1.id,
571 ldlm_it2str(it->it_op), it->it_flags);
/* --- fast path: revalidate against a cached lock on the child fid --- */
573 if (op_data->fid2.id &&
574 (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
575                 /* We could just return 1 immediately, but since we should only
576 * be called in revalidate_it if we already have a lock, let's
578 struct ldlm_res_id res_id = {.name ={op_data->fid2.id,
579 op_data->fid2.generation}};
580 struct lustre_handle lockh;
581 ldlm_policy_data_t policy;
584                 /* As not all attributes are kept under update lock, e.g.
585 owner/group/acls are under lookup lock, we need both
586 ibits for GETATTR. */
587 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
588 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
589 MDS_INODELOCK_LOOKUP;
/* Try progressively different cached modes (CR, then CW, then PR);
 * the intervening "if (!rc)" guards are not visible in this extract. */
591 rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
592 LDLM_FL_BLOCK_GRANTED, &res_id,
593 LDLM_IBITS, &policy, LCK_CR, &lockh);
596 rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
597 LDLM_FL_BLOCK_GRANTED, &res_id,
598 LDLM_IBITS, &policy,LCK_CW,&lockh);
602 rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
603 LDLM_FL_BLOCK_GRANTED, &res_id,
604 LDLM_IBITS, &policy,LCK_PR,&lockh);
/* On a match, stash the handle/mode in the intent for the caller. */
607 memcpy(&it->d.lustre.it_lock_handle, &lockh,
609 it->d.lustre.it_lock_mode = mode;
612                 /* Only return failure if it was not GETATTR by cfid
613 (from inode_revalidate) */
614 if (rc || op_data->namelen != 0)
618         /* lookup_it may be called only after revalidate_it has run, because
619 * revalidate_it cannot return errors, only zero. Returning zero causes
620 * this call to lookup, which *can* return an error.
622 * We only want to execute the request associated with the intent one
623 * time, however, so don't send the request again. Instead, skip past
624 * this and use the request from revalidate. In this case, revalidate
625 * never dropped its reference, so the refcounts are all OK */
626 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
628 rc = mdc_enqueue(exp, LDLM_IBITS, it, it_to_lock_mode(it),
629 op_data, &lockh, lmm, lmmsize,
630 ldlm_completion_ast, cb_blocking, NULL,
634 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
635 } else if (!op_data->fid2.id) {
636                 /* DISP_ENQ_COMPLETE set means there is extra reference on
637 * request referenced from this intent, saved for subsequent
638 * lookup. This path is executed when we proceed to this
639 * lookup, so we clear DISP_ENQ_COMPLETE */
640 it_clear_disposition(it, DISP_ENQ_COMPLETE);
/* Hand the saved request back to the caller and sanity-check that it
 * has not been freed/poisoned underneath us. */
642 request = *reqp = it->d.lustre.it_data;
643 LASSERT(request != NULL);
644 LASSERT(request != LP_POISON);
645 LASSERT(request->rq_repmsg != LP_POISON);
647 if (!it_disposition(it, DISP_IT_EXECD)) {
648                 /* The server failed before it even started executing the
649 * intent, i.e. because it couldn't unpack the request. */
650 LASSERT(it->d.lustre.it_status != 0);
651 RETURN(it->d.lustre.it_status);
653 rc = it_open_error(DISP_IT_EXECD, it);
657 mds_body = lustre_msg_buf(request->rq_repmsg, DLM_REPLY_REC_OFF,
659 LASSERT(mds_body != NULL);      /* mdc_enqueue checked */
660 LASSERT_REPSWABBED(request, 1); /* mdc_enqueue swabbed */
662         /* If we were revalidating a fid/name pair, mark the intent in
663 * case we fail and get called again from lookup */
664 if (op_data->fid2.id && (it->it_op != IT_GETATTR)) {
665 it_set_disposition(it, DISP_ENQ_COMPLETE);
666                 /* Also: did we find the same inode? */
667 if (memcmp(&op_data->fid2, &mds_body->fid1,
668 sizeof(op_data->fid2)))
672 rc = it_open_error(DISP_LOOKUP_EXECD, it);
676         /* keep requests around for the multiple phases of the call
677 * this shows the DISP_XX must guarantee we make it into the call
679 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
680 it_disposition(it, DISP_OPEN_CREATE) &&
681 !it_open_error(DISP_OPEN_CREATE, it)) {
682 it_set_disposition(it, DISP_ENQ_CREATE_REF);
683 ptlrpc_request_addref(request); /* balanced in ll_create_node */
685 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
686 it_disposition(it, DISP_OPEN_OPEN) &&
687 !it_open_error(DISP_OPEN_OPEN, it)) {
688 it_set_disposition(it, DISP_ENQ_OPEN_REF);
689 ptlrpc_request_addref(request); /* balanced in ll_file_open */
690                 /* BUG 11546 - eviction in the middle of open rpc processing */
691 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
694 if (it->it_op & IT_CREAT) {
695                 /* XXX this belongs in ll_create_it */
696 } else if (it->it_op == IT_OPEN) {
697 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
699 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
702         /* If we already have a matching lock, then cancel the new
703 * one. We have to set the data here instead of in
704 * mdc_enqueue, because we need to use the child's inode as
705 * the l_ast_data to match, and that's not available until
706 * intent_finish has performed the iget().) */
707 lock = ldlm_handle2lock(&lockh);
709 ldlm_policy_data_t policy = lock->l_policy_data;
710 LDLM_DEBUG(lock, "matching against this");
/* LCK_NL match against our own handle finds an equivalent older lock;
 * if found, cancel the new one and keep the old handle in the intent. */
712 memcpy(&old_lock, &lockh, sizeof(lockh));
713 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
714 LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
715 ldlm_lock_decref_and_cancel(&lockh,
716 it->d.lustre.it_lock_mode);
717 memcpy(&lockh, &old_lock, sizeof(old_lock));
718 memcpy(&it->d.lustre.it_lock_handle, &lockh,
722 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
723 op_data->namelen, op_data->name, ldlm_it2str(it->it_op),
724 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
728 EXPORT_SYMBOL(mdc_intent_lock);