1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
6 * This file is part of the Lustre file system, http://www.lustre.org
7 * Lustre is a trademark of Cluster File Systems, Inc.
9 * You may have signed or agreed to another license before downloading
10 * this software. If so, you are bound by the terms and conditions
11 * of that agreement, and the following does not apply to you. See the
12 * LICENSE file included with this distribution for more information.
14 * If you did not agree to a different license, then this copy of Lustre
15 * is open source software; you can redistribute it and/or modify it
16 * under the terms of version 2 of the GNU General Public License as
17 * published by the Free Software Foundation.
19 * In either case, Lustre is distributed in the hope that it will be
20 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 * license text for more details.
26 # define EXPORT_SYMTAB
28 #define DEBUG_SUBSYSTEM S_MDC
31 # include <linux/module.h>
32 # include <linux/pagemap.h>
33 # include <linux/miscdevice.h>
34 # include <linux/init.h>
36 # include <liblustre.h>
39 #include <obd_class.h>
40 #include <lustre_dlm.h>
41 #include <lprocfs_status.h>
42 #include "mdc_internal.h"
/* Return the bits of 'flag' that are set in the intent's disposition word
 * (it->d.lustre.it_disposition).  The result is the masked value itself, so
 * it is non-zero -- not necessarily 1 -- when any requested bit is set.
 * NOTE(review): the function's braces are absent from this listing. */
44 int it_disposition(struct lookup_intent *it, int flag)
46 return it->d.lustre.it_disposition & flag;
48 EXPORT_SYMBOL(it_disposition);
/* OR the bits of 'flag' into the intent's disposition word; bits that are
 * already set stay set.
 * NOTE(review): the function's braces are absent from this listing. */
50 void it_set_disposition(struct lookup_intent *it, int flag)
52 it->d.lustre.it_disposition |= flag;
54 EXPORT_SYMBOL(it_set_disposition);
/* Clear the bits of 'flag' in the intent's disposition word; all other bits
 * are left as they were.
 * NOTE(review): the function's braces are absent from this listing. */
56 void it_clear_disposition(struct lookup_intent *it, int flag)
58 it->d.lustre.it_disposition &= ~flag;
60 EXPORT_SYMBOL(it_clear_disposition);
/* Report the status stored in the intent for a given 'phase'.
 * The disposition bits are tested in the fixed order DISP_OPEN_OPEN,
 * DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD.  For a bit that is set,
 * it->d.lustre.it_status is returned when 'phase' is numerically >= that bit.
 * NOTE(review): the other branch of each inner test, the closing braces and
 * the final return are not visible in this listing -- confirm against the
 * upstream file before relying on what happens when 'phase' is smaller. */
62 int it_open_error(int phase, struct lookup_intent *it)
64 if (it_disposition(it, DISP_OPEN_OPEN)) {
65 if (phase >= DISP_OPEN_OPEN)
66 return it->d.lustre.it_status;
71 if (it_disposition(it, DISP_OPEN_CREATE)) {
72 if (phase >= DISP_OPEN_CREATE)
73 return it->d.lustre.it_status;
78 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
79 if (phase >= DISP_LOOKUP_EXECD)
80 return it->d.lustre.it_status;
85 if (it_disposition(it, DISP_IT_EXECD)) {
86 if (phase >= DISP_IT_EXECD)
87 return it->d.lustre.it_status;
/* Logs the raw disposition word and status.  NOTE(review): presumably only
 * reached when none of the tests above returned; the lines in between are
 * missing, so this cannot be confirmed here. */
91 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
92 it->d.lustre.it_status);
96 EXPORT_SYMBOL(it_open_error);
98 /* this must be called on a lockh that is known to have a referenced lock */
/* Store 'data' in the l_ast_data field of the lock that handle 'l' resolves
 * to.  'l' is reinterpreted as a struct lustre_handle pointer.
 * NOTE(review): braces and several lines (upstream 103-109, 122, 124-125,
 * 128-131) are missing from this listing, including whatever follows the
 * unlock. */
99 void mdc_set_lock_data(__u64 *l, void *data)
101 struct ldlm_lock *lock;
102 struct lustre_handle *lockh = (struct lustre_handle *)l;
110 lock = ldlm_handle2lock(lockh);
/* The handle must resolve to a lock; the update below is done between
 * lock_res_and_lock() and unlock_res_and_lock(). */
112 LASSERT(lock != NULL);
113 lock_res_and_lock(lock);
/* If the lock already carries a different l_ast_data, both pointers are
 * treated as inodes and the old inode is asserted to have I_FREEING set in
 * i_state.
 * NOTE(review): the format string has one more conversion (state %lu) than
 * the arguments visible here; the matching argument is presumably on the
 * missing upstream line 122. */
115 if (lock->l_ast_data && lock->l_ast_data != data) {
116 struct inode *new_inode = data;
117 struct inode *old_inode = lock->l_ast_data;
118 LASSERTF(old_inode->i_state & I_FREEING,
119 "Found existing inode %p/%lu/%u state %lu in lock: "
120 "setting data to %p/%lu/%u\n", old_inode,
121 old_inode->i_ino, old_inode->i_generation,
123 new_inode, new_inode->i_ino, new_inode->i_generation);
126 lock->l_ast_data = data;
127 unlock_res_and_lock(lock);
132 EXPORT_SYMBOL(mdc_set_lock_data);
/* Build a resource id from 'fid' (fid->id in name[0], fid->generation in
 * name[1], remaining entries zero) and call ldlm_resource_iterate() on the
 * namespace of the export's obd device.
 * NOTE(review): the rest of the ldlm_resource_iterate() argument list and the
 * function's return statement are missing from this listing; 'it' and 'data'
 * are presumably forwarded to the iterator -- confirm upstream. */
134 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid,
135 ldlm_iterator_t it, void *data)
137 struct ldlm_res_id res_id = { .name = {0} };
140 res_id.name[0] = fid->id;
141 res_id.name[1] = fid->generation;
143 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
/* Helper for failed requests: when req->rq_replay is set it takes
 * req->rq_lock around an update, and it logs at D_ERROR when an error 'rc'
 * comes with a non-zero rq_transno.
 * NOTE(review): the statement executed between spin_lock() and spin_unlock()
 * (upstream line 155) is missing from this listing; judging by the function
 * name it presumably clears rq_replay -- confirm upstream.  Braces and the
 * lines after the DEBUG_REQ are missing as well. */
150 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
152 /* Don't hold error requests for replay. */
153 if (req->rq_replay) {
154 spin_lock(&req->rq_lock);
156 spin_unlock(&req->rq_lock);
158 if (rc && req->rq_transno != 0) {
159 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
/* NOTE(review): only the signature of round_up() survives in this listing;
 * its body (upstream lines 165-172) is missing.  The one visible caller,
 * mdc_intent_open_pack(), uses "round_up(rc) - rc" as padding and its comment
 * speaks of growing the request to the next power-of-two size, so this
 * presumably rounds 'val' up to a power of two -- confirm upstream. */
164 static int round_up(int val)
174 /* Save a large LOV EA into the request buffer so that it is available
175 * for replay. We don't do this in the initial request because the
176 * original request doesn't need this buffer (at most it sends just the
177 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
178 * buffer and may also be difficult to allocate and save a very large
179 * request buffer for each open. (bug 5707)
181 * OOM here may cause recovery failure if lmm is needed (only for the
182 * original open if the MDS crashed just when this client also OOM'd)
183 * but this is incredibly unlikely, and questionable whether the client
184 * could do MDS recovery under OOM anyways... */
/* NOTE(review): braces, the new buffer length passed to
 * lustre_msg_set_buflen() (upstream line 199), the last DEBUG_REQ argument
 * (line 206) and the else-branch marker (line 215) are missing from this
 * listing. */
185 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
186 struct mds_body *body)
188 int old_len, new_size, old_size;
189 struct lustre_msg *old_msg = req->rq_reqmsg;
190 struct lustre_msg *new_msg;
/* Remember the current length of buffer DLM_INTENT_REC_OFF + 2 and the total
 * size of the current message before the length is changed. */
192 old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + 2);
194 old_size = lustre_msg_size(lustre_request_magic(req),
195 req->rq_reqmsg->lm_bufcount,
196 req->rq_reqmsg->lm_buflens);
/* Change that buffer's length in the old message header, then recompute the
 * total size from the updated lm_buflens. */
198 lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2,
200 new_size = lustre_msg_size(lustre_request_magic(req),
201 req->rq_reqmsg->lm_bufcount,
202 req->rq_reqmsg->lm_buflens);
203 OBD_ALLOC(new_msg, new_size);
/* On success: copy old_size bytes across, publish the new message and length
 * under rq_lock, then free the old message. */
204 if (new_msg != NULL) {
205 DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u\n",
207 memcpy(new_msg, old_msg, old_size);
209 spin_lock(&req->rq_lock);
210 req->rq_reqmsg = new_msg;
211 req->rq_reqlen = new_size;
212 spin_unlock(&req->rq_lock);
214 OBD_FREE(old_msg, old_size);
/* NOTE(review): the three statements below restore the old buffer length and
 * drop the EA from 'body' (OBD_MD_FLEASIZE cleared, eadatasize zeroed).
 * They presumably form the allocation-failure branch; the "else" line is not
 * visible here. */
216 lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2, old_len);
217 body->valid &= ~OBD_MD_FLEASIZE;
218 body->eadatasize = 0;
/* Build the LDLM enqueue request that carries an open intent.
 * Visible steps: force S_IFREG into it->it_create_mode, pad the EA buffer
 * (size[DLM_INTENT_REC_OFF + 2]), gather locks to cancel with
 * mdc_resource_get_unused(), allocate the request with
 * ldlm_prep_enqueue_req() (7 buffers with a join record when O_JOIN_FILE is
 * set, otherwise 6), store it->it_op in the intent buffer, pack the open
 * record with mdc_open_pack() and set the five reply buffer sizes.
 * NOTE(review): braces, the declarations of rc/count/mode, the lock-mode
 * assignments, the statement executed under rq_lock (upstream line 296), the
 * tail of several argument lists and the return statement are missing from
 * this listing. */
222 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
223 struct lookup_intent *it,
224 struct mdc_op_data *data,
225 void *lmm, int lmmsize)
227 struct ptlrpc_request *req;
228 struct ldlm_intent *lit;
229 struct obd_device *obddev = class_exp2obd(exp);
230 int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
231 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
232 [DLM_INTENT_IT_OFF] = sizeof(*lit),
233 [DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create),
234 [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
235 /* As an optimization, we allocate an RPC request buffer
236 * for at least a default-sized LOV EA even if we aren't
237 * sending one. We grow the whole request to the next
238 * power-of-two size since we get that much from a slab
239 * allocation anyways. This avoids an allocation below
240 * in the common case where we need to save a
241 * default-sized LOV EA for open replay. */
242 [DLM_INTENT_REC_OFF+2]= max(lmmsize,
243 obddev->u.cli.cl_default_mds_easize) };
244 int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
245 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
246 [DLM_REPLY_REC_OFF] = sizeof(struct mds_body),
247 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
249 [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
250 CFS_LIST_HEAD(cancels);
255 it->it_create_mode |= S_IFREG;
/* rc is the size of the 6-buffer message; the EA buffer is then grown by the
 * slack round_up(rc) - rc, capped at cl_max_mds_easize. */
257 rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6, size);
259 size[DLM_INTENT_REC_OFF + 2] =
260 min(size[DLM_INTENT_REC_OFF + 2] + round_up(rc) - rc,
261 obddev->u.cli.cl_max_mds_easize);
263 /* If inode is known, cancel conflicting OPEN locks. */
265 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
268 else if (it->it_flags & FMODE_EXEC)
273 count = mdc_resource_get_unused(exp, &data->fid2, &cancels,
274 mode, MDS_INODELOCK_OPEN);
277 /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
278 if (it->it_op & IT_CREAT || it->it_flags & O_JOIN_FILE)
282 count += mdc_resource_get_unused(exp, &data->fid1, &cancels, mode,
283 MDS_INODELOCK_UPDATE);
/* Join: data->data is read as a __u64 head size and a struct mds_rec_join
 * buffer is added as the seventh request buffer. */
284 if (it->it_flags & O_JOIN_FILE) {
285 __u64 head_size = (*(__u64 *)data->data);
286 /* join is like an unlink of the tail */
287 size[DLM_INTENT_REC_OFF + 3] = sizeof(struct mds_rec_join);
288 req = ldlm_prep_enqueue_req(exp, 7, size, &cancels, count);
289 mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, data, head_size);
291 req = ldlm_prep_enqueue_req(exp, 6, size, &cancels, count);
/* NOTE(review): the statement protected by rq_lock is missing here. */
295 spin_lock(&req->rq_lock);
297 spin_unlock(&req->rq_lock);
299 /* pack the intent */
300 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
302 lit->opc = (__u64)it->it_op;
304 /* pack the intended request */
305 mdc_open_pack(req, DLM_INTENT_REC_OFF, data,
306 it->it_create_mode, 0, it->it_flags,
309 ptlrpc_req_set_repsize(req, 5, repsize);
/* Build the LDLM enqueue request that carries an unlink intent: a 5-buffer
 * request (ptlrpc body, lock request, intent, struct mds_rec_unlink, name of
 * data->namelen + 1 bytes) with no cancel list, it->it_op stored in the
 * intent buffer, the unlink record packed by mdc_unlink_pack(), and five
 * reply buffer sizes set.
 * NOTE(review): braces, the field named on upstream line 330, the tail of the
 * lustre_msg_buf() call and the return statement are missing from this
 * listing. */
314 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
315 struct lookup_intent *it,
316 struct mdc_op_data *data)
318 struct ptlrpc_request *req;
319 struct ldlm_intent *lit;
320 struct obd_device *obddev = class_exp2obd(exp);
321 int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
322 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
323 [DLM_INTENT_IT_OFF] = sizeof(*lit),
324 [DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink),
325 [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
326 int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
327 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
328 [DLM_REPLY_REC_OFF] = sizeof(struct mds_body),
329 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
331 [DLM_REPLY_REC_OFF+2] = obddev->u.cli.
332 cl_max_mds_cookiesize };
334 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
336 /* pack the intent */
337 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
339 lit->opc = (__u64)it->it_op;
341 /* pack the intended request */
342 mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
344 ptlrpc_req_set_repsize(req, 5, repsize);
/* Build the LDLM enqueue request that carries a lookup/getattr intent: a
 * 5-buffer request (ptlrpc body, lock request, intent, struct mds_body, name
 * of data->namelen + 1 bytes) with no cancel list, it->it_op stored in the
 * intent buffer, the getattr body packed by mdc_getattr_pack() with the
 * 'valid' mask below, and five reply buffer sizes set.
 * NOTE(review): braces, the field named on upstream line 365, the tails of
 * the lustre_msg_buf() and mdc_getattr_pack() calls and the return statement
 * are missing from this listing. */
349 static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp,
350 struct lookup_intent *it,
351 struct mdc_op_data *data)
353 struct ptlrpc_request *req;
354 struct ldlm_intent *lit;
355 struct obd_device *obddev = class_exp2obd(exp);
356 int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
357 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
358 [DLM_INTENT_IT_OFF] = sizeof(*lit),
359 [DLM_INTENT_REC_OFF] = sizeof(struct mds_body),
360 [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
361 int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
362 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
363 [DLM_REPLY_REC_OFF] = sizeof(struct mds_body),
364 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
366 [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
/* Attribute mask handed to mdc_getattr_pack(). */
367 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
368 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
370 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
372 /* pack the intent */
373 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
375 lit->opc = (__u64)it->it_op;
377 /* pack the intended request */
378 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags,
380 ptlrpc_req_set_repsize(req, 5, repsize);
/* Build the enqueue request used for readdir: only two buffers (ptlrpc body
 * and lock request) with no intent record and no cancel list, and a
 * two-buffer reply (ptlrpc body and lock reply).
 * NOTE(review): braces and the return statement are missing from this
 * listing. */
385 static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp)
387 struct ptlrpc_request *req;
388 int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
389 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request) };
390 int repsize[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
391 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply) };
393 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
395 ptlrpc_req_set_repsize(req, 2, repsize);
/* Post-process the result of an intent enqueue.
 * Visible steps: handle the enqueue return code 'rc', copy the disposition
 * and status from the lock reply into the intent, clear the replay flag in
 * the cases tested below, swab the mds_body reply buffer, grow the client's
 * cached max EA/cookie sizes when the reply advertises larger ones, and for
 * IT_OPEN copy the returned EA into request buffer DLM_INTENT_REC_OFF + 2
 * (reallocating the request first when that buffer is too small).
 * NOTE(review): braces, the last parameter line (upstream 404; 'rc' is used
 * below), several argument tails, the declarations of 'eadata' and 'lmm',
 * the error returns and the final return are missing from this listing. */
399 static int mdc_finish_enqueue(struct obd_export *exp,
400 struct ptlrpc_request *req,
401 struct ldlm_enqueue_info *einfo,
402 struct lookup_intent *it,
403 struct lustre_handle *lockh,
406 struct ldlm_request *lockreq;
407 struct ldlm_reply *lockrep;
410 /* Similarly, if we're going to replay this request, we don't want to
411 * actually get a lock, just perform the intent. */
412 if (req->rq_transno || req->rq_replay) {
413 lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
415 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
/* The enqueue reported ELDLM_LOCK_ABORTED: the caller's handle is zeroed. */
418 if (rc == ELDLM_LOCK_ABORTED) {
420 memset(lockh, 0, sizeof(*lockh));
/* Any other non-zero rc is asserted to be negative; the replay flag is
 * cleared and ptlrpc_req_finished() is called on the request. */
422 } else if (rc != 0) {
423 CERROR("ldlm_cli_enqueue: %d\n", rc);
424 LASSERTF(rc < 0, "rc %d\n", rc);
425 mdc_clear_replay_flag(req, rc);
426 ptlrpc_req_finished(req);
428 } else { /* rc = 0 */
429 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
432 /* If the server gave us back a different lock mode, we should
433 * fix up our variables. */
434 if (lock->l_req_mode != einfo->ei_mode) {
435 ldlm_lock_addref(lockh, lock->l_req_mode);
436 ldlm_lock_decref(lockh, einfo->ei_mode);
437 einfo->ei_mode = lock->l_req_mode;
442 lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
444 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
445 /* swabbed by ldlm_cli_enqueue() */
446 LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
/* lock_policy_res1/res2 of the reply carry the disposition and status; the
 * request pointer itself is stored in it_data. */
448 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
449 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
450 it->d.lustre.it_lock_mode = einfo->ei_mode;
451 it->d.lustre.it_data = req;
453 if (it->d.lustre.it_status < 0 && req->rq_replay)
454 mdc_clear_replay_flag(req, it->d.lustre.it_status);
/* NOTE(review): the comment that starts on the next line has lost its
 * closing line (upstream 462) in this listing, and upstream line 464 of the
 * condition that follows it is missing too. */
456 /* If we're doing an IT_OPEN which did not result in an actual
457 * successful open, then we need to remove the bit which saves
458 * this request for unconditional replay.
460 * It's important that we do this first! Otherwise we might exit the
461 * function without doing so, and try to replay a failed create
463 if ((it->it_op & IT_OPEN) &&
465 (!it_disposition(it, DISP_OPEN_OPEN) ||
466 it->d.lustre.it_status != 0))
467 mdc_clear_replay_flag(req, it->d.lustre.it_status);
469 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
470 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
472 /* We know what to expect, so we do any byte flipping required here */
473 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
474 struct mds_body *body;
476 body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
477 lustre_swab_mds_body);
/* NOTE(review): the NULL test guarding this message is missing here. */
479 CERROR ("Can't swab mds_body\n");
483 /* If this is a successful OPEN request, we need to set
484 replay handler and data early, so that if replay happens
485 immediately after swabbing below, new reply is swabbed
486 by that handler correctly */
487 if (it_disposition(it, DISP_OPEN_OPEN) &&
488 !it_open_error(DISP_OPEN_OPEN, it))
489 mdc_set_open_replay_data(NULL, req);
491 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
494 /* The eadata is opaque; just check that it is there.
495 * Eventually, obd_unpackmd() will check the contents */
496 eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
497 body->eadatasize, NULL);
498 if (eadata == NULL) {
499 CERROR ("Missing/short eadata\n");
/* The cached maximum sizes only ever grow here: each is raised when the
 * reply carries a larger value.  NOTE(review): the EA-size comparison and
 * assignment operands (upstream 506, 508, 510) are missing. */
502 if (body->valid & OBD_MD_FLMODEASIZE) {
503 struct obd_device *obddev = class_exp2obd(exp);
505 if (obddev->u.cli.cl_max_mds_easize <
507 obddev->u.cli.cl_max_mds_easize =
509 CDEBUG(D_INFO, "maxeasize become %d\n",
512 if (obddev->u.cli.cl_max_mds_cookiesize <
513 body->max_cookiesize) {
514 obddev->u.cli.cl_max_mds_cookiesize =
515 body->max_cookiesize;
516 CDEBUG(D_INFO, "cookiesize become %d\n",
517 body->max_cookiesize);
520 /* We save the reply LOV EA in case we have to replay
521 * a create for recovery. If we didn't allocate a
522 * large enough request buffer above we need to
523 * reallocate it here to hold the actual LOV EA. */
524 if (it->it_op & IT_OPEN) {
525 int offset = DLM_INTENT_REC_OFF + 2;
528 if (lustre_msg_buflen(req->rq_reqmsg, offset) <
530 mdc_realloc_openmsg(req, body);
532 lmm = lustre_msg_buf(req->rq_reqmsg, offset,
/* NOTE(review): any guard on 'lmm' before this copy is not visible here. */
535 memcpy(lmm, eadata, body->eadatasize);
543 /* We always reserve enough space in the reply packet for a stripe MD, because
544 * we don't know in advance the file type. */
/* Send an intent enqueue for 'it' on the resource named by data->fid1.
 * The request is built by the packer that matches it->it_op (open, unlink,
 * lookup/getattr or readdir), sent with ldlm_cli_enqueue() between
 * mdc_get_rpc_lock()/mdc_enter_request() and their counterparts, and the
 * result is passed to mdc_finish_enqueue().
 * NOTE(review): braces, the declaration of 'rc', the final "else" of the
 * dispatch, its error return, the tail of the ldlm_cli_enqueue() argument
 * list and the return statement are missing from this listing. */
545 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
546 struct lookup_intent *it, struct mdc_op_data *data,
547 struct lustre_handle *lockh, void *lmm, int lmmsize,
548 int extra_lock_flags)
550 struct ptlrpc_request *req;
551 struct obd_device *obddev = class_exp2obd(exp);
552 struct ldlm_res_id res_id =
553 { .name = {data->fid1.id, data->fid1.generation} };
554 ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
555 int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
/* Only inodebits locks are accepted.  The policy starts as
 * MDS_INODELOCK_LOOKUP and is switched to MDS_INODELOCK_UPDATE for
 * unlink/getattr/readdir, and for opens with O_JOIN_FILE. */
559 LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
560 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
561 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
563 if (it->it_op & IT_OPEN) {
564 req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize);
565 if (it->it_flags & O_JOIN_FILE) {
566 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
568 } else if (it->it_op & IT_UNLINK) {
569 req = mdc_intent_unlink_pack(exp, it, data);
570 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
571 req = mdc_intent_lookup_pack(exp, it, data);
572 } else if (it->it_op == IT_READDIR) {
573 req = mdc_intent_readdir_pack(exp);
575 CERROR("bad it_op %x\n", it->it_op);
582 /* It is important to obtain rpc_lock first (if applicable), so that
583 * threads that are serialised with rpc_lock are not polluting our
584 * rpcs in flight counter */
585 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
586 mdc_enter_request(&obddev->u.cli);
587 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
589 mdc_exit_request(&obddev->u.cli);
590 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
592 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
596 EXPORT_SYMBOL(mdc_enqueue);
/* Look for an already granted inodebits lock on the resource named by 'fid'
 * and, when the visible tail of the function runs, copy its handle and mode
 * into the intent.  ldlm_lock_match() is called with LCK_CR, LCK_CW and
 * LCK_PR in that order.
 * NOTE(review): braces, the third parameter line (upstream 600; 'fid' is
 * used below), the declarations of rc/mode, the conditions between the three
 * match calls and the return statement are missing from this listing.  The
 * first comment in the body has also lost its closing line (upstream 604). */
598 int mdc_revalidate_lock(struct obd_export *exp,
599 struct lookup_intent *it,
602 /* We could just return 1 immediately, but since we should only
603 * be called in revalidate_it if we already have a lock, let's
605 struct ldlm_res_id res_id = {.name ={fid->id, fid->generation}};
606 struct lustre_handle lockh;
607 ldlm_policy_data_t policy;
611 /* As not all attributes are kept under update lock, e.g.
612 owner/group/acls are under lookup lock, we need both
613 ibits for GETATTR. */
614 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
615 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
616 MDS_INODELOCK_LOOKUP;
/* NOTE(review): the later modes are presumably tried only when the earlier
 * match failed; the tests between the calls are not visible here. */
618 rc = ldlm_lock_match(exp->exp_obd->obd_namespace, LDLM_FL_BLOCK_GRANTED,
619 &res_id, LDLM_IBITS, &policy, LCK_CR, &lockh);
622 rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
623 LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
624 &policy, LCK_CW, &lockh);
628 rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
629 LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
630 &policy, LCK_PR, &lockh);
/* Publish the matched handle and mode in the intent. */
633 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
634 it->d.lustre.it_lock_mode = mode;
639 EXPORT_SYMBOL(mdc_revalidate_lock);
/* Second stage of intent processing, run on the reply 'req' of an enqueue.
 * Visible steps: return it_status early when the server never executed the
 * intent, check it_open_error() for the IT_EXECD and LOOKUP_EXECD phases,
 * mark DISP_ENQ_COMPLETE when revalidating a known fid2, take extra request
 * references for a successful create and a successful open (each guarded by
 * its own DISP_ENQ_*_REF bit so it happens once), and finally, when the
 * handle resolves to a lock, replace it with an already held matching lock
 * if ldlm_lock_match() finds one.
 * NOTE(review): braces, the declaration of 'rc', the tests on 'rc' after
 * each it_open_error() call, the action taken when the fids differ, the
 * test on 'lock' and the return statement are missing from this listing. */
641 static int mdc_finish_intent_lock(struct obd_export *exp,
642 struct ptlrpc_request *req,
643 struct mdc_op_data *data,
644 struct lookup_intent *it,
645 struct lustre_handle *lockh)
647 struct mds_body *mds_body;
648 struct lustre_handle old_lock;
649 struct ldlm_lock *lock;
653 LASSERT(req != NULL);
654 LASSERT(req != LP_POISON);
655 LASSERT(req->rq_repmsg != LP_POISON);
657 if (!it_disposition(it, DISP_IT_EXECD)) {
658 /* The server failed before it even started executing the
659 * intent, i.e. because it couldn't unpack the request. */
660 LASSERT(it->d.lustre.it_status != 0);
661 RETURN(it->d.lustre.it_status);
663 rc = it_open_error(DISP_IT_EXECD, it);
667 mds_body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
669 /* mdc_enqueue checked */
670 LASSERT(mds_body != NULL);
671 /* mdc_enqueue swabbed */
672 LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
674 /* If we were revalidating a fid/name pair, mark the intent in
675 * case we fail and get called again from lookup */
676 if (data->fid2.id && (it->it_op != IT_GETATTR)) {
677 it_set_disposition(it, DISP_ENQ_COMPLETE);
678 /* Also: did we find the same inode? */
679 if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2)))
683 rc = it_open_error(DISP_LOOKUP_EXECD, it);
/* NOTE(review): the comment that starts on the next line has lost its
 * closing line (upstream 689) in this listing. */
687 /* keep requests around for the multiple phases of the call
688 * this shows the DISP_XX must guarantee we make it into the call
690 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
691 it_disposition(it, DISP_OPEN_CREATE) &&
692 !it_open_error(DISP_OPEN_CREATE, it)) {
693 it_set_disposition(it, DISP_ENQ_CREATE_REF);
694 ptlrpc_request_addref(req); /* balanced in ll_create_node */
696 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
697 it_disposition(it, DISP_OPEN_OPEN) &&
698 !it_open_error(DISP_OPEN_OPEN, it)) {
699 it_set_disposition(it, DISP_ENQ_OPEN_REF);
700 ptlrpc_request_addref(req); /* balanced in ll_file_open */
701 /* BUG 11546 - eviction in the middle of open rpc processing */
702 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
/* Sanity checks on the combination of it_op and disposition. */
705 if (it->it_op & IT_CREAT) {
706 /* XXX this belongs in ll_create_it */
707 } else if (it->it_op == IT_OPEN) {
708 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
710 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
713 /* If we already have a matching lock, then cancel the new
714 * one. We have to set the data here instead of in
715 * mdc_enqueue, because we need to use the child's inode as
716 * the l_ast_data to match, and that's not available until
717 * intent_finish has performed the iget().) */
718 lock = ldlm_handle2lock(lockh);
720 ldlm_policy_data_t policy = lock->l_policy_data;
722 LDLM_DEBUG(lock, "matching against this");
/* old_lock starts as a copy of the new handle and is passed to
 * ldlm_lock_match(); on a match the new lock is dropped and cancelled and
 * both *lockh and the intent's handle take the matched one. */
724 memcpy(&old_lock, lockh, sizeof(*lockh));
725 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
726 LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
727 ldlm_lock_decref_and_cancel(lockh,
728 it->d.lustre.it_lock_mode);
729 memcpy(lockh, &old_lock, sizeof(old_lock));
730 memcpy(&it->d.lustre.it_lock_handle, lockh,
735 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
736 data->namelen, data->name, ldlm_it2str(it->it_op),
737 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
741 /*
742 * This long block is all about fixing up the lock and request state
743 * so that it is correct as of the moment _before_ the operation was
744 * applied; that way, the VFS will think that everything is normal and
745 * call Lustre's regular VFS methods.
746 *
747 * If we're performing a creation, that means that unless the creation
748 * failed with EEXIST, we should fake up a negative dentry.
749 *
750 * For everything else, we want the lookup to succeed.
751 *
752 * One additional note: if CREATE or OPEN succeeded, we add an extra
753 * reference to the request because we need to keep it around until
754 * ll_create/ll_open gets called.
755 *
756 * The server will return to us, in it_disposition, an indication of
757 * exactly what d.lustre.it_status refers to.
758 *
759 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
760 * otherwise if DISP_OPEN_CREATE is set, then it_status is the
761 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
762 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
763 * was successful.
764 *
765 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
766 * child lookup.
767 */
/* Entry point for taking an intent lock.
 * Visible steps: for IT_LOOKUP/IT_GETATTR with a known fid2, try
 * mdc_revalidate_lock() first; unless DISP_ENQ_COMPLETE is already set, send
 * the intent with mdc_enqueue() and record the resulting handle in the
 * intent; otherwise, when fid2 is unknown, clear DISP_ENQ_COMPLETE.  The
 * request saved in it->d.lustre.it_data is then handed back through *reqp
 * and mdc_finish_intent_lock() is run on it.
 * NOTE(review): braces, the declaration of 'rc', the returns that follow the
 * revalidate and enqueue calls, and the final return are missing from this
 * listing. */
768 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
769 void *lmm, int lmmsize, struct lookup_intent *it,
770 int lookup_flags, struct ptlrpc_request **reqp,
771 ldlm_blocking_callback cb_blocking, int extra_lock_flags)
773 struct lustre_handle lockh;
779 CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
780 op_data->namelen, op_data->name, op_data->fid1.id,
781 ldlm_it2str(it->it_op), it->it_flags);
783 if (op_data->fid2.id &&
784 (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
785 rc = mdc_revalidate_lock(exp, it, &op_data->fid2);
786 /* Only return failure if it was not GETATTR by cfid
787 (from inode_revalidate) */
788 if (rc || op_data->namelen != 0)
792 /* lookup_it may be called only after revalidate_it has run, because
793 * revalidate_it cannot return errors, only zero. Returning zero causes
794 * this call to lookup, which *can* return an error.
796 * We only want to execute the request associated with the intent one
797 * time, however, so don't send the request again. Instead, skip past
798 * this and use the request from revalidate. In this case, revalidate
799 * never dropped its reference, so the refcounts are all OK */
800 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
/* Positional initializer: lock type, mode derived from the intent, the
 * caller's blocking callback, ldlm_completion_ast, and two NULL members. */
801 struct ldlm_enqueue_info einfo =
802 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
803 ldlm_completion_ast, NULL, NULL };
805 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
806 lmm, lmmsize, extra_lock_flags);
809 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
810 } else if (!op_data->fid2.id) {
811 /* DISP_ENQ_COMPLETE set means there is extra reference on
812 * request referenced from this intent, saved for subsequent
813 * lookup. This path is executed when we proceed to this
814 * lookup, so we clear DISP_ENQ_COMPLETE */
815 it_clear_disposition(it, DISP_ENQ_COMPLETE);
818 *reqp = it->d.lustre.it_data;
819 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
823 EXPORT_SYMBOL(mdc_intent_lock);
/* Reply handler installed by mdc_intent_getattr_async() as
 * req->rq_interpret_reply.  It reads its state from req->rq_async_args,
 * calls mdc_exit_request(), then runs ldlm_cli_enqueue_fini(),
 * mdc_finish_enqueue() and mdc_finish_intent_lock() in that order, and
 * finally calls the caller-supplied minfo->mi_cb with the resulting rc.
 * NOTE(review): braces, the assignments that load minfo/einfo/exp/it from
 * 'ma' (upstream 839-845), the tests on 'rc' between the stages and the
 * return statement are missing from this listing. */
825 static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
826 void *unused, int rc)
828 struct mdc_enqueue_args *ma;
829 struct md_enqueue_info *minfo;
830 struct ldlm_enqueue_info *einfo;
831 struct obd_export *exp;
832 struct lookup_intent *it;
833 struct lustre_handle *lockh;
834 struct obd_device *obddev;
835 int flags = LDLM_FL_HAS_INTENT;
838 ma = (struct mdc_enqueue_args *)&req->rq_async_args;
844 lockh = &minfo->mi_lockh;
846 obddev = class_exp2obd(exp);
848 mdc_exit_request(&obddev->u.cli);
850 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
851 &flags, NULL, 0, NULL, lockh, rc);
853 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
857 memcpy(&it->d.lustre.it_lock_handle, lockh, sizeof(*lockh));
859 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
863 minfo->mi_cb(exp, req, minfo, rc);
/* Start an intent getattr/lookup without waiting for the reply.
 * Visible steps: build the request with mdc_intent_lookup_pack() from
 * minfo->mi_data and minfo->mi_it, call ldlm_cli_enqueue() with 1 as its
 * last argument and &minfo->mi_lockh as the handle, then set
 * mdc_intent_getattr_async_interpret() as the reply handler and queue the
 * request with ptlrpcd_add_req().
 * NOTE(review): braces, the declaration of 'rc', the closing lines of the
 * two initializers, the error checks after packing and enqueueing, the
 * stores into 'aa' (upstream 906-907) and the return statement are missing
 * from this listing. */
868 int mdc_intent_getattr_async(struct obd_export *exp,
869 struct md_enqueue_info *minfo,
870 struct ldlm_enqueue_info *einfo)
872 struct mdc_op_data *op_data = &minfo->mi_data;
873 struct lookup_intent *it = &minfo->mi_it;
874 struct ptlrpc_request *req;
875 struct obd_device *obddev = class_exp2obd(exp);
876 struct ldlm_res_id res_id = {
877 .name = {op_data->fid1.id,
878 op_data->fid1.generation}
880 ldlm_policy_data_t policy = {
881 .l_inodebits = { MDS_INODELOCK_LOOKUP }
883 struct mdc_enqueue_args *aa;
885 int flags = LDLM_FL_HAS_INTENT;
888 CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
889 op_data->namelen, op_data->name, op_data->fid1.id,
890 ldlm_it2str(it->it_op), it->it_flags);
892 req = mdc_intent_lookup_pack(exp, it, op_data);
896 mdc_enter_request(&obddev->u.cli);
897 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
898 0, NULL, &minfo->mi_lockh, 1);
/* NOTE(review): this mdc_exit_request() presumably belongs to the enqueue
 * failure path, since the interpret callback makes the same call when the
 * reply arrives; the guarding test is not visible here. */
900 mdc_exit_request(&obddev->u.cli);
/* Compile-time check that struct mdc_enqueue_args is strictly smaller than
 * req->rq_async_args, which is then used as its storage. */
904 CLASSERT(sizeof(*aa) < sizeof(req->rq_async_args));
905 aa = (struct mdc_enqueue_args *)&req->rq_async_args;
908 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
909 ptlrpcd_add_req(req);
913 EXPORT_SYMBOL(mdc_intent_getattr_async);