1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
6 * This file is part of the Lustre file system, http://www.lustre.org
7 * Lustre is a trademark of Cluster File Systems, Inc.
9 * You may have signed or agreed to another license before downloading
10 * this software. If so, you are bound by the terms and conditions
11 * of that agreement, and the following does not apply to you. See the
12 * LICENSE file included with this distribution for more information.
14 * If you did not agree to a different license, then this copy of Lustre
15 * is open source software; you can redistribute it and/or modify it
16 * under the terms of version 2 of the GNU General Public License as
17 * published by the Free Software Foundation.
19 * In either case, Lustre is distributed in the hope that it will be
20 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 * license text for more details.
26 # define EXPORT_SYMTAB
28 #define DEBUG_SUBSYSTEM S_MDC
31 # include <linux/module.h>
32 # include <linux/pagemap.h>
33 # include <linux/miscdevice.h>
34 # include <linux/init.h>
36 # include <liblustre.h>
39 #include <obd_class.h>
40 #include <lustre_dlm.h>
41 #include <lprocfs_status.h>
42 #include "mdc_internal.h"
44 int it_disposition(struct lookup_intent *it, int flag)
46 return it->d.lustre.it_disposition & flag;
48 EXPORT_SYMBOL(it_disposition);
50 void it_set_disposition(struct lookup_intent *it, int flag)
52 it->d.lustre.it_disposition |= flag;
54 EXPORT_SYMBOL(it_set_disposition);
56 void it_clear_disposition(struct lookup_intent *it, int flag)
58 it->d.lustre.it_disposition &= ~flag;
60 EXPORT_SYMBOL(it_clear_disposition);
/*
 * it_open_error() - report the server status for an intent phase.
 *
 * Walks the disposition stages from deepest (OPEN_OPEN) to shallowest
 * (IT_EXECD); for the first stage the server actually executed, returns
 * it_status provided the caller's @phase has reached that stage.  The
 * trailing CERROR fires when no recognized disposition bit is set.
 *
 * NOTE(review): gaps in the embedded original line numbers (66->71,
 * 73->78, ...) show that braces and the intermediate return paths were
 * dropped by extraction - this text is incomplete; confirm against the
 * full source.
 */
62 int it_open_error(int phase, struct lookup_intent *it)
64 if (it_disposition(it, DISP_OPEN_OPEN)) {
65 if (phase >= DISP_OPEN_OPEN)
66 return it->d.lustre.it_status;
71 if (it_disposition(it, DISP_OPEN_CREATE)) {
72 if (phase >= DISP_OPEN_CREATE)
73 return it->d.lustre.it_status;
78 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
79 if (phase >= DISP_LOOKUP_EXECD)
80 return it->d.lustre.it_status;
85 if (it_disposition(it, DISP_IT_EXECD)) {
86 if (phase >= DISP_IT_EXECD)
87 return it->d.lustre.it_status;
/* No known disposition bit set - log the raw mask and status. */
91 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
92 it->d.lustre.it_status);
96 EXPORT_SYMBOL(it_open_error);
98 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * mdc_set_lock_data() - attach caller data (an inode pointer) to the
 * DLM lock referenced by handle @l.
 *
 * Takes the lock's resource lock, asserts that any pre-existing,
 * different l_ast_data belongs to an inode that is being freed
 * (I_FREEING), then installs @data as the new l_ast_data.
 *
 * NOTE(review): gaps in the embedded original line numbers show lines
 * dropped by extraction (likely ENTRY/EXIT macros, a LASSERTF argument
 * and the lock put/closing brace) - confirm against the full source.
 */
99 void mdc_set_lock_data(__u64 *l, void *data)
101 struct ldlm_lock *lock;
102 struct lustre_handle *lockh = (struct lustre_handle *)l;
110 lock = ldlm_handle2lock(lockh);
112 LASSERT(lock != NULL);
113 lock_res_and_lock(lock);
/* Replacing someone else's ast_data is only legal if the old inode
 * is on its way out of the icache. */
115 if (lock->l_ast_data && lock->l_ast_data != data) {
116 struct inode *new_inode = data;
117 struct inode *old_inode = lock->l_ast_data;
118 LASSERTF(old_inode->i_state & I_FREEING,
119 "Found existing inode %p/%lu/%u state %lu in lock: "
120 "setting data to %p/%lu/%u\n", old_inode,
121 old_inode->i_ino, old_inode->i_generation,
123 new_inode, new_inode->i_ino, new_inode->i_generation);
126 lock->l_ast_data = data;
127 unlock_res_and_lock(lock);
132 EXPORT_SYMBOL(mdc_set_lock_data);
/*
 * mdc_change_cbdata() - run iterator @it over every lock held on the
 * resource named by @fid (id/generation), passing @data through.
 *
 * NOTE(review): the tail of the ldlm_resource_iterate() call, the
 * return statement and the closing brace were dropped by extraction
 * (embedded line numbers jump 143->150) - confirm against the full
 * source.
 */
134 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid,
135 ldlm_iterator_t it, void *data)
137 struct ldlm_res_id res_id = { .name = {0} };
140 res_id.name[0] = fid->id;
141 res_id.name[1] = fid->generation;
143 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
/*
 * mdc_clear_replay_flag() - drop a request from the replay list.
 *
 * Under rq_lock, clears the request's replay state (the actual flag
 * assignment, original line 155, was dropped by extraction) and, if the
 * request already got a transno despite failing with @rc, logs an
 * error - a transno on a failed request would otherwise be replayed.
 *
 * NOTE(review): closing braces and the flag-clear line are missing
 * from this extraction - confirm against the full source.
 */
150 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
152 /* Don't hold error requests for replay. */
153 if (req->rq_replay) {
154 spin_lock(&req->rq_lock);
156 spin_unlock(&req->rq_lock);
158 if (rc && req->rq_transno != 0) {
159 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
/*
 * round_up() - NOTE(review): the body (original lines 165-173) is
 * missing from this extraction.  Judging by the "power-of-two size"
 * comment in mdc_intent_open_pack(), this presumably rounds @val up to
 * the next power of two - TODO confirm against the full source.
 */
164 static int round_up(int val)
174 /* Save a large LOV EA into the request buffer so that it is available
175 * for replay. We don't do this in the initial request because the
176 * original request doesn't need this buffer (at most it sends just the
177 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
178 * buffer and may also be difficult to allocate and save a very large
179 * request buffer for each open. (bug 5707)
181 * OOM here may cause recovery failure if lmm is needed (only for the
182 * original open if the MDS crashed just when this client also OOM'd)
183 * but this is incredibly unlikely, and questionable whether the client
184 * could do MDS recovery under OOM anyways... */
/*
 * NOTE(review): gaps in the embedded original line numbers show lines
 * dropped by extraction (the new buflen argument at line 195, the
 * else-branch brace at 210, the closing brace) - confirm against the
 * full source.
 */
185 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
186 struct mds_body *body)
188 int old_len, new_size, old_size;
189 struct lustre_msg *old_msg = req->rq_reqmsg;
190 struct lustre_msg *new_msg;
/* Compute old and (after growing the EA buffer's buflen) new packed
 * sizes of the request message. */
192 old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + 2);
193 old_size = lustre_packed_msg_size(old_msg);
194 lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2,
196 new_size = lustre_packed_msg_size(old_msg);
198 OBD_ALLOC(new_msg, new_size);
199 if (new_msg != NULL) {
200 DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u\n",
202 memcpy(new_msg, old_msg, old_size);
/* Swap the request message under rq_lock so concurrent users see a
 * consistent reqmsg/reqlen pair. */
204 spin_lock(&req->rq_lock);
205 req->rq_reqmsg = new_msg;
206 req->rq_reqlen = new_size;
207 spin_unlock(&req->rq_lock);
209 OBD_FREE(old_msg, old_size);
/* Allocation failed: restore the old buflen and drop the EA from the
 * reply so replay will not depend on a buffer we could not save. */
211 lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2, old_len);
212 body->valid &= ~OBD_MD_FLEASIZE;
213 body->eadatasize = 0;
/*
 * mdc_intent_open_pack() - build the enqueue request for an IT_OPEN
 * intent.
 *
 * Sizes the request to hold at least a default-sized LOV EA (rounded
 * toward a power-of-two slab size via round_up()), collects locks to
 * cancel (conflicting OPEN locks on the known inode, the parent's
 * UPDATE lock on create/join), packs the ldlm_intent and the open
 * record, and sets the expected reply sizes.
 *
 * NOTE(review): many original lines were dropped by extraction (mode
 * assignments around lines 262-268, the join/else branches, error
 * returns, closing braces) - this text is incomplete; confirm against
 * the full source.
 */
217 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
218 struct lookup_intent *it,
219 struct mdc_op_data *data,
220 void *lmm, int lmmsize)
222 struct ptlrpc_request *req;
223 struct ldlm_intent *lit;
224 struct obd_device *obddev = class_exp2obd(exp);
225 int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
226 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
227 [DLM_INTENT_IT_OFF] = sizeof(*lit),
228 [DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create),
229 [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
230 /* As an optimization, we allocate an RPC request buffer
231 * for at least a default-sized LOV EA even if we aren't
232 * sending one. We grow the whole request to the next
233 * power-of-two size since we get that much from a slab
234 * allocation anyways. This avoids an allocation below
235 * in the common case where we need to save a
236 * default-sized LOV EA for open replay. */
237 [DLM_INTENT_REC_OFF+2]= max(lmmsize,
238 obddev->u.cli.cl_default_mds_easize) };
239 int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
240 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
241 [DLM_REPLY_REC_OFF] = sizeof(struct mds_body),
242 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
244 [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
245 CFS_LIST_HEAD(cancels);
246 int do_join = (it->it_flags & O_JOIN_FILE) && data->data;
251 it->it_create_mode |= S_IFREG;
/* Grow the EA buffer so the whole message reaches the next
 * power-of-two size, capped at cl_max_mds_easize. */
253 rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6, size);
255 size[DLM_INTENT_REC_OFF + 2] =
256 min(size[DLM_INTENT_REC_OFF + 2] + round_up(rc) - rc,
257 obddev->u.cli.cl_max_mds_easize);
259 /* If inode is known, cancel conflicting OPEN locks. */
261 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
264 else if (it->it_flags & FMODE_EXEC)
269 count = mdc_resource_get_unused(exp, &data->fid2, &cancels,
270 mode, MDS_INODELOCK_OPEN);
273 /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
274 if (it->it_op & IT_CREAT || do_join)
278 count += mdc_resource_get_unused(exp, &data->fid1, &cancels, mode,
279 MDS_INODELOCK_UPDATE);
/* Join path: an extra mds_rec_join buffer carries the head size. */
281 __u64 head_size = (*(__u64 *)data->data);
282 /* join is like an unlink of the tail */
283 size[DLM_INTENT_REC_OFF + 3] = sizeof(struct mds_rec_join);
284 req = ldlm_prep_enqueue_req(exp, 7, size, &cancels, count);
286 mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, data,
289 req = ldlm_prep_enqueue_req(exp, 6, size, &cancels, count);
290 it->it_flags &= ~O_JOIN_FILE;
294 spin_lock(&req->rq_lock);
296 spin_unlock(&req->rq_lock);
298 /* pack the intent */
299 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
301 lit->opc = (__u64)it->it_op;
303 /* pack the intended request */
304 mdc_open_pack(req, DLM_INTENT_REC_OFF, data,
305 it->it_create_mode, 0, it->it_flags,
308 ptlrpc_req_set_repsize(req, 5, repsize);
/*
 * mdc_intent_unlink_pack() - build the enqueue request for an
 * IT_UNLINK intent: ldlm_request + ldlm_intent + mds_rec_unlink + name,
 * with reply space for the mds_body, EA and unlink cookies.
 *
 * NOTE(review): continuation lines of the repsize initializer (original
 * lines 329, 332), error handling and the return were dropped by
 * extraction - confirm against the full source.
 */
313 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
314 struct lookup_intent *it,
315 struct mdc_op_data *data)
317 struct ptlrpc_request *req;
318 struct ldlm_intent *lit;
319 struct obd_device *obddev = class_exp2obd(exp);
320 int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
321 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
322 [DLM_INTENT_IT_OFF] = sizeof(*lit),
323 [DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink),
324 [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
325 int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
326 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
327 [DLM_REPLY_REC_OFF] = sizeof(struct mds_body),
328 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
330 [DLM_REPLY_REC_OFF+2] = obddev->u.cli.
331 cl_max_mds_cookiesize };
333 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
335 /* pack the intent */
336 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
338 lit->opc = (__u64)it->it_op;
340 /* pack the intended request */
341 mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
343 ptlrpc_req_set_repsize(req, 5, repsize);
/*
 * mdc_intent_lookup_pack() - build the enqueue request for an
 * IT_LOOKUP/IT_GETATTR intent: ldlm_request + ldlm_intent + getattr
 * body + name, requesting attrs, EA, ACL and mode/EA-size info, with
 * reply space for the mds_body, EA and a POSIX ACL.
 *
 * NOTE(review): continuation lines (original 364), error handling and
 * the return were dropped by extraction - confirm against the full
 * source.
 */
348 static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp,
349 struct lookup_intent *it,
350 struct mdc_op_data *data)
352 struct ptlrpc_request *req;
353 struct ldlm_intent *lit;
354 struct obd_device *obddev = class_exp2obd(exp);
355 int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
356 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
357 [DLM_INTENT_IT_OFF] = sizeof(*lit),
358 [DLM_INTENT_REC_OFF] = sizeof(struct mds_body),
359 [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
360 int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
361 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
362 [DLM_REPLY_REC_OFF] = sizeof(struct mds_body),
363 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
365 [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
366 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
367 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
369 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
371 /* pack the intent */
372 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
374 lit->opc = (__u64)it->it_op;
376 /* pack the intended request */
377 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags,
379 ptlrpc_req_set_repsize(req, 5, repsize);
/*
 * mdc_intent_readdir_pack() - build a plain (no-intent-record) enqueue
 * request for IT_READDIR: just the ldlm_request, with reply space for
 * the ldlm_reply.
 *
 * NOTE(review): the NULL check after ldlm_prep_enqueue_req() and the
 * return statement were dropped by extraction (embedded line numbers
 * jump 392->394) - confirm against the full source.
 */
384 static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp)
386 struct ptlrpc_request *req;
387 int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
388 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request) };
389 int repsize[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
390 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply) };
392 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
394 ptlrpc_req_set_repsize(req, 2, repsize);
/*
 * mdc_finish_enqueue() - post-process the reply of an intent enqueue.
 *
 * Handles the three enqueue outcomes (lock aborted / error / granted),
 * copies the server's disposition and status into @it, clears the
 * replay flag for failed opens, byte-swaps the mds_body and EA in the
 * reply, updates the client's max EA/cookie sizes when the server says
 * so, and for opens saves the reply LOV EA back into the request buffer
 * (growing it via mdc_realloc_openmsg() if needed) for replay.
 *
 * NOTE(review): many original lines were dropped by extraction (the rc
 * parameter line, returns, closing braces, buffer-length operands) -
 * this text is incomplete; confirm against the full source.
 */
398 static int mdc_finish_enqueue(struct obd_export *exp,
399 struct ptlrpc_request *req,
400 struct ldlm_enqueue_info *einfo,
401 struct lookup_intent *it,
402 struct lustre_handle *lockh,
405 struct ldlm_request *lockreq;
406 struct ldlm_reply *lockrep;
409 /* Similarly, if we're going to replay this request, we don't want to
410 * actually get a lock, just perform the intent. */
411 if (req->rq_transno || req->rq_replay) {
412 lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
414 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
417 if (rc == ELDLM_LOCK_ABORTED) {
419 memset(lockh, 0, sizeof(*lockh));
421 } else if (rc != 0) {
422 CERROR("ldlm_cli_enqueue: %d\n", rc);
423 LASSERTF(rc < 0, "rc %d\n", rc);
424 mdc_clear_replay_flag(req, rc);
425 ptlrpc_req_finished(req);
427 } else { /* rc = 0 */
428 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
431 /* If the server gave us back a different lock mode, we should
432 * fix up our variables. */
433 if (lock->l_req_mode != einfo->ei_mode) {
434 ldlm_lock_addref(lockh, lock->l_req_mode);
435 ldlm_lock_decref(lockh, einfo->ei_mode);
436 einfo->ei_mode = lock->l_req_mode;
441 lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
443 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
444 /* swabbed by ldlm_cli_enqueue() */
445 LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
/* Copy the server's verdict into the intent for later phases. */
447 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
448 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
449 it->d.lustre.it_lock_mode = einfo->ei_mode;
450 it->d.lustre.it_data = req;
452 if (it->d.lustre.it_status < 0 && req->rq_replay)
453 mdc_clear_replay_flag(req, it->d.lustre.it_status);
455 /* If we're doing an IT_OPEN which did not result in an actual
456 * successful open, then we need to remove the bit which saves
457 * this request for unconditional replay.
459 * It's important that we do this first! Otherwise we might exit the
460 * function without doing so, and try to replay a failed create
462 if ((it->it_op & IT_OPEN) &&
464 (!it_disposition(it, DISP_OPEN_OPEN) ||
465 it->d.lustre.it_status != 0))
466 mdc_clear_replay_flag(req, it->d.lustre.it_status);
468 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
469 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
471 /* We know what to expect, so we do any byte flipping required here */
472 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
473 struct mds_body *body;
475 body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
476 lustre_swab_mds_body);
478 CERROR ("Can't swab mds_body\n");
482 /* If this is a successful OPEN request, we need to set
483 replay handler and data early, so that if replay happens
484 immediately after swabbing below, new reply is swabbed
485 by that handler correctly */
486 if (it_disposition(it, DISP_OPEN_OPEN) &&
487 !it_open_error(DISP_OPEN_OPEN, it))
488 mdc_set_open_replay_data(NULL, req);
490 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
493 /* The eadata is opaque; just check that it is there.
494 * Eventually, obd_unpackmd() will check the contents */
495 eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
496 body->eadatasize, NULL);
497 if (eadata == NULL) {
498 CERROR ("Missing/short eadata\n");
/* Server advertised new limits: grow our cached maxima. */
501 if (body->valid & OBD_MD_FLMODEASIZE) {
502 struct obd_device *obddev = class_exp2obd(exp);
504 if (obddev->u.cli.cl_max_mds_easize <
506 obddev->u.cli.cl_max_mds_easize =
508 CDEBUG(D_INFO, "maxeasize become %d\n",
511 if (obddev->u.cli.cl_max_mds_cookiesize <
512 body->max_cookiesize) {
513 obddev->u.cli.cl_max_mds_cookiesize =
514 body->max_cookiesize;
515 CDEBUG(D_INFO, "cookiesize become %d\n",
516 body->max_cookiesize);
519 /* We save the reply LOV EA in case we have to replay
520 * a create for recovery. If we didn't allocate a
521 * large enough request buffer above we need to
522 * reallocate it here to hold the actual LOV EA. */
523 if (it->it_op & IT_OPEN) {
524 int offset = DLM_INTENT_REC_OFF + 2;
527 if (lustre_msg_buflen(req->rq_reqmsg, offset) <
529 mdc_realloc_openmsg(req, body);
531 lmm = lustre_msg_buf(req->rq_reqmsg, offset,
534 memcpy(lmm, eadata, body->eadatasize);
542 /* We always reserve enough space in the reply packet for a stripe MD, because
543 * we don't know in advance the file type. */
/*
 * mdc_enqueue() - send an intent-enqueue RPC to the MDS.
 *
 * Selects the inodebits policy (UPDATE for unlink/getattr/readdir,
 * LOOKUP otherwise), packs the intent-specific request via the
 * mdc_intent_*_pack() helpers, then performs the enqueue under the
 * client rpc_lock / rpcs-in-flight accounting and finishes via
 * mdc_finish_enqueue().
 *
 * NOTE(review): error paths (bad it_op, pack failure), the open-join
 * branch tail, ldlm_cli_enqueue() trailing arguments and the return
 * were dropped by extraction - confirm against the full source.
 */
544 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
545 struct lookup_intent *it, struct mdc_op_data *data,
546 struct lustre_handle *lockh, void *lmm, int lmmsize,
547 int extra_lock_flags)
549 struct ptlrpc_request *req;
550 struct obd_device *obddev = class_exp2obd(exp);
551 struct ldlm_res_id res_id =
552 { .name = {data->fid1.id, data->fid1.generation} };
553 ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
554 int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
558 LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
559 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
560 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
562 if (it->it_op & IT_OPEN) {
563 req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize);
564 if (it->it_flags & O_JOIN_FILE) {
565 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
567 } else if (it->it_op & IT_UNLINK) {
568 req = mdc_intent_unlink_pack(exp, it, data);
569 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
570 req = mdc_intent_lookup_pack(exp, it, data);
571 } else if (it->it_op == IT_READDIR) {
572 req = mdc_intent_readdir_pack(exp);
574 CERROR("bad it_op %x\n", it->it_op);
581 /* It is important to obtain rpc_lock first (if applicable), so that
582 * threads that are serialised with rpc_lock are not polluting our
583 * rpcs in flight counter */
584 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
585 mdc_enter_request(&obddev->u.cli);
586 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
588 mdc_exit_request(&obddev->u.cli);
589 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
591 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
595 EXPORT_SYMBOL(mdc_enqueue);
/*
 * mdc_revalidate_lock() - check whether we already hold a usable DLM
 * lock for @fid; if ldlm_lock_match() finds one, record its handle and
 * mode in the intent.
 *
 * GETATTR needs both LOOKUP and UPDATE ibits because owner/group/ACLs
 * live under the lookup lock while other attributes are under the
 * update lock (see comment below).
 *
 * NOTE(review): the fid parameter line, local declarations, ENTRY/
 * RETURN and closing brace were dropped by extraction - confirm
 * against the full source.
 */
597 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
600 /* We could just return 1 immediately, but since we should only
601 * be called in revalidate_it if we already have a lock, let's
603 struct ldlm_res_id res_id = {.name ={fid->id, fid->generation}};
604 struct lustre_handle lockh;
605 ldlm_policy_data_t policy;
608 /* As not all attributes are kept under update lock, e.g.
609 owner/group/acls are under lookup lock, we need both
610 ibits for GETATTR. */
611 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
612 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
613 MDS_INODELOCK_LOOKUP;
615 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
616 LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
617 &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
619 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
620 it->d.lustre.it_lock_mode = mode;
625 EXPORT_SYMBOL(mdc_revalidate_lock);
/*
 * mdc_finish_intent_lock() - interpret the intent reply and fix up
 * lock/request state for the VFS (see the long comment above
 * mdc_intent_lock()).
 *
 * Verifies the intent executed, checks per-phase open errors, takes
 * extra request references for successful CREATE/OPEN (balanced in
 * ll_create_node / ll_file_open), and if a matching lock already
 * exists, cancels the new one and reuses the old handle.
 *
 * NOTE(review): returns (RETURN/GOTO), several closing braces and some
 * statement tails were dropped by extraction - confirm against the
 * full source.
 */
627 static int mdc_finish_intent_lock(struct obd_export *exp,
628 struct ptlrpc_request *req,
629 struct mdc_op_data *data,
630 struct lookup_intent *it,
631 struct lustre_handle *lockh)
633 struct mds_body *mds_body;
634 struct lustre_handle old_lock;
635 struct ldlm_lock *lock;
639 LASSERT(req != NULL);
640 LASSERT(req != LP_POISON);
641 LASSERT(req->rq_repmsg != LP_POISON);
643 if (!it_disposition(it, DISP_IT_EXECD)) {
644 /* The server failed before it even started executing the
645 * intent, i.e. because it couldn't unpack the request. */
646 LASSERT(it->d.lustre.it_status != 0);
647 RETURN(it->d.lustre.it_status);
649 rc = it_open_error(DISP_IT_EXECD, it);
653 mds_body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
655 /* mdc_enqueue checked */
656 LASSERT(mds_body != NULL);
657 /* mdc_enqueue swabbed */
658 LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
660 /* If we were revalidating a fid/name pair, mark the intent in
661 * case we fail and get called again from lookup */
662 if (data->fid2.id && (it->it_op != IT_GETATTR)) {
663 it_set_disposition(it, DISP_ENQ_COMPLETE);
664 /* Also: did we find the same inode? */
665 if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2)))
669 rc = it_open_error(DISP_LOOKUP_EXECD, it);
673 /* keep requests around for the multiple phases of the call
674 * this shows the DISP_XX must guarantee we make it into the call
676 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
677 it_disposition(it, DISP_OPEN_CREATE) &&
678 !it_open_error(DISP_OPEN_CREATE, it)) {
679 it_set_disposition(it, DISP_ENQ_CREATE_REF);
680 ptlrpc_request_addref(req); /* balanced in ll_create_node */
682 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
683 it_disposition(it, DISP_OPEN_OPEN) &&
684 !it_open_error(DISP_OPEN_OPEN, it)) {
685 it_set_disposition(it, DISP_ENQ_OPEN_REF);
686 ptlrpc_request_addref(req); /* balanced in ll_file_open */
687 /* BUG 11546 - eviction in the middle of open rpc processing */
688 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
691 if (it->it_op & IT_CREAT) {
692 /* XXX this belongs in ll_create_it */
693 } else if (it->it_op == IT_OPEN) {
694 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
696 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
699 /* If we already have a matching lock, then cancel the new
700 * one. We have to set the data here instead of in
701 * mdc_enqueue, because we need to use the child's inode as
702 * the l_ast_data to match, and that's not available until
703 * intent_finish has performed the iget().) */
704 lock = ldlm_handle2lock(lockh);
706 ldlm_policy_data_t policy = lock->l_policy_data;
708 LDLM_DEBUG(lock, "matching against this");
710 memcpy(&old_lock, lockh, sizeof(*lockh));
711 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
712 LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
/* A lock we already hold matched: drop the new one and keep
 * referencing the old handle in the intent. */
713 ldlm_lock_decref_and_cancel(lockh,
714 it->d.lustre.it_lock_mode);
715 memcpy(lockh, &old_lock, sizeof(old_lock));
716 memcpy(&it->d.lustre.it_lock_handle, lockh,
721 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
722 data->namelen, data->name, ldlm_it2str(it->it_op),
723 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
728 * This long block is all about fixing up the lock and request state
729 * so that it is correct as of the moment _before_ the operation was
730 * applied; that way, the VFS will think that everything is normal and
731 * call Lustre's regular VFS methods.
733 * If we're performing a creation, that means that unless the creation
734 * failed with EEXIST, we should fake up a negative dentry.
736 * For everything else, we want to lookup to succeed.
738 * One additional note: if CREATE or OPEN succeeded, we add an extra
739 * reference to the request because we need to keep it around until
740 * ll_create/ll_open gets called.
742 * The server will return to us, in it_disposition, an indication of
743 * exactly what d.lustre.it_status refers to.
745 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
746 * otherwise if DISP_OPEN_CREATE is set, then it status is the
747 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
748 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
751 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
754 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
755 void *lmm, int lmmsize, struct lookup_intent *it,
756 int lookup_flags, struct ptlrpc_request **reqp,
757 ldlm_blocking_callback cb_blocking, int extra_lock_flags)
759 struct lustre_handle lockh;
765 CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
766 op_data->namelen, op_data->name, op_data->fid1.id,
767 ldlm_it2str(it->it_op), it->it_flags);
/* Fast path: try to reuse a lock we already hold for revalidation. */
769 if (op_data->fid2.id &&
770 (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
771 rc = mdc_revalidate_lock(exp, it, &op_data->fid2);
772 /* Only return failure if it was not GETATTR by cfid
773 (from inode_revalidate) */
774 if (rc || op_data->namelen != 0)
778 /* lookup_it may be called only after revalidate_it has run, because
779 * revalidate_it cannot return errors, only zero. Returning zero causes
780 * this call to lookup, which *can* return an error.
782 * We only want to execute the request associated with the intent one
783 * time, however, so don't send the request again. Instead, skip past
784 * this and use the request from revalidate. In this case, revalidate
785 * never dropped its reference, so the refcounts are all OK */
786 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
787 struct ldlm_enqueue_info einfo =
788 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
789 ldlm_completion_ast, NULL, NULL };
791 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
792 lmm, lmmsize, extra_lock_flags);
795 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
796 } else if (!op_data->fid2.id) {
797 /* DISP_ENQ_COMPLETE set means there is extra reference on
798 * request referenced from this intent, saved for subsequent
799 * lookup. This path is executed when we proceed to this
800 * lookup, so we clear DISP_ENQ_COMPLETE */
801 it_clear_disposition(it, DISP_ENQ_COMPLETE);
804 *reqp = it->d.lustre.it_data;
805 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/* NOTE(review): returns, error paths and closing braces were dropped
 * by extraction throughout this function - confirm against the full
 * source. */
809 EXPORT_SYMBOL(mdc_intent_lock);
/*
 * mdc_intent_getattr_async_interpret() - ptlrpcd reply handler for the
 * async getattr enqueue sent by mdc_intent_getattr_async().
 *
 * Recovers the saved enqueue context from rq_async_args, releases the
 * rpcs-in-flight slot, finishes the ldlm enqueue, then runs the same
 * mdc_finish_enqueue()/mdc_finish_intent_lock() post-processing as the
 * synchronous path, and finally invokes the caller's mi_cb.
 *
 * NOTE(review): lines unpacking minfo/einfo/exp/it from @ma, error
 * gotos, the return and closing brace were dropped by extraction -
 * confirm against the full source.
 */
811 static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
812 void *unused, int rc)
814 struct mdc_enqueue_args *ma;
815 struct md_enqueue_info *minfo;
816 struct ldlm_enqueue_info *einfo;
817 struct obd_export *exp;
818 struct lookup_intent *it;
819 struct lustre_handle *lockh;
820 struct obd_device *obddev;
821 int flags = LDLM_FL_HAS_INTENT;
824 ma = (struct mdc_enqueue_args *)&req->rq_async_args;
830 lockh = &minfo->mi_lockh;
832 obddev = class_exp2obd(exp);
834 mdc_exit_request(&obddev->u.cli);
836 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
837 &flags, NULL, 0, NULL, lockh, rc);
839 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
843 memcpy(&it->d.lustre.it_lock_handle, lockh, sizeof(*lockh));
845 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
849 minfo->mi_cb(exp, req, minfo, rc);
/*
 * mdc_intent_getattr_async() - fire an IT_GETATTR intent enqueue
 * without waiting for the reply.
 *
 * Packs a lookup-style intent request, enqueues it with async=1 via
 * ldlm_cli_enqueue(), stashes the enqueue context in rq_async_args for
 * mdc_intent_getattr_async_interpret(), and hands the request to
 * ptlrpcd.
 *
 * NOTE(review): error handling after the pack/enqueue calls, the lines
 * filling in @aa, the return and closing brace were dropped by
 * extraction - confirm against the full source.
 */
854 int mdc_intent_getattr_async(struct obd_export *exp,
855 struct md_enqueue_info *minfo,
856 struct ldlm_enqueue_info *einfo)
858 struct mdc_op_data *op_data = &minfo->mi_data;
859 struct lookup_intent *it = &minfo->mi_it;
860 struct ptlrpc_request *req;
861 struct obd_device *obddev = class_exp2obd(exp);
862 struct ldlm_res_id res_id = {
863 .name = {op_data->fid1.id,
864 op_data->fid1.generation}
866 ldlm_policy_data_t policy = {
867 .l_inodebits = { MDS_INODELOCK_LOOKUP }
869 struct mdc_enqueue_args *aa;
871 int flags = LDLM_FL_HAS_INTENT;
874 CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
875 op_data->namelen, op_data->name, op_data->fid1.id,
876 ldlm_it2str(it->it_op), it->it_flags);
878 req = mdc_intent_lookup_pack(exp, it, op_data);
882 mdc_enter_request(&obddev->u.cli);
883 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
884 0, NULL, &minfo->mi_lockh, 1);
886 mdc_exit_request(&obddev->u.cli);
/* The context must fit into the request's embedded async-args area. */
890 CLASSERT(sizeof(*aa) < sizeof(req->rq_async_args));
891 aa = (struct mdc_enqueue_args *)&req->rq_async_args;
894 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
895 ptlrpcd_add_req(req);