1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_MDC
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
48 # include <liblustre.h>
51 #include <lustre_acl.h>
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 /* fid_res_name_eq() */
55 #include <lustre_fid.h>
56 #include <lprocfs_status.h>
57 #include "mdc_internal.h"
/* Test whether any of the disposition flag(s) in @flag are set on the
 * intent.  Used to check which phases (lookup/open/create) the server
 * actually executed.  NOTE(review): braces elided by extraction. */
59 int it_disposition(struct lookup_intent *it, int flag)
61 return it->d.lustre.it_disposition & flag;
63 EXPORT_SYMBOL(it_disposition);
/* Set disposition flag(s) @flag on the intent (bitwise OR). */
65 void it_set_disposition(struct lookup_intent *it, int flag)
67 it->d.lustre.it_disposition |= flag;
69 EXPORT_SYMBOL(it_set_disposition);
/* Clear disposition flag(s) @flag from the intent (bitwise AND-NOT). */
71 void it_clear_disposition(struct lookup_intent *it, int flag)
73 it->d.lustre.it_disposition &= ~flag;
75 EXPORT_SYMBOL(it_clear_disposition);
/* Return the server-reported status (it_status) for the most specific
 * executed phase that is at or below @phase.  The checks run from the
 * most specific disposition (OPEN_OPEN) down to the most general
 * (IT_EXECD); for each phase that was executed, it_status is returned
 * only if the caller asked about that phase or a later one.
 *
 * NOTE(review): the extraction elided the closing braces and the
 * "return 0" / early-exit lines between the if-blocks; the visible text
 * is not the complete control flow. */
77 int it_open_error(int phase, struct lookup_intent *it)
79 if (it_disposition(it, DISP_OPEN_OPEN)) {
80 if (phase >= DISP_OPEN_OPEN)
81 return it->d.lustre.it_status;
86 if (it_disposition(it, DISP_OPEN_CREATE)) {
87 if (phase >= DISP_OPEN_CREATE)
88 return it->d.lustre.it_status;
93 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
94 if (phase >= DISP_LOOKUP_EXECD)
95 return it->d.lustre.it_status;
100 if (it_disposition(it, DISP_IT_EXECD)) {
101 if (phase >= DISP_IT_EXECD)
102 return it->d.lustre.it_status;
/* No disposition matched at all: log the raw disposition/status for
 * debugging.  Presumably followed by an elided error return. */
106 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
107 it->d.lustre.it_status);
111 EXPORT_SYMBOL(it_open_error);
113 /* this must be called on a lockh that is known to have a referenced lock */
/* Attach inode pointer @data as the AST data of the lock behind @lockh,
 * and report the lock's inodebits back through @bits (the *bits
 * out-parameter; its declaration is on an elided signature line).
 *
 * If the lock already carries different ast_data, the old inode must be
 * on its way out (I_FREEING) — otherwise two live inodes would claim the
 * same lock, which the LASSERTF treats as fatal. */
114 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
117 struct ldlm_lock *lock;
128 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
130 LASSERT(lock != NULL);
131 lock_res_and_lock(lock);
133 if (lock->l_ast_data && lock->l_ast_data != data) {
134 struct inode *new_inode = data;
135 struct inode *old_inode = lock->l_ast_data;
/* The previously attached inode must be being freed; replacing
 * ast_data for a live inode would be a refcounting bug. */
136 LASSERTF(old_inode->i_state & I_FREEING,
137 "Found existing inode %p/%lu/%u state %lu in lock: "
138 "setting data to %p/%lu/%u\n", old_inode,
139 old_inode->i_ino, old_inode->i_generation,
141 new_inode, new_inode->i_ino, new_inode->i_generation);
144 lock->l_ast_data = data;
/* Presumably guarded by "if (bits)" on an elided line — confirm. */
146 *bits = lock->l_policy_data.l_inodebits.bits;
148 unlock_res_and_lock(lock);
/* Look for an already-granted MDC lock on the resource derived from
 * @fid that matches @type/@policy/@mode.  On success the matched lock
 * handle is stored in @lockh; returns the matched mode (0 if none).
 * Thin wrapper that builds the fid-based resource name and delegates to
 * ldlm_lock_match().  NOTE(review): "rc" declaration and the RETURN line
 * are elided by the extraction. */
154 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
155 const struct lu_fid *fid, ldlm_type_t type,
156 ldlm_policy_data_t *policy, ldlm_mode_t mode,
157 struct lustre_handle *lockh)
159 struct ldlm_res_id res_id;
163 fid_build_reg_res_name(fid, &res_id);
164 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
165 &res_id, type, policy, mode, lockh, 0);
/* Cancel all unused locks on the resource derived from @fid that match
 * @policy/@mode.  @flags and @opaque are passed through to the LDLM
 * cancel machinery.  Wrapper around ldlm_cli_cancel_unused_resource(). */
169 int mdc_cancel_unused(struct obd_export *exp,
170 const struct lu_fid *fid,
171 ldlm_policy_data_t *policy,
172 ldlm_mode_t mode, int flags, void *opaque)
174 struct ldlm_res_id res_id;
175 struct obd_device *obd = class_exp2obd(exp);
180 fid_build_reg_res_name(fid, &res_id);
181 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
182 policy, mode, flags, opaque);
/* Apply iterator @it (with closure @data) to every lock on the resource
 * derived from @fid — used to change per-lock callback data.  The
 * iterator argument and RETURN are on elided lines. */
186 int mdc_change_cbdata(struct obd_export *exp,
187 const struct lu_fid *fid,
188 ldlm_iterator_t it, void *data)
190 struct ldlm_res_id res_id;
193 fid_build_reg_res_name(fid, &res_id);
194 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
201 /* find any ldlm lock of the inode in mdc
/* Returns a positive/zero indication depending on whether the iterator
 * stopped (lock found, LDLM_ITER_STOP) or ran to completion
 * (LDLM_ITER_CONTINUE — nothing found).  The actual return statements
 * are on elided lines. */
205 int mdc_find_cbdata(struct obd_export *exp,
206 const struct lu_fid *fid,
207 ldlm_iterator_t it, void *data)
209 struct ldlm_res_id res_id;
/* Cast drops const for the helper's signature — same fid value. */
213 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
214 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
216 if (rc == LDLM_ITER_STOP)
218 else if (rc == LDLM_ITER_CONTINUE)
/* Drop the replay flag from @req when the request failed with @rc.
 * Failed requests must not be replayed after recovery.  If the server
 * already assigned a transno to a failed request, that is unexpected
 * and logged.  The rq_replay clearing statement between the spin_lock /
 * spin_unlock pair is on an elided line. */
223 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
225 /* Don't hold error requests for replay. */
226 if (req->rq_replay) {
227 cfs_spin_lock(&req->rq_lock);
229 cfs_spin_unlock(&req->rq_lock);
231 if (rc && req->rq_transno != 0) {
232 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
237 /* Save a large LOV EA into the request buffer so that it is available
238 * for replay. We don't do this in the initial request because the
239 * original request doesn't need this buffer (at most it sends just the
240 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
241 * buffer and may also be difficult to allocate and save a very large
242 * request buffer for each open. (bug 5707)
244 * OOM here may cause recovery failure if lmm is needed (only for the
245 * original open if the MDS crashed just when this client also OOM'd)
246 * but this is incredibly unlikely, and questionable whether the client
247 * could do MDS recovery under OOM anyways... */
/* Enlarge the open request's EA segment to body->eadatasize so the
 * reply EA can be stashed for replay.  On failure, strip the EA bits
 * from the body so replay simply proceeds without the saved EA. */
248 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
249 struct mdt_body *body)
253 /* FIXME: remove this explicit offset. */
254 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
/* Error branch: the "if (rc)" line is elided by the extraction. */
257 CERROR("Can't enlarge segment %d size to %d\n",
258 DLM_INTENT_REC_OFF + 4, body->eadatasize);
259 body->valid &= ~OBD_MD_FLEASIZE;
260 body->eadatasize = 0;
/* Build and pack an LDLM_INTENT_OPEN request for @it/@op_data:
 *  - cancel conflicting OPEN locks on the child (op_fid2) when known,
 *  - cancel the parent's (op_fid1) UPDATE lock when creating,
 *  - reserve buffers for name/EA (and remote-perm ACL for remote
 *    clients), mark the request replayable, and pack the intent plus
 *    the embedded open request.
 * Returns the allocated request, or ERR_PTR(-ENOMEM) on allocation
 * failure.  NOTE(review): several declaration/condition lines are
 * elided by the extraction (e.g. the cancel-mode selection bodies). */
264 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
265 struct lookup_intent *it,
266 struct md_op_data *op_data,
267 void *lmm, int lmmsize,
270 struct ptlrpc_request *req;
271 struct obd_device *obddev = class_exp2obd(exp);
272 struct ldlm_intent *lit;
273 CFS_LIST_HEAD(cancels);
/* Openers always get a regular-file mode bit here. */
279 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
281 /* XXX: openlock is not cancelled for cross-refs. */
282 /* If inode is known, cancel conflicting OPEN locks. */
283 if (fid_is_sane(&op_data->op_fid2)) {
/* Cancel mode chosen by open flags: write/trunc vs exec vs read
 * — the mode assignments are on elided lines. */
284 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
287 else if (it->it_flags & FMODE_EXEC)
292 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
297 /* If CREATE, cancel parent's UPDATE lock. */
298 if (it->it_op & IT_CREAT)
302 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
304 MDS_INODELOCK_UPDATE);
306 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
307 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the gathered cancel list and bail
 * (the "if (req == NULL)" guard is on an elided line). */
309 ldlm_lock_list_put(&cancels, l_bl_ast, count);
310 RETURN(ERR_PTR(-ENOMEM));
313 /* parent capability */
314 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
315 /* child capability, reserve the size according to parent capa, it will
316 * be filled after we get the reply */
317 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
319 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
320 op_data->op_namelen + 1);
321 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
322 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
324 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
326 ptlrpc_request_free(req);
/* Open requests are kept for replay if the import is replayable. */
330 cfs_spin_lock(&req->rq_lock);
331 req->rq_replay = req->rq_import->imp_replayable;
332 cfs_spin_unlock(&req->rq_lock);
334 /* pack the intent */
335 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
336 lit->opc = (__u64)it->it_op;
338 /* pack the intended request */
339 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
342 /* for remote client, fetch remote perm for current user */
343 if (client_is_remote(exp))
344 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
345 sizeof(struct mdt_remote_perm));
346 ptlrpc_request_set_replen(req);
/* Build and pack an LDLM_INTENT_UNLINK request: reserve the name buffer,
 * pack the intent opcode and the embedded unlink request, and reserve
 * reply space for the returned MD (max EA size) and unlink cookies.
 * Returns the request or ERR_PTR(-ENOMEM). */
350 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
351 struct lookup_intent *it,
352 struct md_op_data *op_data)
354 struct ptlrpc_request *req;
355 struct obd_device *obddev = class_exp2obd(exp);
356 struct ldlm_intent *lit;
360 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
361 &RQF_LDLM_INTENT_UNLINK);
363 RETURN(ERR_PTR(-ENOMEM));
365 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
366 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
367 op_data->op_namelen + 1);
/* No cancel list for unlink — prep with an empty list. */
369 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
371 ptlrpc_request_free(req);
375 /* pack the intent */
376 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
377 lit->opc = (__u64)it->it_op;
379 /* pack the intended request */
380 mdc_unlink_pack(req, op_data);
382 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
383 obddev->u.cli.cl_max_mds_easize);
384 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
385 obddev->u.cli.cl_max_mds_cookiesize);
386 ptlrpc_request_set_replen(req);
/* Build and pack an LDLM_INTENT_GETATTR request.  The @valid mask asks
 * for attrs, EA, capabilities, and either remote permissions (remote
 * client) or ACLs (local client).  Reserves reply space for the MD and,
 * for remote clients, the mdt_remote_perm.  Returns the request or
 * ERR_PTR(-ENOMEM). */
390 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
391 struct lookup_intent *it,
392 struct md_op_data *op_data)
394 struct ptlrpc_request *req;
395 struct obd_device *obddev = class_exp2obd(exp);
396 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
397 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
398 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
399 (client_is_remote(exp) ?
400 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
401 struct ldlm_intent *lit;
405 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
406 &RQF_LDLM_INTENT_GETATTR);
408 RETURN(ERR_PTR(-ENOMEM));
410 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
411 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
412 op_data->op_namelen + 1);
414 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
416 ptlrpc_request_free(req);
420 /* pack the intent */
421 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
422 lit->opc = (__u64)it->it_op;
424 /* pack the intended request */
425 mdc_getattr_pack(req, valid, it->it_flags, op_data);
427 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
428 obddev->u.cli.cl_max_mds_easize);
429 if (client_is_remote(exp))
430 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
431 sizeof(struct mdt_remote_perm));
432 ptlrpc_request_set_replen(req);
/* Build a plain (intent-less) LDLM_ENQUEUE request — used for READDIR,
 * which takes a lock but carries no intent payload.  Returns the
 * request or ERR_PTR(-ENOMEM). */
436 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
438 struct ptlrpc_request *req;
442 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
444 RETURN(ERR_PTR(-ENOMEM));
446 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
448 ptlrpc_request_free(req);
452 ptlrpc_request_set_replen(req);
/* Post-process a completed intent enqueue: fix up lock state, copy the
 * server's disposition/status into @it, manage the replay flag, swab
 * and validate the reply body, stash the reply LOV EA for open replay,
 * and unpack remote-perm / capability reply fields.
 *
 * NOTE(review): many lines (braces, RETURNs, some declarations and
 * conditions) are elided by the extraction; comments below annotate
 * only the visible statements. */
456 static int mdc_finish_enqueue(struct obd_export *exp,
457 struct ptlrpc_request *req,
458 struct ldlm_enqueue_info *einfo,
459 struct lookup_intent *it,
460 struct lustre_handle *lockh,
463 struct req_capsule *pill = &req->rq_pill;
464 struct ldlm_request *lockreq;
465 struct ldlm_reply *lockrep;
469 /* Similarly, if we're going to replay this request, we don't want to
470 * actually get a lock, just perform the intent. */
471 if (req->rq_transno || req->rq_replay) {
472 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
473 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
/* Server aborted the lock: no lock was granted, clear the handle. */
476 if (rc == ELDLM_LOCK_ABORTED) {
478 memset(lockh, 0, sizeof(*lockh));
480 } else { /* rc = 0 */
481 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
484 /* If the server gave us back a different lock mode, we should
485 * fix up our variables. */
486 if (lock->l_req_mode != einfo->ei_mode) {
487 ldlm_lock_addref(lockh, lock->l_req_mode);
488 ldlm_lock_decref(lockh, einfo->ei_mode);
489 einfo->ei_mode = lock->l_req_mode;
/* Copy the server's answers into the intent for the caller. */
494 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
495 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
497 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
498 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
499 it->d.lustre.it_lock_mode = einfo->ei_mode;
500 it->d.lustre.it_lock_handle = lockh->cookie;
501 it->d.lustre.it_data = req;
503 if (it->d.lustre.it_status < 0 && req->rq_replay)
504 mdc_clear_replay_flag(req, it->d.lustre.it_status);
506 /* If we're doing an IT_OPEN which did not result in an actual
507 * successful open, then we need to remove the bit which saves
508 * this request for unconditional replay.
510 * It's important that we do this first! Otherwise we might exit the
511 * function without doing so, and try to replay a failed create
513 if (it->it_op & IT_OPEN && req->rq_replay &&
514 (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
515 mdc_clear_replay_flag(req, it->d.lustre.it_status);
517 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
518 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
520 /* We know what to expect, so we do any byte flipping required here */
521 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
522 struct mdt_body *body;
524 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
/* Error branch — the "if (body == NULL)" guard is elided. */
526 CERROR ("Can't swab mdt_body\n");
530 if (it_disposition(it, DISP_OPEN_OPEN) &&
531 !it_open_error(DISP_OPEN_OPEN, it)) {
533 * If this is a successful OPEN request, we need to set
534 * replay handler and data early, so that if replay
535 * happens immediately after swabbing below, new reply
536 * is swabbed by that handler correctly.
538 mdc_set_open_replay_data(NULL, NULL, req);
541 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
544 mdc_update_max_ea_from_body(exp, body);
547 * The eadata is opaque; just check that it is there.
548 * Eventually, obd_unpackmd() will check the contents.
550 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
556 * We save the reply LOV EA in case we have to replay a
557 * create for recovery. If we didn't allocate a large
558 * enough request buffer above we need to reallocate it
559 * here to hold the actual LOV EA.
561 * To not save LOV EA if request is not going to replay
562 * (for example error one).
564 if ((it->it_op & IT_OPEN) && req->rq_replay) {
/* Grow or shrink the request EADATA segment to match the
 * reply's eadatasize, then copy the reply EA in. */
566 if (req_capsule_get_size(pill, &RMF_EADATA,
569 mdc_realloc_openmsg(req, body);
571 req_capsule_shrink(pill, &RMF_EADATA,
575 req_capsule_set_size(pill, &RMF_EADATA,
579 lmm = req_capsule_client_get(pill, &RMF_EADATA);
581 memcpy(lmm, eadata, body->eadatasize);
/* Swab the remote-permission reply for remote clients. */
585 if (body->valid & OBD_MD_FLRMTPERM) {
586 struct mdt_remote_perm *perm;
588 LASSERT(client_is_remote(exp));
589 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
590 lustre_swab_mdt_remote_perm);
/* Capability handling: MDS capa in CAPA1; for OPEN, the client
 * fid capa in CAPA2 will be checked during replay. */
594 if (body->valid & OBD_MD_FLMDSCAPA) {
595 struct lustre_capa *capa, *p;
597 capa = req_capsule_server_get(pill, &RMF_CAPA1);
601 if (it->it_op & IT_OPEN) {
602 /* client fid capa will be checked in replay */
603 p = req_capsule_client_get(pill, &RMF_CAPA2);
608 if (body->valid & OBD_MD_FLOSSCAPA) {
609 struct lustre_capa *capa;
611 capa = req_capsule_server_get(pill, &RMF_CAPA2);
620 /* We always reserve enough space in the reply packet for a stripe MD, because
621 * we don't know in advance the file type. */
/* Enqueue an intent lock on the MDS for @it/@op_data:
 *  - choose the inodebits policy from the intent op (LOOKUP vs UPDATE),
 *  - pack the matching intent request (open/unlink/getattr/readdir, or
 *    a FLOCK enqueue where the policy is smuggled through @lmm),
 *  - rate-limit via the rpc_lock and in-flight counter, enqueue via
 *    ldlm_cli_enqueue(), then finish with mdc_finish_enqueue().
 * NOTE(review): the "if (it == NULL)" flock branch guard and several
 * returns are on elided lines. */
622 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
623 struct lookup_intent *it, struct md_op_data *op_data,
624 struct lustre_handle *lockh, void *lmm, int lmmsize,
625 struct ptlrpc_request **reqp, int extra_lock_flags)
627 struct obd_device *obddev = class_exp2obd(exp);
628 struct ptlrpc_request *req = NULL;
629 struct req_capsule *pill;
630 int flags = extra_lock_flags;
632 struct ldlm_res_id res_id;
633 static const ldlm_policy_data_t lookup_policy =
634 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
635 static const ldlm_policy_data_t update_policy =
636 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
637 ldlm_policy_data_t const *policy = &lookup_policy;
640 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
643 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
646 flags |= LDLM_FL_HAS_INTENT;
647 if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
648 policy = &update_policy;
654 /* The only way right now is FLOCK, in this case we hide flock
655 policy as lmm, but lmmsize is 0 */
656 LASSERT(lmm && lmmsize == 0);
657 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
659 policy = (ldlm_policy_data_t *)lmm;
660 res_id.name[3] = LDLM_FLOCK;
661 } else if (it->it_op & IT_OPEN) {
662 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
/* Open always takes the UPDATE policy; no cbdata for open. */
664 policy = &update_policy;
665 einfo->ei_cbdata = NULL;
667 } else if (it->it_op & IT_UNLINK)
668 req = mdc_intent_unlink_pack(exp, it, op_data);
669 else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
670 req = mdc_intent_getattr_pack(exp, it, op_data);
671 else if (it->it_op == IT_READDIR)
672 req = ldlm_enqueue_pack(exp);
/* Pack-failure branch: "if (IS_ERR(req))" guard is elided. */
679 RETURN(PTR_ERR(req));
680 pill = &req->rq_pill;
682 /* It is important to obtain rpc_lock first (if applicable), so that
683 * threads that are serialised with rpc_lock are not polluting our
684 * rpcs in flight counter. We do not do flock request limiting, though*/
686 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
687 mdc_enter_request(&obddev->u.cli);
690 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
696 mdc_exit_request(&obddev->u.cli);
697 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
700 /* For flock requests we immediatelly return without further
701 delay and let caller deal with the rest, since rest of
702 this function metadata processing makes no sense for flock
/* Enqueue-failure branch ("if (rc < 0)" elided): log, clear the
 * replay flag, and drop the request. */
708 CERROR("ldlm_cli_enqueue: %d\n", rc);
709 mdc_clear_replay_flag(req, rc);
710 ptlrpc_req_finished(req);
713 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Finish an intent lock after the enqueue reply has been processed:
 * check the per-phase status, detect stale revalidations by comparing
 * fids, take extra request references for successful OPEN/CREATE
 * (released later in llite), and merge with an already-held matching
 * lock if one exists.  NOTE(review): braces/returns on elided lines. */
718 static int mdc_finish_intent_lock(struct obd_export *exp,
719 struct ptlrpc_request *request,
720 struct md_op_data *op_data,
721 struct lookup_intent *it,
722 struct lustre_handle *lockh)
724 struct lustre_handle old_lock;
725 struct mdt_body *mdt_body;
726 struct ldlm_lock *lock;
730 LASSERT(request != NULL);
731 LASSERT(request != LP_POISON);
732 LASSERT(request->rq_repmsg != LP_POISON);
734 if (!it_disposition(it, DISP_IT_EXECD)) {
735 /* The server failed before it even started executing the
736 * intent, i.e. because it couldn't unpack the request. */
737 LASSERT(it->d.lustre.it_status != 0);
738 RETURN(it->d.lustre.it_status);
740 rc = it_open_error(DISP_IT_EXECD, it);
744 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
745 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
747 /* If we were revalidating a fid/name pair, mark the intent in
748 * case we fail and get called again from lookup */
749 if (fid_is_sane(&op_data->op_fid2) &&
750 it->it_create_mode & M_CHECK_STALE &&
751 it->it_op != IT_GETATTR) {
752 it_set_disposition(it, DISP_ENQ_COMPLETE);
754 /* Also: did we find the same inode? */
755 /* sever can return one of two fids:
756 * op_fid2 - new allocated fid - if file is created.
757 * op_fid3 - existent fid - if file only open.
758 * op_fid3 is saved in lmv_intent_open */
759 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
760 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
761 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
762 "\n", PFID(&op_data->op_fid2),
763 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
768 rc = it_open_error(DISP_LOOKUP_EXECD, it);
772 /* keep requests around for the multiple phases of the call
773 * this shows the DISP_XX must guarantee we make it into the call
775 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
776 it_disposition(it, DISP_OPEN_CREATE) &&
777 !it_open_error(DISP_OPEN_CREATE, it)) {
778 it_set_disposition(it, DISP_ENQ_CREATE_REF);
779 ptlrpc_request_addref(request); /* balanced in ll_create_node */
781 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
782 it_disposition(it, DISP_OPEN_OPEN) &&
783 !it_open_error(DISP_OPEN_OPEN, it)) {
784 it_set_disposition(it, DISP_ENQ_OPEN_REF);
785 ptlrpc_request_addref(request); /* balanced in ll_file_open */
786 /* BUG 11546 - eviction in the middle of open rpc processing */
787 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
790 if (it->it_op & IT_CREAT) {
791 /* XXX this belongs in ll_create_it */
792 } else if (it->it_op == IT_OPEN) {
793 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
795 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
798 /* If we already have a matching lock, then cancel the new
799 * one. We have to set the data here instead of in
800 * mdc_enqueue, because we need to use the child's inode as
801 * the l_ast_data to match, and that's not available until
802 * intent_finish has performed the iget().) */
803 lock = ldlm_handle2lock(lockh);
/* "if (lock)" guard elided; the matched lock's resource name
 * must agree with the fid the server returned. */
805 ldlm_policy_data_t policy = lock->l_policy_data;
806 LDLM_DEBUG(lock, "matching against this");
808 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
809 &lock->l_resource->lr_name),
810 "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
811 (unsigned long)lock->l_resource->lr_name.name[0],
812 (unsigned long)lock->l_resource->lr_name.name[1],
813 (unsigned long)lock->l_resource->lr_name.name[2],
814 (unsigned long)fid_seq(&mdt_body->fid1),
815 (unsigned long)fid_oid(&mdt_body->fid1),
816 (unsigned long)fid_ver(&mdt_body->fid1));
/* If an older matching lock is already held, drop the new one
 * and keep using the old handle. */
819 memcpy(&old_lock, lockh, sizeof(*lockh));
820 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
821 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
822 ldlm_lock_decref_and_cancel(lockh,
823 it->d.lustre.it_lock_mode);
824 memcpy(lockh, &old_lock, sizeof(old_lock));
825 it->d.lustre.it_lock_handle = lockh->cookie;
828 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
829 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
830 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/* Check whether we already hold a granted ibits lock covering @fid for
 * the given intent.  On a match, record the handle and mode in the
 * intent and, if @bits is wanted, report the lock's inodebits.
 * GETATTR needs the UPDATE bit (LOOKUP otherwise) since owner/group/acl
 * live under the lookup lock.  The "mode" declaration and the final
 * RETURN are on elided lines. */
834 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
835 struct lu_fid *fid, __u32 *bits)
837 /* We could just return 1 immediately, but since we should only
838 * be called in revalidate_it if we already have a lock, let's
840 struct ldlm_res_id res_id;
841 struct lustre_handle lockh;
842 ldlm_policy_data_t policy;
846 fid_build_reg_res_name(fid, &res_id);
847 /* As not all attributes are kept under update lock, e.g.
848 owner/group/acls are under lookup lock, we need both
849 ibits for GETATTR. */
850 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
851 MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
853 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
854 LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
855 &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
/* Match found ("if (mode)" guard elided): publish the handle/mode. */
857 it->d.lustre.it_lock_handle = lockh.cookie;
858 it->d.lustre.it_lock_mode = mode;
860 struct ldlm_lock *lock = ldlm_handle2lock(&lockh);
862 LASSERT(lock != NULL);
863 *bits = lock->l_policy_data.l_inodebits.bits;
872 * This long block is all about fixing up the lock and request state
873 * so that it is correct as of the moment _before_ the operation was
874 * applied; that way, the VFS will think that everything is normal and
875 * call Lustre's regular VFS methods.
877 * If we're performing a creation, that means that unless the creation
878 * failed with EEXIST, we should fake up a negative dentry.
880 * For everything else, we want to lookup to succeed.
882 * One additional note: if CREATE or OPEN succeeded, we add an extra
883 * reference to the request because we need to keep it around until
884 * ll_create/ll_open gets called.
886 * The server will return to us, in it_disposition, an indication of
887 * exactly what d.lustre.it_status refers to.
889 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
890 * otherwise if DISP_OPEN_CREATE is set, then it status is the
891 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
892 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
895 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/* Top-level intent-lock entry point called from llite/lmv: first try to
 * revalidate with an existing lock (LOOKUP/GETATTR with a sane fid2);
 * otherwise run a fresh enqueue (allocating fid2 first for CREATE when
 * the upper layer didn't), or reuse the request saved under
 * DISP_ENQ_COMPLETE; then hand off to mdc_finish_intent_lock(). */
898 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
899 void *lmm, int lmmsize, struct lookup_intent *it,
900 int lookup_flags, struct ptlrpc_request **reqp,
901 ldlm_blocking_callback cb_blocking,
902 int extra_lock_flags)
904 struct lustre_handle lockh;
909 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
910 ", intent: %s flags %#o\n", op_data->op_namelen,
911 op_data->op_name, PFID(&op_data->op_fid2),
912 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
916 if (fid_is_sane(&op_data->op_fid2) &&
917 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
918 /* We could just return 1 immediately, but since we should only
919 * be called in revalidate_it if we already have a lock, let's
921 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
922 /* Only return failure if it was not GETATTR by cfid
923 (from inode_revalidate) */
924 if (rc || op_data->op_namelen != 0)
928 /* lookup_it may be called only after revalidate_it has run, because
929 * revalidate_it cannot return errors, only zero. Returning zero causes
930 * this call to lookup, which *can* return an error.
932 * We only want to execute the request associated with the intent one
933 * time, however, so don't send the request again. Instead, skip past
934 * this and use the request from revalidate. In this case, revalidate
935 * never dropped its reference, so the refcounts are all OK */
936 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
937 struct ldlm_enqueue_info einfo =
938 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
939 ldlm_completion_ast, NULL, NULL, NULL };
941 /* For case if upper layer did not alloc fid, do it now. */
942 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
943 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
/* "if (rc < 0)" guard elided before this error log. */
945 CERROR("Can't alloc new fid, rc %d\n", rc);
949 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
950 lmm, lmmsize, NULL, extra_lock_flags);
953 } else if (!fid_is_sane(&op_data->op_fid2) ||
954 !(it->it_create_mode & M_CHECK_STALE)) {
955 /* DISP_ENQ_COMPLETE set means there is extra reference on
956 * request referenced from this intent, saved for subsequent
957 * lookup. This path is executed when we proceed to this
958 * lookup, so we clear DISP_ENQ_COMPLETE */
959 it_clear_disposition(it, DISP_ENQ_COMPLETE);
961 *reqp = it->d.lustre.it_data;
962 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/* ptlrpc interpret callback for the async getattr enqueue started by
 * mdc_intent_getattr_async(): releases the in-flight slot, completes
 * the enqueue (ldlm_cli_enqueue_fini), runs the usual finish steps, and
 * finally invokes the caller's minfo->mi_cb with the result.  The args
 * were stashed in req->rq_async_args by the submitter.
 * NOTE(review): goto-cleanup labels and some statements are elided. */
966 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
967 struct ptlrpc_request *req,
968 void *unused, int rc)
970 struct obd_export *exp = req->rq_async_args.pointer_arg[0];
971 struct md_enqueue_info *minfo = req->rq_async_args.pointer_arg[1];
972 struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
973 struct lookup_intent *it;
974 struct lustre_handle *lockh;
975 struct obd_device *obddev;
976 int flags = LDLM_FL_HAS_INTENT;
980 lockh = &minfo->mi_lockh;
982 obddev = class_exp2obd(exp);
984 mdc_exit_request(&obddev->u.cli);
985 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
988 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
989 &flags, NULL, 0, lockh, rc);
/* Error branch ("if (rc < 0)" elided): log and clear replay flag. */
991 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
992 mdc_clear_replay_flag(req, rc);
996 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1000 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* Cleanup: free the enqueue info and notify the waiter. */
1004 OBD_FREE_PTR(einfo);
1005 minfo->mi_cb(req, minfo, rc);
/* Fire off a getattr intent enqueue asynchronously: pack the getattr
 * intent request, take an in-flight slot, start the enqueue with
 * async=1, stash exp/minfo/einfo in rq_async_args for the interpret
 * callback, and queue the request on ptlrpcd.  Completion is delivered
 * via mdc_intent_getattr_async_interpret().
 * NOTE(review): this definition continues past the end of the visible
 * chunk (its tail, including the final return, is not shown). */
1009 int mdc_intent_getattr_async(struct obd_export *exp,
1010 struct md_enqueue_info *minfo,
1011 struct ldlm_enqueue_info *einfo)
1013 struct md_op_data *op_data = &minfo->mi_data;
1014 struct lookup_intent *it = &minfo->mi_it;
1015 struct ptlrpc_request *req;
1016 struct obd_device *obddev = class_exp2obd(exp);
1017 struct ldlm_res_id res_id;
/* Async getattr only needs the LOOKUP ibits here — contrast with
 * mdc_revalidate_lock(), which uses UPDATE for IT_GETATTR. */
1018 ldlm_policy_data_t policy = {
1019 .l_inodebits = { MDS_INODELOCK_LOOKUP }
1022 int flags = LDLM_FL_HAS_INTENT;
1025 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1026 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1027 ldlm_it2str(it->it_op), it->it_flags);
1029 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1030 req = mdc_intent_getattr_pack(exp, it, op_data);
1034 mdc_enter_request(&obddev->u.cli);
/* async = 1: do not wait; the interpret callback finishes up. */
1035 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1036 0, &minfo->mi_lockh, 1);
/* Error branch ("if (rc < 0)" elided): release the slot. */
1038 mdc_exit_request(&obddev->u.cli);
1042 req->rq_async_args.pointer_arg[0] = exp;
1043 req->rq_async_args.pointer_arg[1] = minfo;
1044 req->rq_async_args.pointer_arg[2] = einfo;
1045 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1046 ptlrpcd_add_req(req, PSCOPE_OTHER);