1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_MDC
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
48 # include <liblustre.h>
51 #include <lustre_acl.h>
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 /* fid_res_name_eq() */
55 #include <lustre_fid.h>
56 #include <lprocfs_status.h>
57 #include "mdc_internal.h"
/* Per-request state of an asynchronous intent getattr.  It is stored in
 * the request's async-args area by mdc_intent_getattr_async() and read
 * back by mdc_intent_getattr_async_interpret(): the export, the caller's
 * md_enqueue_info and the ldlm_enqueue_info used for the enqueue. */
59 struct mdc_getattr_args {
60 struct obd_export *ga_exp;
61 struct md_enqueue_info *ga_minfo;
62 struct ldlm_enqueue_info *ga_einfo;
/* Return the bits of the intent's disposition mask that are selected by
 * flag; the result is non-zero when at least one of them is set. */
65 int it_disposition(struct lookup_intent *it, int flag)
67 return it->d.lustre.it_disposition & flag;
69 EXPORT_SYMBOL(it_disposition);
/* Set the bits given in flag in the intent's disposition mask. */
71 void it_set_disposition(struct lookup_intent *it, int flag)
73 it->d.lustre.it_disposition |= flag;
75 EXPORT_SYMBOL(it_set_disposition);
/* Clear the bits given in flag from the intent's disposition mask. */
77 void it_clear_disposition(struct lookup_intent *it, int flag)
79 it->d.lustre.it_disposition &= ~flag;
81 EXPORT_SYMBOL(it_clear_disposition);
/* Report the status of an open intent relative to a given phase.
 *
 * The dispositions are examined in the order DISP_OPEN_OPEN,
 * DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD.  For a disposition
 * that is set on the intent, it_status is returned when phase is at least
 * that disposition.  The disposition mask and status are logged with
 * CERROR at the end of the function.
 *
 * NOTE(review): the remaining branches (what is returned when phase is
 * lower than the matched disposition, and what follows the CERROR) are
 * not visible in this copy of the file -- confirm against the complete
 * source. */
83 int it_open_error(int phase, struct lookup_intent *it)
85 if (it_disposition(it, DISP_OPEN_OPEN)) {
86 if (phase >= DISP_OPEN_OPEN)
87 return it->d.lustre.it_status;
92 if (it_disposition(it, DISP_OPEN_CREATE)) {
93 if (phase >= DISP_OPEN_CREATE)
94 return it->d.lustre.it_status;
99 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
100 if (phase >= DISP_LOOKUP_EXECD)
101 return it->d.lustre.it_status;
106 if (it_disposition(it, DISP_IT_EXECD)) {
107 if (phase >= DISP_IT_EXECD)
108 return it->d.lustre.it_status;
112 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
113 it->d.lustre.it_status);
117 EXPORT_SYMBOL(it_open_error);
119 /* this must be called on a lockh that is known to have a referenced lock */
/* Attach data to the lock identified by lockh as its l_ast_data.
 *
 * The handle is resolved with ldlm_handle2lock() and the resulting lock
 * is asserted to be non-NULL.  Between lock_res_and_lock() and
 * unlock_res_and_lock(): if the lock already carries a different
 * l_ast_data, the old value is treated as an inode and asserted to be in
 * I_FREEING state; l_ast_data is set to data; the lock's inodebits policy
 * bits are stored through bits.
 *
 * NOTE(review): the declaration of the bits parameter, any early exits,
 * whether the store through bits is guarded, the release of the lock
 * obtained from ldlm_handle2lock() and the return value are not visible
 * in this copy of the file -- confirm against the complete source. */
120 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
123 struct ldlm_lock *lock;
134 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
136 LASSERT(lock != NULL);
137 lock_res_and_lock(lock);
139 if (lock->l_ast_data && lock->l_ast_data != data) {
140 struct inode *new_inode = data;
141 struct inode *old_inode = lock->l_ast_data;
142 LASSERTF(old_inode->i_state & I_FREEING,
143 "Found existing inode %p/%lu/%u state %lu in lock: "
144 "setting data to %p/%lu/%u\n", old_inode,
145 old_inode->i_ino, old_inode->i_generation,
147 new_inode, new_inode->i_ino, new_inode->i_generation);
150 lock->l_ast_data = data;
152 *bits = lock->l_policy_data.l_inodebits.bits;
154 unlock_res_and_lock(lock);
/* Look for an existing lock on the resource named after fid.
 *
 * The resource id is built from fid with fid_build_reg_res_name(); flags,
 * type, policy, mode and lockh are forwarded unchanged to
 * ldlm_lock_match() on the namespace of the export's obd device, and its
 * result is stored in rc.
 *
 * NOTE(review): the return statement is not visible in this copy of the
 * file; presumably rc is returned -- confirm. */
160 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
161 const struct lu_fid *fid, ldlm_type_t type,
162 ldlm_policy_data_t *policy, ldlm_mode_t mode,
163 struct lustre_handle *lockh)
165 struct ldlm_res_id res_id;
169 fid_build_reg_res_name(fid, &res_id);
170 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
171 &res_id, type, policy, mode, lockh, 0);
/* Cancel unused locks on the resource named after fid.
 *
 * The resource id is built from fid with fid_build_reg_res_name() and
 * passed, together with policy, mode, flags and opaque, to
 * ldlm_cli_cancel_unused_resource() on the obd device's namespace.
 *
 * NOTE(review): the declarations of the mode and opaque parameters and
 * the return statement are not visible in this copy of the file --
 * confirm against the complete source. */
175 int mdc_cancel_unused(struct obd_export *exp,
176 const struct lu_fid *fid,
177 ldlm_policy_data_t *policy,
179 ldlm_cancel_flags_t flags,
182 struct ldlm_res_id res_id;
183 struct obd_device *obd = class_exp2obd(exp);
188 fid_build_reg_res_name(fid, &res_id);
189 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
190 policy, mode, flags, opaque);
/* Iterate over the locks of the resource named after fid.
 *
 * The resource id is built from fid with fid_build_reg_res_name() and
 * ldlm_resource_iterate() is called on the namespace of the export's obd
 * device.
 *
 * NOTE(review): the remaining arguments of the ldlm_resource_iterate()
 * call (presumably the resource id, it and data) and the return value are
 * not visible in this copy of the file -- confirm. */
194 int mdc_change_cbdata(struct obd_export *exp,
195 const struct lu_fid *fid,
196 ldlm_iterator_t it, void *data)
198 struct ldlm_res_id res_id;
201 fid_build_reg_res_name(fid, &res_id);
202 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
209 /* find any ldlm lock of the inode in mdc
/* Search the locks of the resource named after fid using iterator it.
 *
 * The resource id is built from fid (const is cast away for the call to
 * fid_build_reg_res_name()) and ldlm_resource_iterate() is run on the
 * namespace of the export's obd device.  Its result is then compared with
 * LDLM_ITER_STOP and LDLM_ITER_CONTINUE.
 *
 * NOTE(review): the remaining arguments of the iterate call and the
 * values assigned for LDLM_ITER_STOP / LDLM_ITER_CONTINUE are not visible
 * in this copy of the file -- confirm against the complete source. */
213 int mdc_find_cbdata(struct obd_export *exp,
214 const struct lu_fid *fid,
215 ldlm_iterator_t it, void *data)
217 struct ldlm_res_id res_id;
221 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
222 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
224 if (rc == LDLM_ITER_STOP)
226 else if (rc == LDLM_ITER_CONTINUE)
/* Stop a failed request from being kept for replay.
 *
 * When req->rq_replay is set, rq_lock is taken and released around the
 * update.  Independently, a non-zero rc combined with a non-zero
 * rq_transno is reported through DEBUG_REQ at D_ERROR level.
 *
 * NOTE(review): the statement executed while rq_lock is held (presumably
 * clearing rq_replay) and anything following the DEBUG_REQ are not
 * visible in this copy of the file -- confirm. */
231 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
233 /* Don't hold error requests for replay. */
234 if (req->rq_replay) {
235 cfs_spin_lock(&req->rq_lock);
237 cfs_spin_unlock(&req->rq_lock);
239 if (rc && req->rq_transno != 0) {
240 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
245 /* Save a large LOV EA into the request buffer so that it is available
246 * for replay. We don't do this in the initial request because the
247 * original request doesn't need this buffer (at most it sends just the
248 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
249 * buffer and may also be difficult to allocate and save a very large
250 * request buffer for each open. (bug 5707)
252 * OOM here may cause recovery failure if lmm is needed (only for the
253 * original open if the MDS crashed just when this client also OOM'd)
254 * but this is incredibly unlikely, and questionable whether the client
255 * could do MDS recovery under OOM anyways... */
/* Enlarge the request-buffer segment DLM_INTENT_REC_OFF + 4 with
 * sptlrpc_cli_enlarge_reqbuf().  On the error path a CERROR is logged,
 * OBD_MD_FLEASIZE is cleared from body->valid and body->eadatasize is
 * reset to 0.
 *
 * NOTE(review): the size argument of the enlarge call (the CERROR
 * suggests body->eadatasize) and the condition guarding the error path
 * are not visible in this copy of the file -- confirm. */
256 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
257 struct mdt_body *body)
261 /* FIXME: remove this explicit offset. */
262 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
265 CERROR("Can't enlarge segment %d size to %d\n",
266 DLM_INTENT_REC_OFF + 4, body->eadatasize);
267 body->valid &= ~OBD_MD_FLEASIZE;
268 body->eadatasize = 0;
/* Build an LDLM enqueue request carrying an open intent
 * (RQF_LDLM_INTENT_OPEN).
 *
 * Visible steps, in order:
 *  - the file-type bits of it->it_create_mode are forced to S_IFREG;
 *  - when op_fid2 is sane, unused locks on it are collected through
 *    mdc_resource_get_unused(), with it->it_flags (FMODE_WRITE,
 *    MDS_OPEN_TRUNC, FMODE_EXEC) examined beforehand;
 *  - unused MDS_INODELOCK_UPDATE locks on the parent (op_fid1) are added
 *    to the count;
 *  - the request is allocated; on the failure path the collected locks
 *    are dropped with ldlm_lock_list_put() and ERR_PTR(-ENOMEM) is
 *    returned;
 *  - the client-side sizes of CAPA1, CAPA2 (sized from the parent capa),
 *    NAME (op_namelen + 1) and EADATA (the larger of lmmsize and
 *    cl_default_mds_easize) are set, then ldlm_prep_enqueue_req() is
 *    called with the cancel list;
 *  - rq_replay is copied from the import's imp_replayable under rq_lock;
 *  - it->it_op is stored as the intent opcode and the open itself is
 *    packed with mdc_open_pack();
 *  - for a remote client, reply space for a struct mdt_remote_perm is
 *    reserved in RMF_ACL; finally the reply length is set.
 *
 * NOTE(review): the last parameter of the function, the lock modes chosen
 * from it->it_flags and from the IT_CREAT test, the conditions guarding
 * the error paths and the final return are not visible in this copy of
 * the file -- confirm against the complete source. */
272 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
273 struct lookup_intent *it,
274 struct md_op_data *op_data,
275 void *lmm, int lmmsize,
278 struct ptlrpc_request *req;
279 struct obd_device *obddev = class_exp2obd(exp);
280 struct ldlm_intent *lit;
281 CFS_LIST_HEAD(cancels);
287 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
289 /* XXX: openlock is not cancelled for cross-refs. */
290 /* If inode is known, cancel conflicting OPEN locks. */
291 if (fid_is_sane(&op_data->op_fid2)) {
292 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
295 else if (it->it_flags & FMODE_EXEC)
300 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
305 /* If CREATE, cancel parent's UPDATE lock. */
306 if (it->it_op & IT_CREAT)
310 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
312 MDS_INODELOCK_UPDATE);
314 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
315 &RQF_LDLM_INTENT_OPEN);
317 ldlm_lock_list_put(&cancels, l_bl_ast, count);
318 RETURN(ERR_PTR(-ENOMEM));
321 /* parent capability */
322 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
323 /* child capability, reserve the size according to parent capa, it will
324 * be filled after we get the reply */
325 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
327 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
328 op_data->op_namelen + 1);
329 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
330 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
332 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
334 ptlrpc_request_free(req);
338 cfs_spin_lock(&req->rq_lock);
339 req->rq_replay = req->rq_import->imp_replayable;
340 cfs_spin_unlock(&req->rq_lock);
342 /* pack the intent */
343 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
344 lit->opc = (__u64)it->it_op;
346 /* pack the intended request */
347 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
350 /* for remote client, fetch remote perm for current user */
351 if (client_is_remote(exp))
352 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
353 sizeof(struct mdt_remote_perm));
354 ptlrpc_request_set_replen(req);
/* Build an LDLM enqueue request carrying an unlink intent
 * (RQF_LDLM_INTENT_UNLINK).
 *
 * Visible steps, in order: allocate the request (ERR_PTR(-ENOMEM) on the
 * failure path); size the CAPA1 and NAME (op_namelen + 1) client fields;
 * call ldlm_prep_enqueue_req() with no cancel list, freeing the request
 * on its error path; store it->it_op as the intent opcode; pack the
 * unlink with mdc_unlink_pack(); reserve reply space of cl_max_mds_easize
 * for RMF_MDT_MD and of cl_max_mds_cookiesize for RMF_ACL; set the reply
 * length.
 *
 * NOTE(review): the RMF_ACL reply field is sized with the cookie size
 * rather than an ACL size -- looks deliberate reuse of the field, but
 * confirm.  The conditions guarding the error paths and the final return
 * are not visible in this copy of the file. */
358 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
359 struct lookup_intent *it,
360 struct md_op_data *op_data)
362 struct ptlrpc_request *req;
363 struct obd_device *obddev = class_exp2obd(exp);
364 struct ldlm_intent *lit;
368 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
369 &RQF_LDLM_INTENT_UNLINK);
371 RETURN(ERR_PTR(-ENOMEM));
373 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
374 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
375 op_data->op_namelen + 1);
377 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
379 ptlrpc_request_free(req);
383 /* pack the intent */
384 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
385 lit->opc = (__u64)it->it_op;
387 /* pack the intended request */
388 mdc_unlink_pack(req, op_data);
390 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
391 obddev->u.cli.cl_max_mds_easize);
392 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
393 obddev->u.cli.cl_max_mds_cookiesize);
394 ptlrpc_request_set_replen(req);
/* Build an LDLM enqueue request carrying a getattr/lookup intent
 * (RQF_LDLM_INTENT_GETATTR).
 *
 * The set of attributes requested (valid) is fixed: GETATTR, EASIZE,
 * MODEASIZE, DIREA, MDSCAPA and MEA, plus OBD_MD_FLRMTPERM for a remote
 * client or OBD_MD_FLACL otherwise.
 *
 * Visible steps, in order: allocate the request (ERR_PTR(-ENOMEM) on the
 * failure path); size the CAPA1 and NAME (op_namelen + 1) client fields;
 * call ldlm_prep_enqueue_req() with no cancel list, freeing the request
 * on its error path; store it->it_op as the intent opcode; pack the
 * getattr with mdc_getattr_pack(), passing cl_max_mds_easize; reserve
 * reply space of cl_max_mds_easize for RMF_MDT_MD and, for a remote
 * client, of a struct mdt_remote_perm for RMF_ACL; set the reply length.
 *
 * NOTE(review): the conditions guarding the error paths and the final
 * return are not visible in this copy of the file -- confirm. */
398 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
399 struct lookup_intent *it,
400 struct md_op_data *op_data)
402 struct ptlrpc_request *req;
403 struct obd_device *obddev = class_exp2obd(exp);
404 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
405 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
406 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
407 (client_is_remote(exp) ?
408 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
409 struct ldlm_intent *lit;
413 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
414 &RQF_LDLM_INTENT_GETATTR);
416 RETURN(ERR_PTR(-ENOMEM));
418 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
419 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
420 op_data->op_namelen + 1);
422 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
424 ptlrpc_request_free(req);
428 /* pack the intent */
429 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
430 lit->opc = (__u64)it->it_op;
432 /* pack the intended request */
433 mdc_getattr_pack(req, valid, it->it_flags, op_data,
434 obddev->u.cli.cl_max_mds_easize);
436 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
437 obddev->u.cli.cl_max_mds_easize);
438 if (client_is_remote(exp))
439 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
440 sizeof(struct mdt_remote_perm));
441 ptlrpc_request_set_replen(req);
/* Build a plain LDLM enqueue request (RQF_LDLM_ENQUEUE) without an intent
 * payload: allocate it (ERR_PTR(-ENOMEM) on the failure path), prepare it
 * with ldlm_prep_enqueue_req() and no cancel list (freeing the request on
 * its error path), then set the reply length.
 *
 * NOTE(review): the conditions guarding the error paths and the final
 * return are not visible in this copy of the file -- confirm. */
445 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
447 struct ptlrpc_request *req;
451 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
453 RETURN(ERR_PTR(-ENOMEM));
455 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
457 ptlrpc_request_free(req);
461 ptlrpc_request_set_replen(req);
/* Post-process the reply of an intent enqueue.
 *
 * Visible behaviour:
 *  - if the request has a transno or is marked for replay,
 *    LDLM_FL_INTENT_ONLY is set in the lock request kept in the request
 *    buffer;
 *  - for ELDLM_LOCK_ABORTED the lock handle is zeroed; otherwise, when
 *    the lock's l_req_mode differs from einfo->ei_mode, a reference is
 *    added in the new mode, dropped in the old one and ei_mode updated;
 *  - disposition and status are taken from lock_policy_res1 / res2 of the
 *    lock reply and stored, with the lock mode, the handle cookie and the
 *    request itself, in it->d.lustre;
 *  - the replay flag is cleared for a negative status, and for an IT_OPEN
 *    whose DISP_OPEN_OPEN is unset or whose status is non-zero;
 *  - for IT_OPEN, IT_UNLINK, IT_LOOKUP and IT_GETATTR the mdt_body is
 *    fetched from the reply; a successful open registers replay data via
 *    mdc_set_open_replay_data(); when the body flags OBD_MD_FLDIREA or
 *    OBD_MD_FLEASIZE the EA is fetched and, for a replayable IT_OPEN,
 *    copied into the request's RMF_EADATA field (mdc_realloc_openmsg(),
 *    req_capsule_shrink() and req_capsule_set_size() adjust that field
 *    first);
 *  - remote permission and capability fields are fetched from the reply
 *    when the body flags OBD_MD_FLRMTPERM, OBD_MD_FLMDSCAPA or
 *    OBD_MD_FLOSSCAPA.
 *
 * NOTE(review): the last parameter (used as rc), NULL checks on the
 * fetched fields, the size comparisons around the EADATA handling and all
 * return statements are not visible in this copy of the file -- confirm
 * against the complete source. */
465 static int mdc_finish_enqueue(struct obd_export *exp,
466 struct ptlrpc_request *req,
467 struct ldlm_enqueue_info *einfo,
468 struct lookup_intent *it,
469 struct lustre_handle *lockh,
472 struct req_capsule *pill = &req->rq_pill;
473 struct ldlm_request *lockreq;
474 struct ldlm_reply *lockrep;
478 /* Similarly, if we're going to replay this request, we don't want to
479 * actually get a lock, just perform the intent. */
480 if (req->rq_transno || req->rq_replay) {
481 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
482 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
485 if (rc == ELDLM_LOCK_ABORTED) {
487 memset(lockh, 0, sizeof(*lockh));
489 } else { /* rc = 0 */
490 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
493 /* If the server gave us back a different lock mode, we should
494 * fix up our variables. */
495 if (lock->l_req_mode != einfo->ei_mode) {
496 ldlm_lock_addref(lockh, lock->l_req_mode);
497 ldlm_lock_decref(lockh, einfo->ei_mode);
498 einfo->ei_mode = lock->l_req_mode;
503 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
504 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
506 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
507 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
508 it->d.lustre.it_lock_mode = einfo->ei_mode;
509 it->d.lustre.it_lock_handle = lockh->cookie;
510 it->d.lustre.it_data = req;
512 if (it->d.lustre.it_status < 0 && req->rq_replay)
513 mdc_clear_replay_flag(req, it->d.lustre.it_status);
515 /* If we're doing an IT_OPEN which did not result in an actual
516 * successful open, then we need to remove the bit which saves
517 * this request for unconditional replay.
519 * It's important that we do this first! Otherwise we might exit the
520 * function without doing so, and try to replay a failed create
522 if (it->it_op & IT_OPEN && req->rq_replay &&
523 (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
524 mdc_clear_replay_flag(req, it->d.lustre.it_status);
526 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
527 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
529 /* We know what to expect, so we do any byte flipping required here */
530 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
531 struct mdt_body *body;
533 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
535 CERROR ("Can't swab mdt_body\n");
539 if (it_disposition(it, DISP_OPEN_OPEN) &&
540 !it_open_error(DISP_OPEN_OPEN, it)) {
542 * If this is a successful OPEN request, we need to set
543 * replay handler and data early, so that if replay
544 * happens immediately after swabbing below, new reply
545 * is swabbed by that handler correctly.
547 mdc_set_open_replay_data(NULL, NULL, req);
550 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
553 mdc_update_max_ea_from_body(exp, body);
556 * The eadata is opaque; just check that it is there.
557 * Eventually, obd_unpackmd() will check the contents.
559 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
565 * We save the reply LOV EA in case we have to replay a
566 * create for recovery. If we didn't allocate a large
567 * enough request buffer above we need to reallocate it
568 * here to hold the actual LOV EA.
570 * To not save LOV EA if request is not going to replay
571 * (for example error one).
573 if ((it->it_op & IT_OPEN) && req->rq_replay) {
575 if (req_capsule_get_size(pill, &RMF_EADATA,
578 mdc_realloc_openmsg(req, body);
580 req_capsule_shrink(pill, &RMF_EADATA,
584 req_capsule_set_size(pill, &RMF_EADATA,
588 lmm = req_capsule_client_get(pill, &RMF_EADATA);
590 memcpy(lmm, eadata, body->eadatasize);
594 if (body->valid & OBD_MD_FLRMTPERM) {
595 struct mdt_remote_perm *perm;
597 LASSERT(client_is_remote(exp));
598 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
599 lustre_swab_mdt_remote_perm);
603 if (body->valid & OBD_MD_FLMDSCAPA) {
604 struct lustre_capa *capa, *p;
606 capa = req_capsule_server_get(pill, &RMF_CAPA1);
610 if (it->it_op & IT_OPEN) {
611 /* client fid capa will be checked in replay */
612 p = req_capsule_client_get(pill, &RMF_CAPA2);
617 if (body->valid & OBD_MD_FLOSSCAPA) {
618 struct lustre_capa *capa;
620 capa = req_capsule_server_get(pill, &RMF_CAPA2);
629 /* We always reserve enough space in the reply packet for a stripe MD, because
630 * we don't know in advance the file type. */
/* Enqueue an LDLM lock on the resource named after op_data->op_fid1,
 * optionally carrying an intent.
 *
 * Visible behaviour:
 *  - when an intent is given the lock type is asserted to be LDLM_IBITS;
 *    the default policy is the LOOKUP inodebit, switched to UPDATE for
 *    IT_UNLINK, IT_GETATTR, IT_READDIR and on the IT_OPEN path;
 *  - LDLM_FL_HAS_INTENT is added to the caller's extra_lock_flags;
 *  - there is an LDLM_FLOCK path in which lmm carries the flock policy
 *    (lmm non-NULL, lmmsize 0) and res_id.name[3] is set to LDLM_FLOCK;
 *  - otherwise the request is built by mdc_intent_open_pack(),
 *    mdc_intent_unlink_pack(), mdc_intent_getattr_pack() or
 *    ldlm_enqueue_pack() according to it->it_op; the IT_OPEN path also
 *    resets einfo->ei_cbdata to NULL;
 *  - mdc_get_rpc_lock() and mdc_enter_request() are called before
 *    ldlm_cli_enqueue(), mdc_exit_request() and mdc_put_rpc_lock() after;
 *  - on the error path the failure is logged, the replay flag cleared
 *    and the request released with ptlrpc_req_finished(); otherwise the
 *    reply is handed to mdc_finish_enqueue().
 *
 * NOTE(review): the conditions selecting the flock path, guarding the
 * rpc-lock calls and the error paths, the remaining ldlm_cli_enqueue()
 * arguments, the use of reqp and all return statements other than
 * RETURN(PTR_ERR(req)) are not visible in this copy of the file --
 * confirm against the complete source. */
631 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
632 struct lookup_intent *it, struct md_op_data *op_data,
633 struct lustre_handle *lockh, void *lmm, int lmmsize,
634 struct ptlrpc_request **reqp, int extra_lock_flags)
636 struct obd_device *obddev = class_exp2obd(exp);
637 struct ptlrpc_request *req = NULL;
638 struct req_capsule *pill;
639 int flags = extra_lock_flags;
641 struct ldlm_res_id res_id;
642 static const ldlm_policy_data_t lookup_policy =
643 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
644 static const ldlm_policy_data_t update_policy =
645 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
646 ldlm_policy_data_t const *policy = &lookup_policy;
649 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
652 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
655 flags |= LDLM_FL_HAS_INTENT;
656 if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
657 policy = &update_policy;
663 /* The only way right now is FLOCK, in this case we hide flock
664 policy as lmm, but lmmsize is 0 */
665 LASSERT(lmm && lmmsize == 0);
666 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
668 policy = (ldlm_policy_data_t *)lmm;
669 res_id.name[3] = LDLM_FLOCK;
670 } else if (it->it_op & IT_OPEN) {
671 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
673 policy = &update_policy;
674 einfo->ei_cbdata = NULL;
676 } else if (it->it_op & IT_UNLINK)
677 req = mdc_intent_unlink_pack(exp, it, op_data);
678 else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
679 req = mdc_intent_getattr_pack(exp, it, op_data);
680 else if (it->it_op == IT_READDIR)
681 req = ldlm_enqueue_pack(exp);
688 RETURN(PTR_ERR(req));
689 pill = &req->rq_pill;
691 /* It is important to obtain rpc_lock first (if applicable), so that
692 * threads that are serialised with rpc_lock are not polluting our
693 * rpcs in flight counter. We do not do flock request limiting, though*/
695 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
696 mdc_enter_request(&obddev->u.cli);
699 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
705 mdc_exit_request(&obddev->u.cli);
706 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
709 /* For flock requests we immediatelly return without further
710 delay and let caller deal with the rest, since rest of
711 this function metadata processing makes no sense for flock
717 CERROR("ldlm_cli_enqueue: %d\n", rc);
718 mdc_clear_replay_flag(req, rc);
719 ptlrpc_req_finished(req);
722 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Complete an intent lock request once the enqueue reply is available.
 *
 * Visible behaviour:
 *  - the request pointer and its reply message are asserted not to be
 *    poisoned;
 *  - if DISP_IT_EXECD is not set, it_status (asserted non-zero) is
 *    returned straight away;
 *  - the mdt_body is taken from the reply and asserted non-NULL;
 *  - when revalidating (op_fid2 sane, M_CHECK_STALE set in
 *    it_create_mode, it_op other than IT_GETATTR), DISP_ENQ_COMPLETE is
 *    set and the reply's fid1 is compared with op_fid2 and op_fid3 to
 *    detect stale data;
 *  - one extra request reference is taken for a successful create
 *    (tracked by DISP_ENQ_CREATE_REF) and one for a successful open
 *    (tracked by DISP_ENQ_OPEN_REF);
 *  - the lock behind lockh is looked up; its resource name is asserted
 *    to match the reply's fid1, and if ldlm_lock_match() finds a granted
 *    LDLM_IBITS lock with the same policy, the new lock is dropped with
 *    ldlm_lock_decref_and_cancel() and lockh / it_lock_handle are
 *    switched to the matched one.
 *
 * NOTE(review): the checks on the it_open_error() results, the action
 * taken for stale data, the NULL check on the looked-up lock and the
 * return statements are not visible in this copy of the file -- confirm
 * against the complete source. */
727 static int mdc_finish_intent_lock(struct obd_export *exp,
728 struct ptlrpc_request *request,
729 struct md_op_data *op_data,
730 struct lookup_intent *it,
731 struct lustre_handle *lockh)
733 struct lustre_handle old_lock;
734 struct mdt_body *mdt_body;
735 struct ldlm_lock *lock;
739 LASSERT(request != NULL);
740 LASSERT(request != LP_POISON);
741 LASSERT(request->rq_repmsg != LP_POISON);
743 if (!it_disposition(it, DISP_IT_EXECD)) {
744 /* The server failed before it even started executing the
745 * intent, i.e. because it couldn't unpack the request. */
746 LASSERT(it->d.lustre.it_status != 0);
747 RETURN(it->d.lustre.it_status);
749 rc = it_open_error(DISP_IT_EXECD, it);
753 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
754 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
756 /* If we were revalidating a fid/name pair, mark the intent in
757 * case we fail and get called again from lookup */
758 if (fid_is_sane(&op_data->op_fid2) &&
759 it->it_create_mode & M_CHECK_STALE &&
760 it->it_op != IT_GETATTR) {
761 it_set_disposition(it, DISP_ENQ_COMPLETE);
763 /* Also: did we find the same inode? */
764 /* server can return one of two fids:
765 * op_fid2 - new allocated fid - if file is created.
766 * op_fid3 - existent fid - if file only open.
767 * op_fid3 is saved in lmv_intent_open */
768 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
769 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
/* NOTE(review): op_fid2 is printed twice below; the second value was
 * presumably meant to be op_fid3 -- confirm. */
770 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
771 "\n", PFID(&op_data->op_fid2),
772 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
777 rc = it_open_error(DISP_LOOKUP_EXECD, it);
781 /* keep requests around for the multiple phases of the call
782 * this shows the DISP_XX must guarantee we make it into the call
784 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
785 it_disposition(it, DISP_OPEN_CREATE) &&
786 !it_open_error(DISP_OPEN_CREATE, it)) {
787 it_set_disposition(it, DISP_ENQ_CREATE_REF);
788 ptlrpc_request_addref(request); /* balanced in ll_create_node */
790 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
791 it_disposition(it, DISP_OPEN_OPEN) &&
792 !it_open_error(DISP_OPEN_OPEN, it)) {
793 it_set_disposition(it, DISP_ENQ_OPEN_REF);
794 ptlrpc_request_addref(request); /* balanced in ll_file_open */
795 /* BUG 11546 - eviction in the middle of open rpc processing */
796 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
799 if (it->it_op & IT_CREAT) {
800 /* XXX this belongs in ll_create_it */
801 } else if (it->it_op == IT_OPEN) {
802 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
804 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
807 /* If we already have a matching lock, then cancel the new
808 * one. We have to set the data here instead of in
809 * mdc_enqueue, because we need to use the child's inode as
810 * the l_ast_data to match, and that's not available until
811 * intent_finish has performed the iget().) */
812 lock = ldlm_handle2lock(lockh);
814 ldlm_policy_data_t policy = lock->l_policy_data;
815 LDLM_DEBUG(lock, "matching against this");
817 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
818 &lock->l_resource->lr_name),
819 "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
820 (unsigned long)lock->l_resource->lr_name.name[0],
821 (unsigned long)lock->l_resource->lr_name.name[1],
822 (unsigned long)lock->l_resource->lr_name.name[2],
823 (unsigned long)fid_seq(&mdt_body->fid1),
824 (unsigned long)fid_oid(&mdt_body->fid1),
825 (unsigned long)fid_ver(&mdt_body->fid1));
828 memcpy(&old_lock, lockh, sizeof(*lockh));
829 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
830 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
831 ldlm_lock_decref_and_cancel(lockh,
832 it->d.lustre.it_lock_mode);
833 memcpy(lockh, &old_lock, sizeof(old_lock));
834 it->d.lustre.it_lock_handle = lockh->cookie;
837 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
838 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
839 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/* Check whether a granted LDLM_IBITS lock already exists for fid.
 *
 * The inodebit looked for is MDS_INODELOCK_UPDATE when it->it_op is
 * IT_GETATTR and MDS_INODELOCK_LOOKUP otherwise; any of the CR, CW, PR
 * and PW modes is accepted by the ldlm_lock_match() call.  The matched
 * handle cookie and mode are stored in it->d.lustre.
 *
 * NOTE(review): the declaration of the fid parameter, the condition
 * guarding the two stores and the return value are not visible in this
 * copy of the file -- confirm against the complete source. */
843 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
846 /* We could just return 1 immediately, but since we should only
847 * be called in revalidate_it if we already have a lock, let's
849 struct ldlm_res_id res_id;
850 struct lustre_handle lockh;
851 ldlm_policy_data_t policy;
855 fid_build_reg_res_name(fid, &res_id);
856 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
857 MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
859 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
860 LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
861 &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
863 it->d.lustre.it_lock_handle = lockh.cookie;
864 it->d.lustre.it_lock_mode = mode;
871 * This long block is all about fixing up the lock and request state
872 * so that it is correct as of the moment _before_ the operation was
873 * applied; that way, the VFS will think that everything is normal and
874 * call Lustre's regular VFS methods.
876 * If we're performing a creation, that means that unless the creation
877 * failed with EEXIST, we should fake up a negative dentry.
879 * For everything else, we want to lookup to succeed.
881 * One additional note: if CREATE or OPEN succeeded, we add an extra
882 * reference to the request because we need to keep it around until
883 * ll_create/ll_open gets called.
885 * The server will return to us, in it_disposition, an indication of
886 * exactly what d.lustre.it_status refers to.
888 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
889 * otherwise if DISP_OPEN_CREATE is set, then it_status is the
890 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
891 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
894 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/* Obtain, or re-use, the lock and reply needed to serve a lookup intent.
 *
 * Visible behaviour:
 *  - the name, both fids and the intent are logged at D_DLMTRACE;
 *  - for IT_LOOKUP or IT_GETATTR with a sane op_fid2,
 *    mdc_revalidate_lock() is tried first; its result and
 *    op_data->op_namelen are then tested;
 *  - unless DISP_ENQ_COMPLETE is already set on the intent, a fid is
 *    allocated with mdc_fid_alloc() for IT_CREAT when op_fid2 is not
 *    sane, and the lock is enqueued through mdc_enqueue() with an
 *    LDLM_IBITS einfo built from it_to_lock_mode(it), cb_blocking and
 *    ldlm_completion_ast;
 *  - otherwise, when op_fid2 is not sane or M_CHECK_STALE is not set in
 *    it_create_mode, DISP_ENQ_COMPLETE is cleared;
 *  - the request saved in it->d.lustre.it_data is handed back through
 *    reqp and the work is finished by mdc_finish_intent_lock().
 *
 * NOTE(review): the initialisation of lockh, the actions taken on the
 * tested conditions (early returns, error handling) and the final return
 * are not visible in this copy of the file -- confirm against the
 * complete source. */
897 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
898 void *lmm, int lmmsize, struct lookup_intent *it,
899 int lookup_flags, struct ptlrpc_request **reqp,
900 ldlm_blocking_callback cb_blocking,
901 int extra_lock_flags)
903 struct lustre_handle lockh;
908 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
909 ", intent: %s flags %#o\n", op_data->op_namelen,
910 op_data->op_name, PFID(&op_data->op_fid2),
911 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
915 if (fid_is_sane(&op_data->op_fid2) &&
916 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
917 /* We could just return 1 immediately, but since we should only
918 * be called in revalidate_it if we already have a lock, let's
920 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2);
921 /* Only return failure if it was not GETATTR by cfid
922 (from inode_revalidate) */
923 if (rc || op_data->op_namelen != 0)
927 /* lookup_it may be called only after revalidate_it has run, because
928 * revalidate_it cannot return errors, only zero. Returning zero causes
929 * this call to lookup, which *can* return an error.
931 * We only want to execute the request associated with the intent one
932 * time, however, so don't send the request again. Instead, skip past
933 * this and use the request from revalidate. In this case, revalidate
934 * never dropped its reference, so the refcounts are all OK */
935 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
936 struct ldlm_enqueue_info einfo =
937 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
938 ldlm_completion_ast, NULL, NULL, NULL };
940 /* For case if upper layer did not alloc fid, do it now. */
941 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
942 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
944 CERROR("Can't alloc new fid, rc %d\n", rc);
948 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
949 lmm, lmmsize, NULL, extra_lock_flags);
952 } else if (!fid_is_sane(&op_data->op_fid2) ||
953 !(it->it_create_mode & M_CHECK_STALE)) {
954 /* DISP_ENQ_COMPLETE set means there is extra reference on
955 * request referenced from this intent, saved for subsequent
956 * lookup. This path is executed when we proceed to this
957 * lookup, so we clear DISP_ENQ_COMPLETE */
958 it_clear_disposition(it, DISP_ENQ_COMPLETE);
960 *reqp = it->d.lustre.it_data;
961 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/* Reply handler installed by mdc_intent_getattr_async().
 *
 * The export, md_enqueue_info and ldlm_enqueue_info are recovered from
 * the async args.  Visible steps: leave the request accounting with
 * mdc_exit_request(); evaluate the OBD_FAIL_MDC_GETATTR_ENQUEUE fault
 * injection check; complete the enqueue with ldlm_cli_enqueue_fini();
 * on its error path log the failure and clear the replay flag; otherwise
 * run mdc_finish_enqueue() followed by mdc_finish_intent_lock() on
 * minfo->mi_data; finally free einfo and invoke the caller's
 * minfo->mi_cb(req, minfo, rc).
 *
 * NOTE(review): the third parameter (args / rc), the assignment of it,
 * the effect of the fault-injection check, the conditions and jumps
 * between the steps and the return value are not visible in this copy of
 * the file -- confirm against the complete source. */
965 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
966 struct ptlrpc_request *req,
969 struct mdc_getattr_args *ga = args;
970 struct obd_export *exp = ga->ga_exp;
971 struct md_enqueue_info *minfo = ga->ga_minfo;
972 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
973 struct lookup_intent *it;
974 struct lustre_handle *lockh;
975 struct obd_device *obddev;
976 int flags = LDLM_FL_HAS_INTENT;
980 lockh = &minfo->mi_lockh;
982 obddev = class_exp2obd(exp);
984 mdc_exit_request(&obddev->u.cli);
985 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
988 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
989 &flags, NULL, 0, lockh, rc);
991 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
992 mdc_clear_replay_flag(req, rc);
996 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1000 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1004 OBD_FREE_PTR(einfo);
1005 minfo->mi_cb(req, minfo, rc);
/* Start a getattr intent enqueue whose reply is handled later by
 * mdc_intent_getattr_async_interpret().
 *
 * The intent and operation data come from minfo (mi_it, mi_data).  The
 * lock policy requests both MDS_INODELOCK_LOOKUP and
 * MDS_INODELOCK_UPDATE.  Visible steps: log the request at D_DLMTRACE;
 * build the resource id from op_fid1; pack the request with
 * mdc_intent_getattr_pack(); enter the request accounting with
 * mdc_enter_request(); call ldlm_cli_enqueue() with minfo->mi_lockh and
 * a final argument of 1 (presumably the async flag -- confirm), leaving
 * the accounting again on its error path; store minfo and einfo in the
 * request's async args; install the interpret callback and queue the
 * request with ptlrpcd_add_req().
 *
 * NOTE(review): the error checks after packing and enqueueing, the
 * assignment of ga->ga_exp and the end of the function are not visible
 * in this copy of the file -- confirm against the complete source. */
1009 int mdc_intent_getattr_async(struct obd_export *exp,
1010 struct md_enqueue_info *minfo,
1011 struct ldlm_enqueue_info *einfo)
1013 struct md_op_data *op_data = &minfo->mi_data;
1014 struct lookup_intent *it = &minfo->mi_it;
1015 struct ptlrpc_request *req;
1016 struct mdc_getattr_args *ga;
1017 struct obd_device *obddev = class_exp2obd(exp);
1018 struct ldlm_res_id res_id;
1019 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1020 * for statahead currently. Consider CMD in future, such two bits
1021 * maybe managed by different MDS, should be adjusted then. */
1022 ldlm_policy_data_t policy = {
1023 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1024 MDS_INODELOCK_UPDATE }
1027 int flags = LDLM_FL_HAS_INTENT;
1030 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1031 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1032 ldlm_it2str(it->it_op), it->it_flags);
1034 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1035 req = mdc_intent_getattr_pack(exp, it, op_data);
1039 mdc_enter_request(&obddev->u.cli);
1040 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1041 0, &minfo->mi_lockh, 1);
1043 mdc_exit_request(&obddev->u.cli);
1047 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1048 ga = ptlrpc_req_async_args(req);
1050 ga->ga_minfo = minfo;
1051 ga->ga_einfo = einfo;
1053 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1054 ptlrpcd_add_req(req, PSCOPE_OTHER);