1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
6 * This file is part of the Lustre file system, http://www.lustre.org
7 * Lustre is a trademark of Cluster File Systems, Inc.
9 * You may have signed or agreed to another license before downloading
10 * this software. If so, you are bound by the terms and conditions
11 * of that agreement, and the following does not apply to you. See the
12 * LICENSE file included with this distribution for more information.
14 * If you did not agree to a different license, then this copy of Lustre
15 * is open source software; you can redistribute it and/or modify it
16 * under the terms of version 2 of the GNU General Public License as
17 * published by the Free Software Foundation.
19 * In either case, Lustre is distributed in the hope that it will be
20 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 * license text for more details.
26 # define EXPORT_SYMTAB
28 #define DEBUG_SUBSYSTEM S_MDC
31 # include <linux/module.h>
32 # include <linux/pagemap.h>
33 # include <linux/miscdevice.h>
34 # include <linux/init.h>
36 # include <liblustre.h>
39 #include <obd_class.h>
40 #include <lustre_dlm.h>
41 #include <lprocfs_status.h>
42 #include "mdc_internal.h"
44 int it_disposition(struct lookup_intent *it, int flag)
46 return it->d.lustre.it_disposition & flag;
48 EXPORT_SYMBOL(it_disposition);
50 void it_set_disposition(struct lookup_intent *it, int flag)
52 it->d.lustre.it_disposition |= flag;
54 EXPORT_SYMBOL(it_set_disposition);
56 void it_clear_disposition(struct lookup_intent *it, int flag)
58 it->d.lustre.it_disposition &= ~flag;
60 EXPORT_SYMBOL(it_clear_disposition);
62 int it_open_error(int phase, struct lookup_intent *it)
64 if (it_disposition(it, DISP_OPEN_OPEN)) {
65 if (phase >= DISP_OPEN_OPEN)
66 return it->d.lustre.it_status;
71 if (it_disposition(it, DISP_OPEN_CREATE)) {
72 if (phase >= DISP_OPEN_CREATE)
73 return it->d.lustre.it_status;
78 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
79 if (phase >= DISP_LOOKUP_EXECD)
80 return it->d.lustre.it_status;
85 if (it_disposition(it, DISP_IT_EXECD)) {
86 if (phase >= DISP_IT_EXECD)
87 return it->d.lustre.it_status;
91 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
92 it->d.lustre.it_status);
96 EXPORT_SYMBOL(it_open_error);
98 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach @data (an inode pointer, per the cast below) to the DLM lock
 * behind handle @l as l_ast_data, under the lock's resource lock.
 * If the lock already carries a *different* inode, assert that the old
 * one is being torn down (I_FREEING) before displacing it -- replacing
 * live ast_data would orphan cached state.
 * NOTE(review): lines are elided in this extraction (numbering gaps),
 * e.g. ENTRY/EXIT bracketing, the lock putref and closing braces are
 * not visible here.
 */
99 void mdc_set_lock_data(__u64 *l, void *data)
101 struct ldlm_lock *lock;
102 struct lustre_handle *lockh = (struct lustre_handle *)l;
/* Caller guarantees the handle references a live lock (see comment
 * above), hence the hard assertion rather than a NULL check. */
110 lock = ldlm_handle2lock(lockh);
112 LASSERT(lock != NULL);
113 lock_res_and_lock(lock);
115 if (lock->l_ast_data && lock->l_ast_data != data) {
116 struct inode *new_inode = data;
117 struct inode *old_inode = lock->l_ast_data;
/* Only a dying inode may be displaced from l_ast_data. */
118 LASSERTF(old_inode->i_state & I_FREEING,
119 "Found existing inode %p/%lu/%u state %lu in lock: "
120 "setting data to %p/%lu/%u\n", old_inode,
121 old_inode->i_ino, old_inode->i_generation,
123 new_inode, new_inode->i_ino, new_inode->i_generation);
126 lock->l_ast_data = data;
127 unlock_res_and_lock(lock);
132 EXPORT_SYMBOL(mdc_set_lock_data);
/*
 * Apply iterator @it (with closure @data) to every DLM lock on the
 * resource named by @fid (id/generation) in this export's namespace --
 * typically used to update or clear per-lock callback data.
 * NOTE(review): the tail of this function (iterator arguments, return
 * and closing brace) is elided in this extraction.
 */
134 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid,
135 ldlm_iterator_t it, void *data)
137 struct ldlm_res_id res_id = { .name = {0} };
/* Resource name is the fid's object id + generation. */
140 res_id.name[0] = fid->id;
141 res_id.name[1] = fid->generation;
143 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
/*
 * Drop the replay flag from a request that completed with an error so a
 * failed operation is never replayed after recovery, and complain if the
 * server nevertheless assigned a transno to the failed request.
 * NOTE(review): the line that actually clears req->rq_replay (between
 * the spin_lock/spin_unlock pair) is elided in this extraction, as is
 * the function's closing brace.
 */
150 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
152 /* Don't hold error requests for replay. */
153 if (req->rq_replay) {
154 spin_lock(&req->rq_lock);
156 spin_unlock(&req->rq_lock);
/* A transno on an error reply suggests the server committed something
 * for an op the client saw fail -- worth an error-level note. */
158 if (rc && req->rq_transno != 0) {
159 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
/* NOTE(review): the entire body of this helper (original lines 165-173)
 * is elided in this extraction.  Judging by its use below (rounding a
 * message size: "round_up(rc) - rc"), it presumably rounds @val up to
 * the next power of two -- TODO confirm against the canonical source. */
164 static int round_up(int val)
174 /* Save a large LOV EA into the request buffer so that it is available
175 * for replay. We don't do this in the initial request because the
176 * original request doesn't need this buffer (at most it sends just the
177 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
178 * buffer and may also be difficult to allocate and save a very large
179 * request buffer for each open. (bug 5707)
181 * OOM here may cause recovery failure if lmm is needed (only for the
182 * original open if the MDS crashed just when this client also OOM'd)
183 * but this is incredibly unlikely, and questionable whether the client
184 * could do MDS recovery under OOM anyways... */
/*
 * Grow the request message so the EA slot (DLM_INTENT_REC_OFF + 2) can
 * hold the reply's LOV EA: compute old/new packed sizes, allocate a new
 * message, copy the old one over, and swap it in under rq_lock before
 * freeing the original.  On allocation failure, restore the old buflen
 * and strip OBD_MD_FLEASIZE from the reply body so nothing later tries
 * to copy an EA that has nowhere to go.
 * NOTE(review): several lines are elided in this extraction -- notably
 * the new buflen argument (original line 195), the else-branch opener
 * for the failure path and the closing braces.
 */
185 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
186 struct mds_body *body)
188 int old_len, new_size, old_size;
189 struct lustre_msg *old_msg = req->rq_reqmsg;
190 struct lustre_msg *new_msg;
192 old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + 2);
193 old_size = lustre_packed_msg_size(old_msg);
/* Temporarily set the larger buflen on the old message purely to
 * compute the new packed size. */
194 lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2,
196 new_size = lustre_packed_msg_size(old_msg);
198 OBD_ALLOC(new_msg, new_size);
199 if (new_msg != NULL) {
200 DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u\n",
202 memcpy(new_msg, old_msg, old_size);
/* Swap the message in atomically w.r.t. anyone inspecting the
 * request under rq_lock. */
204 spin_lock(&req->rq_lock);
205 req->rq_reqmsg = new_msg;
206 req->rq_reqlen = new_size;
207 spin_unlock(&req->rq_lock);
209 OBD_FREE(old_msg, old_size);
/* Allocation failed: undo the buflen change and drop the EA from
 * the reply body (see OOM note in the header comment). */
211 lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2, old_len);
212 body->valid &= ~OBD_MD_FLEASIZE;
213 body->eadatasize = 0;
/*
 * Build the LDLM intent-OPEN enqueue request: lock request + ldlm_intent
 * + mds_rec_create + name + LOV EA buffer (+ mds_rec_join for O_JOIN_FILE).
 * Before packing, collect conflicting local locks into @cancels so they
 * can be cancelled piggy-backed on the enqueue: OPEN ibit locks on the
 * target (mode depends on FMODE_WRITE/TRUNC vs FMODE_EXEC), and the
 * parent's UPDATE ibit lock for CREATE/JOIN.
 * Returns the prepared request; reply buffers are sized for the worst
 * case (default EA + ACL) since the file type isn't known yet.
 * NOTE(review): many lines are elided in this extraction (numbering
 * gaps) -- e.g. the lock-mode variable declarations, error-path
 * RETURNs, the rq_replay assignment between the rq_lock pair, and the
 * closing braces.  Treat control flow here as incomplete.
 */
217 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
218 struct lookup_intent *it,
219 struct mdc_op_data *data,
220 void *lmm, int lmmsize)
222 struct ptlrpc_request *req;
223 struct ldlm_intent *lit;
224 struct obd_device *obddev = class_exp2obd(exp);
225 int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
226 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
227 [DLM_INTENT_IT_OFF] = sizeof(*lit),
228 [DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create),
229 [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
230 /* As an optimization, we allocate an RPC request buffer
231 * for at least a default-sized LOV EA even if we aren't
232 * sending one. We grow the whole request to the next
233 * power-of-two size since we get that much from a slab
234 * allocation anyways. This avoids an allocation below
235 * in the common case where we need to save a
236 * default-sized LOV EA for open replay. */
237 [DLM_INTENT_REC_OFF+2]= max(lmmsize,
238 obddev->u.cli.cl_default_mds_easize) };
239 int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
240 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
241 [DLM_REPLY_REC_OFF] = sizeof(struct mds_body),
242 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
244 [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
245 CFS_LIST_HEAD(cancels);
/* JOIN only applies when the caller passed the head-size cookie. */
246 int do_join = (it->it_flags & O_JOIN_FILE) && data->data;
/* An open intent always creates/opens a regular file. */
251 it->it_create_mode |= S_IFREG;
/* Pad the EA slot so the whole message lands on the next slab-friendly
 * size (see the comment in the size[] initializer), capped at the
 * maximum MDS EA size. */
253 rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6, size);
255 size[DLM_INTENT_REC_OFF + 2] =
256 min(size[DLM_INTENT_REC_OFF + 2] + round_up(rc) - rc,
257 obddev->u.cli.cl_max_mds_easize);
259 /* If inode is known, cancel conflicting OPEN locks. */
261 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
264 else if (it->it_flags & FMODE_EXEC)
269 count = mdc_resource_get_unused(exp, &data->fid2, &cancels,
270 mode, MDS_INODELOCK_OPEN);
273 /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
274 if (it->it_op & IT_CREAT || do_join)
278 count += mdc_resource_get_unused(exp, &data->fid1, &cancels, mode,
279 MDS_INODELOCK_UPDATE);
281 __u64 head_size = (*(__u64 *)data->data);
282 /* join is like an unlink of the tail */
283 size[DLM_INTENT_REC_OFF + 3] = sizeof(struct mds_rec_join);
284 req = ldlm_prep_enqueue_req(exp, 7, size, &cancels, count);
286 mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, data,
/* Non-join path: 6 buffers, and make sure the JOIN flag doesn't leak
 * into the packed open flags below. */
289 req = ldlm_prep_enqueue_req(exp, 6, size, &cancels, count);
290 it->it_flags &= ~O_JOIN_FILE;
294 spin_lock(&req->rq_lock);
296 spin_unlock(&req->rq_lock);
298 /* pack the intent */
299 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
301 lit->opc = (__u64)it->it_op;
303 /* pack the intended request */
304 mdc_open_pack(req, DLM_INTENT_REC_OFF, data,
305 it->it_create_mode, 0, it->it_flags,
308 ptlrpc_req_set_repsize(req, 5, repsize);
/*
 * Build the LDLM intent-UNLINK enqueue request: lock request +
 * ldlm_intent + mds_rec_unlink + name.  Reply buffers reserve room for
 * an EA and unlink cookies at the client's current maxima.
 * NOTE(review): lines are elided in this extraction -- e.g. the
 * NULL-check/RETURN after request allocation, the lustre_msg_buf size
 * argument, and the final RETURN/closing brace.
 */
313 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
314 struct lookup_intent *it,
315 struct mdc_op_data *data)
317 struct ptlrpc_request *req;
318 struct ldlm_intent *lit;
319 struct obd_device *obddev = class_exp2obd(exp);
320 int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
321 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
322 [DLM_INTENT_IT_OFF] = sizeof(*lit),
323 [DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink),
324 [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
325 int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
326 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
327 [DLM_REPLY_REC_OFF] = sizeof(struct mds_body),
328 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
330 [DLM_REPLY_REC_OFF+2] = obddev->u.cli.
331 cl_max_mds_cookiesize };
/* No piggy-backed cancellations for unlink intents (NULL cancel list). */
333 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
335 /* pack the intent */
336 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
338 lit->opc = (__u64)it->it_op;
340 /* pack the intended request */
341 mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
343 ptlrpc_req_set_repsize(req, 5, repsize);
/*
 * Build the LDLM intent-LOOKUP/GETATTR enqueue request: lock request +
 * ldlm_intent + mds_body + name.  The getattr portion asks for
 * attributes, EA and ACL in one round trip; reply buffers reserve room
 * for a max-sized EA and a POSIX ACL.
 * NOTE(review): lines are elided in this extraction -- e.g. the
 * NULL-check after request allocation, the lustre_msg_buf size
 * argument, the trailing mdc_getattr_pack arguments and the final
 * RETURN/closing brace.
 */
348 static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp,
349 struct lookup_intent *it,
350 struct mdc_op_data *data)
352 struct ptlrpc_request *req;
353 struct ldlm_intent *lit;
354 struct obd_device *obddev = class_exp2obd(exp);
355 int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
356 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
357 [DLM_INTENT_IT_OFF] = sizeof(*lit),
358 [DLM_INTENT_REC_OFF] = sizeof(struct mds_body),
359 [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
360 int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
361 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
362 [DLM_REPLY_REC_OFF] = sizeof(struct mds_body),
363 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
365 [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
/* Fetch everything a lookup might need in one shot. */
366 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
367 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
369 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
371 /* pack the intent */
372 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
374 lit->opc = (__u64)it->it_op;
376 /* pack the intended request */
377 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags,
379 ptlrpc_req_set_repsize(req, 5, repsize);
/*
 * Build a plain (intent-less) enqueue request for READDIR: just the
 * ptlrpc body and ldlm_request, with a matching minimal reply.
 * NOTE(review): the function's opening/closing braces and the RETURN of
 * @req are elided in this extraction.
 */
384 static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp)
386 struct ptlrpc_request *req;
387 int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
388 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request) };
389 int repsize[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
390 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply) };
392 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
394 ptlrpc_req_set_repsize(req, 2, repsize);
/*
 * Post-process a completed intent enqueue: mark replayable requests as
 * INTENT_ONLY, handle an aborted lock grant, fix up the lock mode if
 * the server granted a different one, copy the server's disposition/
 * status into the intent, clear the replay flag for failed operations,
 * swab the reply mds_body, and for a successful open save the reply
 * LOV EA back into the *request* buffer so an open replay can resend it
 * (growing the request via mdc_realloc_openmsg if needed).
 * NOTE(review): this extraction elides many lines (error RETURNs,
 * LDLM_LOCK_PUT, several closing braces, buffer-length temporaries);
 * the control flow shown here is incomplete.
 */
398 static int mdc_finish_enqueue(struct obd_export *exp,
399 struct ptlrpc_request *req,
400 struct ldlm_enqueue_info *einfo,
401 struct lookup_intent *it,
402 struct lustre_handle *lockh,
405 struct ldlm_request *lockreq;
406 struct ldlm_reply *lockrep;
410 /* Similarly, if we're going to replay this request, we don't want to
411 * actually get a lock, just perform the intent. */
412 if (req->rq_transno || req->rq_replay) {
413 lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
415 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
/* Server executed the intent but did not grant a lock: zero the
 * handle so nobody tries to use it. */
418 if (rc == ELDLM_LOCK_ABORTED) {
420 memset(lockh, 0, sizeof(*lockh));
422 } else { /* rc = 0 */
423 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
426 /* If the server gave us back a different lock mode, we should
427 * fix up our variables. */
428 if (lock->l_req_mode != einfo->ei_mode) {
429 ldlm_lock_addref(lockh, lock->l_req_mode);
430 ldlm_lock_decref(lockh, einfo->ei_mode);
431 einfo->ei_mode = lock->l_req_mode;
436 lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
438 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
439 /* swabbed by ldlm_cli_enqueue() */
440 LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
/* Publish the server's verdict into the intent for later phases. */
442 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
443 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
444 it->d.lustre.it_lock_mode = einfo->ei_mode;
445 it->d.lustre.it_data = req;
447 if (it->d.lustre.it_status < 0 && req->rq_replay)
448 mdc_clear_replay_flag(req, it->d.lustre.it_status);
450 /* If we're doing an IT_OPEN which did not result in an actual
451 * successful open, then we need to remove the bit which saves
452 * this request for unconditional replay.
454 * It's important that we do this first! Otherwise we might exit the
455 * function without doing so, and try to replay a failed create
457 if ((it->it_op & IT_OPEN) &&
459 (!it_disposition(it, DISP_OPEN_OPEN) ||
460 it->d.lustre.it_status != 0))
461 mdc_clear_replay_flag(req, it->d.lustre.it_status)
463 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
464 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
466 /* We know what to expect, so we do any byte flipping required here */
467 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
468 struct mds_body *body;
470 body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
471 lustre_swab_mds_body);
473 CERROR ("Can't swab mds_body\n");
477 /* If this is a successful OPEN request, we need to set
478 replay handler and data early, so that if replay happens
479 immediately after swabbing below, new reply is swabbed
480 by that handler correctly */
481 if (it_disposition(it, DISP_OPEN_OPEN) &&
482 !it_open_error(DISP_OPEN_OPEN, it))
483 mdc_set_open_replay_data(NULL, req);
485 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
488 /* The eadata is opaque; just check that it is there.
489 * Eventually, obd_unpackmd() will check the contents */
490 eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
491 body->eadatasize, NULL);
492 if (eadata == NULL) {
493 CERROR ("Missing/short eadata\n");
/* Server told us its current EA/cookie maxima; raise our cached
 * values so future requests size their buffers correctly. */
496 if (body->valid & OBD_MD_FLMODEASIZE) {
497 struct obd_device *obddev = class_exp2obd(exp);
499 if (obddev->u.cli.cl_max_mds_easize <
501 obddev->u.cli.cl_max_mds_easize =
503 CDEBUG(D_INFO, "maxeasize become %d\n",
506 if (obddev->u.cli.cl_max_mds_cookiesize <
507 body->max_cookiesize) {
508 obddev->u.cli.cl_max_mds_cookiesize =
509 body->max_cookiesize;
510 CDEBUG(D_INFO, "cookiesize become %d\n",
511 body->max_cookiesize);
514 /* We save the reply LOV EA in case we have to replay
515 * a create for recovery. If we didn't allocate a
516 * large enough request buffer above we need to
517 * reallocate it here to hold the actual LOV EA. */
518 if (it->it_op & IT_OPEN) {
519 int offset = DLM_INTENT_REC_OFF + 2;
522 if (lustre_msg_buflen(req->rq_reqmsg, offset) <
524 mdc_realloc_openmsg(req, body);
526 lmm = lustre_msg_buf(req->rq_reqmsg, offset,
529 memcpy(lmm, eadata, body->eadatasize);
537 /* We always reserve enough space in the reply packet for a stripe MD, because
538 * we don't know in advance the file type. */
/*
 * Top-level intent enqueue: pick the right packer for the intent op
 * (OPEN / UNLINK / GETATTR|LOOKUP / READDIR), choose the inodebits
 * policy, then send the enqueue under the MDC rpc_lock and in-flight
 * accounting, and finish via mdc_finish_enqueue().
 * On enqueue failure the request's replay flag is cleared and the
 * request is released.
 * NOTE(review): lines are elided in this extraction -- e.g. the
 * declarations of rc, the error RETURNs after packing, the trailing
 * ldlm_cli_enqueue arguments, and the final RETURN/closing brace.
 */
539 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
540 struct lookup_intent *it, struct mdc_op_data *data,
541 struct lustre_handle *lockh, void *lmm, int lmmsize,
542 int extra_lock_flags)
544 struct ptlrpc_request *req;
545 struct obd_device *obddev = class_exp2obd(exp);
546 struct ldlm_res_id res_id =
547 { .name = {data->fid1.id, data->fid1.generation} };
/* Default to the LOOKUP ibit; overridden to UPDATE below for ops that
 * modify or enumerate the object. */
548 ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
549 int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
553 LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
554 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
555 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
557 if (it->it_op & IT_OPEN) {
558 req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize);
559 if (it->it_flags & O_JOIN_FILE) {
560 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
562 } else if (it->it_op & IT_UNLINK) {
563 req = mdc_intent_unlink_pack(exp, it, data);
564 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
565 req = mdc_intent_lookup_pack(exp, it, data);
566 } else if (it->it_op == IT_READDIR) {
567 req = mdc_intent_readdir_pack(exp);
569 CERROR("bad it_op %x\n", it->it_op);
576 /* It is important to obtain rpc_lock first (if applicable), so that
577 * threads that are serialised with rpc_lock are not polluting our
578 * rpcs in flight counter */
579 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
580 mdc_enter_request(&obddev->u.cli);
581 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
583 mdc_exit_request(&obddev->u.cli);
584 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
/* Enqueue failed: don't replay it, and drop our request reference. */
586 CERROR("ldlm_cli_enqueue: %d\n", rc);
587 mdc_clear_replay_flag(req, rc);
588 ptlrpc_req_finished(req);
591 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
595 EXPORT_SYMBOL(mdc_enqueue);
/*
 * Check whether we already hold a granted ibits lock on @fid that can
 * satisfy this intent without talking to the MDS.  For GETATTR both
 * UPDATE and LOOKUP bits are required (attributes are split across
 * them); otherwise LOOKUP alone suffices.  On a match, the handle and
 * mode are stored in the intent.
 * NOTE(review): the fid parameter line, mode declaration, and the
 * return-value lines are elided in this extraction.
 */
597 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
600 /* We could just return 1 immediately, but since we should only
601 * be called in revalidate_it if we already have a lock, let's
603 struct ldlm_res_id res_id = {.name ={fid->id, fid->generation}};
604 struct lustre_handle lockh;
605 ldlm_policy_data_t policy;
608 /* As not all attributes are kept under update lock, e.g.
609 owner/group/acls are under lookup lock, we need both
610 ibits for GETATTR. */
611 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
612 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
613 MDS_INODELOCK_LOOKUP;
/* Accept any granted read/write mode; LDLM_FL_BLOCK_GRANTED restricts
 * the match to granted locks. */
615 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
616 LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
617 &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
619 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
620 it->d.lustre.it_lock_mode = mode;
625 EXPORT_SYMBOL(mdc_revalidate_lock);
/*
 * Interpret the intent's disposition/status after the enqueue reply has
 * been processed: propagate per-phase errors, mark revalidation
 * complete, take extra request references for the CREATE and OPEN
 * phases (released later by llite), and collapse a duplicate lock if
 * we already held a matching one (keeping the old handle).
 * Returns 0 on success or the relevant phase's error status.
 * NOTE(review): lines are elided in this extraction -- error RETURNs,
 * several closing braces, and parts of the lock-match cleanup -- so the
 * control flow shown is incomplete.
 */
627 static int mdc_finish_intent_lock(struct obd_export *exp,
628 struct ptlrpc_request *req,
629 struct mdc_op_data *data,
630 struct lookup_intent *it,
631 struct lustre_handle *lockh)
633 struct mds_body *mds_body;
634 struct lustre_handle old_lock;
635 struct ldlm_lock *lock;
639 LASSERT(req != NULL);
640 LASSERT(req != LP_POISON);
641 LASSERT(req->rq_repmsg != LP_POISON);
643 if (!it_disposition(it, DISP_IT_EXECD)) {
644 /* The server failed before it even started executing the
645 * intent, i.e. because it couldn't unpack the request. */
646 LASSERT(it->d.lustre.it_status != 0);
647 RETURN(it->d.lustre.it_status);
649 rc = it_open_error(DISP_IT_EXECD, it);
653 mds_body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
655 /* mdc_enqueue checked */
656 LASSERT(mds_body != NULL);
657 /* mdc_enqueue swabbed */
658 LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
660 /* If we were revalidating a fid/name pair, mark the intent in
661 * case we fail and get called again from lookup */
662 if (data->fid2.id && (it->it_op != IT_GETATTR)) {
663 it_set_disposition(it, DISP_ENQ_COMPLETE);
664 /* Also: did we find the same inode? */
665 if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2)))
669 rc = it_open_error(DISP_LOOKUP_EXECD, it);
673 /* keep requests around for the multiple phases of the call
674 * this shows the DISP_XX must guarantee we make it into the call
676 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
677 it_disposition(it, DISP_OPEN_CREATE) &&
678 !it_open_error(DISP_OPEN_CREATE, it)) {
679 it_set_disposition(it, DISP_ENQ_CREATE_REF);
680 ptlrpc_request_addref(req); /* balanced in ll_create_node */
682 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
683 it_disposition(it, DISP_OPEN_OPEN) &&
684 !it_open_error(DISP_OPEN_OPEN, it)) {
685 it_set_disposition(it, DISP_ENQ_OPEN_REF);
686 ptlrpc_request_addref(req); /* balanced in ll_file_open */
687 /* BUG 11546 - eviction in the middle of open rpc processing */
688 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
/* Per-op sanity checks: CREATE handling lives in llite; a pure OPEN
 * must not carry the CREATE disposition. */
691 if (it->it_op & IT_CREAT) {
692 /* XXX this belongs in ll_create_it */
693 } else if (it->it_op == IT_OPEN) {
694 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
696 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
699 /* If we already have a matching lock, then cancel the new
700 * one. We have to set the data here instead of in
701 * mdc_enqueue, because we need to use the child's inode as
702 * the l_ast_data to match, and that's not available until
703 * intent_finish has performed the iget().) */
704 lock = ldlm_handle2lock(lockh);
706 ldlm_policy_data_t policy = lock->l_policy_data;
708 LDLM_DEBUG(lock, "matching against this");
/* LCK_NL mode + NULL namespace: match any mode on any namespace,
 * seeded with our own handle via old_lock. */
710 memcpy(&old_lock, lockh, sizeof(*lockh));
711 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
712 LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
713 ldlm_lock_decref_and_cancel(lockh,
714 it->d.lustre.it_lock_mode);
715 memcpy(lockh, &old_lock, sizeof(old_lock));
716 memcpy(&it->d.lustre.it_lock_handle, lockh,
721 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
722 data->namelen, data->name, ldlm_it2str(it->it_op),
723 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/*
728 * This long block is all about fixing up the lock and request state
729 * so that it is correct as of the moment _before_ the operation was
730 * applied; that way, the VFS will think that everything is normal and
731 * call Lustre's regular VFS methods.
733 * If we're performing a creation, that means that unless the creation
734 * failed with EEXIST, we should fake up a negative dentry.
736 * For everything else, we want to lookup to succeed.
738 * One additional note: if CREATE or OPEN succeeded, we add an extra
739 * reference to the request because we need to keep it around until
740 * ll_create/ll_open gets called.
742 * The server will return to us, in it_disposition, an indication of
743 * exactly what d.lustre.it_status refers to.
745 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
746 * otherwise if DISP_OPEN_CREATE is set, then it status is the
747 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
748 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
751 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
 *
 * NOTE(review): the entry point below first tries to satisfy LOOKUP/
 * GETATTR intents from already-held locks (mdc_revalidate_lock); only
 * if that fails -- and the intent has not already completed its enqueue
 * (DISP_ENQ_COMPLETE) -- does it send a new enqueue.  Lines are elided
 * in this extraction (rc declaration, error RETURNs, closing braces).
 */
754 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
755 void *lmm, int lmmsize, struct lookup_intent *it,
756 int lookup_flags, struct ptlrpc_request **reqp,
757 ldlm_blocking_callback cb_blocking, int extra_lock_flags)
759 struct lustre_handle lockh;
765 CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
766 op_data->namelen, op_data->name, op_data->fid1.id,
767 ldlm_it2str(it->it_op), it->it_flags);
770 if (op_data->fid2.id &&
771 (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
772 rc = mdc_revalidate_lock(exp, it, &op_data->fid2);
773 /* Only return failure if it was not GETATTR by cfid
774 (from inode_revalidate) */
775 if (rc || op_data->namelen != 0)
779 /* lookup_it may be called only after revalidate_it has run, because
780 * revalidate_it cannot return errors, only zero. Returning zero causes
781 * this call to lookup, which *can* return an error.
783 * We only want to execute the request associated with the intent one
784 * time, however, so don't send the request again. Instead, skip past
785 * this and use the request from revalidate. In this case, revalidate
786 * never dropped its reference, so the refcounts are all OK */
787 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
788 struct ldlm_enqueue_info einfo =
789 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
790 ldlm_completion_ast, NULL, NULL };
792 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
793 lmm, lmmsize, extra_lock_flags);
796 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
797 } else if (!op_data->fid2.id) {
798 /* DISP_ENQ_COMPLETE set means there is extra reference on
799 * request referenced from this intent, saved for subsequent
800 * lookup. This path is executed when we proceed to this
801 * lookup, so we clear DISP_ENQ_COMPLETE */
802 it_clear_disposition(it, DISP_ENQ_COMPLETE);
/* Hand the (possibly reused) request back to the caller and run the
 * common disposition/lock fixups. */
805 *reqp = it->d.lustre.it_data;
806 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
810 EXPORT_SYMBOL(mdc_intent_lock);
/*
 * ptlrpcd completion callback for an async getattr intent enqueue:
 * recover the md_enqueue_info stashed in rq_async_args, release the
 * in-flight slot, run ldlm_cli_enqueue_fini + the common finish paths,
 * and finally invoke the caller's mi_cb with the overall result.
 * NOTE(review): lines are elided in this extraction -- the assignments
 * unpacking @ma into minfo/einfo/exp/it (original lines 826-832), the
 * error GOTOs, and the closing brace.
 */
812 static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
813 void *unused, int rc)
815 struct mdc_enqueue_args *ma;
816 struct md_enqueue_info *minfo;
817 struct ldlm_enqueue_info *einfo;
818 struct obd_export *exp;
819 struct lookup_intent *it;
820 struct lustre_handle *lockh;
821 struct obd_device *obddev;
822 int flags = LDLM_FL_HAS_INTENT;
/* Per-request scratch space written by mdc_intent_getattr_async(). */
825 ma = (struct mdc_enqueue_args *)&req->rq_async_args;
831 lockh = &minfo->mi_lockh;
833 obddev = class_exp2obd(exp);
/* Release the in-flight slot taken before the async send. */
835 mdc_exit_request(&obddev->u.cli);
836 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
839 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
840 &flags, NULL, 0, NULL, lockh, rc);
842 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
843 mdc_clear_replay_flag(req, rc);
847 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
851 memcpy(&it->d.lustre.it_lock_handle, lockh, sizeof(*lockh));
853 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* Deliver the final status to the async caller. */
857 minfo->mi_cb(exp, req, minfo, rc);
/*
 * Fire-and-forget getattr intent: pack a lookup/getattr enqueue, start
 * it asynchronously via ldlm_cli_enqueue(..., async=1), stash the
 * md_enqueue_info in rq_async_args for the interpret callback, and hand
 * the request to ptlrpcd.  Completion is reported through
 * mdc_intent_getattr_async_interpret() -> minfo->mi_cb.
 * NOTE(review): lines are elided in this extraction -- rc declaration,
 * error RETURNs after packing/enqueue, the assignments filling @aa
 * (original lines 900-901), and the final RETURN/closing brace.
 */
862 int mdc_intent_getattr_async(struct obd_export *exp,
863 struct md_enqueue_info *minfo,
864 struct ldlm_enqueue_info *einfo)
866 struct mdc_op_data *op_data = &minfo->mi_data;
867 struct lookup_intent *it = &minfo->mi_it;
868 struct ptlrpc_request *req;
869 struct obd_device *obddev = class_exp2obd(exp);
870 struct ldlm_res_id res_id = {
871 .name = {op_data->fid1.id,
872 op_data->fid1.generation}
874 ldlm_policy_data_t policy = {
875 .l_inodebits = { MDS_INODELOCK_LOOKUP }
877 struct mdc_enqueue_args *aa;
879 int flags = LDLM_FL_HAS_INTENT;
882 CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
883 op_data->namelen, op_data->name, op_data->fid1.id,
884 ldlm_it2str(it->it_op), it->it_flags);
886 req = mdc_intent_lookup_pack(exp, it, op_data);
/* Take an in-flight slot before sending; released by the interpret
 * callback (mdc_exit_request there). */
890 mdc_enter_request(&obddev->u.cli);
891 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
892 0, NULL, &minfo->mi_lockh, 1);
894 mdc_exit_request(&obddev->u.cli);
/* rq_async_args must be big enough to hold our per-request state. */
898 CLASSERT(sizeof(*aa) < sizeof(req->rq_async_args));
899 aa = (struct mdc_enqueue_args *)&req->rq_async_args;
902 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
903 ptlrpcd_add_req(req);
907 EXPORT_SYMBOL(mdc_intent_getattr_async);