1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_MDC
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
48 # include <liblustre.h>
51 #include <lustre_acl.h>
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 /* fid_res_name_eq() */
55 #include <lustre_fid.h>
56 #include <lprocfs_status.h>
57 #include "mdc_internal.h"
/*
 * Test disposition bits of a lookup intent.
 *
 * Returns it->d.lustre.it_disposition masked with flag, so the result is
 * non-zero when at least one bit of flag is set in the disposition and
 * zero otherwise.
 */
59 int it_disposition(struct lookup_intent *it, int flag)
61 return it->d.lustre.it_disposition & flag;
63 EXPORT_SYMBOL(it_disposition);
/*
 * Set disposition bits on a lookup intent.
 *
 * ORs flag into it->d.lustre.it_disposition; bits that are already set
 * are left untouched.
 */
65 void it_set_disposition(struct lookup_intent *it, int flag)
67 it->d.lustre.it_disposition |= flag;
69 EXPORT_SYMBOL(it_set_disposition);
/*
 * Clear disposition bits on a lookup intent.
 *
 * Every bit present in flag is removed from it->d.lustre.it_disposition;
 * all other bits are preserved.
 */
71 void it_clear_disposition(struct lookup_intent *it, int flag)
73 it->d.lustre.it_disposition &= ~flag;
75 EXPORT_SYMBOL(it_clear_disposition);
/*
 * Report the intent status for a given phase of an open.
 *
 * The dispositions are examined in the fixed order DISP_OPEN_OPEN,
 * DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD. For the first one
 * that is set in the intent, it->d.lustre.it_status is returned when
 * phase compares numerically greater than or equal to that DISP_* value.
 *
 * The CERROR() at the end logs the raw disposition and status.
 *
 * NOTE(review): the lines that follow each visible "return" (what is
 * returned when the disposition is set but phase is lower), and the lines
 * around the final CERROR(), are absent from this text; presumably the
 * CERROR() is only reached when none of the four dispositions is set --
 * confirm against the complete file.
 */
77 int it_open_error(int phase, struct lookup_intent *it)
79 if (it_disposition(it, DISP_OPEN_OPEN)) {
80 if (phase >= DISP_OPEN_OPEN)
81 return it->d.lustre.it_status;
86 if (it_disposition(it, DISP_OPEN_CREATE)) {
87 if (phase >= DISP_OPEN_CREATE)
88 return it->d.lustre.it_status;
93 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
94 if (phase >= DISP_LOOKUP_EXECD)
95 return it->d.lustre.it_status;
100 if (it_disposition(it, DISP_IT_EXECD)) {
101 if (phase >= DISP_IT_EXECD)
102 return it->d.lustre.it_status;
106 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
107 it->d.lustre.it_status);
111 EXPORT_SYMBOL(it_open_error);
/*
 * Attach data to the lock identified by the handle lockh as its
 * l_ast_data, and report the lock's inodebits through *bits.
 *
 * The handle is resolved with ldlm_handle2lock() and must yield a lock
 * (asserted). While holding lock_res_and_lock(): if the lock already
 * carries a different, non-NULL l_ast_data, both old and new values are
 * treated as struct inode pointers and the old inode is asserted to have
 * I_FREEING set in i_state; then l_ast_data is overwritten with data.
 *
 * NOTE(review): several lines of this function are absent from this text
 * (the end of the parameter list, the statements before
 * ldlm_handle2lock(), one argument line of the LASSERTF(), any guard
 * around the *bits store, and everything after unlock_res_and_lock()).
 * Argument validation, release of the reference taken by
 * ldlm_handle2lock() and the return value therefore cannot be confirmed
 * from here.
 */
113 /* this must be called on a lockh that is known to have a referenced lock */
114 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
117 struct ldlm_lock *lock;
128 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
130 LASSERT(lock != NULL);
131 lock_res_and_lock(lock);
133 if (lock->l_ast_data && lock->l_ast_data != data) {
134 struct inode *new_inode = data;
135 struct inode *old_inode = lock->l_ast_data;
136 LASSERTF(old_inode->i_state & I_FREEING,
137 "Found existing inode %p/%lu/%u state %lu in lock: "
138 "setting data to %p/%lu/%u\n", old_inode,
139 old_inode->i_ino, old_inode->i_generation,
141 new_inode, new_inode->i_ino, new_inode->i_generation);
144 lock->l_ast_data = data;
146 *bits = lock->l_policy_data.l_inodebits.bits;
148 unlock_res_and_lock(lock);
/*
 * Look for an existing lock on the resource named by fid.
 *
 * The resource name is derived from fid with fid_build_reg_res_name() and
 * the search is delegated to ldlm_lock_match() in the namespace of the
 * export's obd device; flags, type, policy, mode and lockh are passed
 * through unchanged and the last argument is 0.
 *
 * NOTE(review): the declaration of rc and the return statement are absent
 * from this text; presumably the result of ldlm_lock_match() is returned.
 */
154 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
155 const struct lu_fid *fid, ldlm_type_t type,
156 ldlm_policy_data_t *policy, ldlm_mode_t mode,
157 struct lustre_handle *lockh)
159 struct ldlm_res_id res_id;
163 fid_build_reg_res_name(fid, &res_id);
164 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
165 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel unused locks on the resource named by fid.
 *
 * The resource name is derived from fid with fid_build_reg_res_name() and
 * the work is delegated to ldlm_cli_cancel_unused_resource() on the obd
 * device's namespace, forwarding policy, mode, flags and opaque.
 *
 * NOTE(review): two parameter lines (the ones that declare "mode" and
 * "opaque", both used in the call below), the declaration of rc and the
 * return statement are absent from this text; presumably the result of
 * ldlm_cli_cancel_unused_resource() is returned.
 */
169 int mdc_cancel_unused(struct obd_export *exp,
170 const struct lu_fid *fid,
171 ldlm_policy_data_t *policy,
173 ldlm_cancel_flags_t flags,
176 struct ldlm_res_id res_id;
177 struct obd_device *obd = class_exp2obd(exp);
182 fid_build_reg_res_name(fid, &res_id);
183 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
184 policy, mode, flags, opaque);
/*
 * Run an iterator over the locks of the resource named by fid.
 *
 * The resource name is derived from fid with fid_build_reg_res_name() and
 * ldlm_resource_iterate() is invoked on the namespace of the export's obd
 * device. The result of ldlm_resource_iterate() is not assigned in the
 * visible call.
 *
 * NOTE(review): the remaining arguments of ldlm_resource_iterate() and
 * the return statement are absent from this text; presumably "it" and
 * "data" are the iterator callback and its argument.
 */
188 int mdc_change_cbdata(struct obd_export *exp,
189 const struct lu_fid *fid,
190 ldlm_iterator_t it, void *data)
192 struct ldlm_res_id res_id;
195 fid_build_reg_res_name(fid, &res_id);
196 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
/*
 * mdc_find_cbdata(): iterate over the locks of the resource named by fid
 * and translate the iterator result into a return code.
 *
 * The resource name is derived from fid with fid_build_reg_res_name() and
 * ldlm_resource_iterate() is run on the namespace of the export's obd
 * device. The result is then compared against LDLM_ITER_STOP and
 * LDLM_ITER_CONTINUE.
 *
 * NOTE(review): the values returned for LDLM_ITER_STOP, for
 * LDLM_ITER_CONTINUE and for any other result are absent from this text.
 * NOTE(review): fid is declared const but is cast to a non-const
 * struct lu_fid pointer for fid_build_reg_res_name(); the sibling
 * functions above pass their const fid without a cast, so the cast looks
 * unnecessary -- confirm against the prototype.
 * NOTE(review): the original comment that follows is missing its
 * remaining lines, including its terminator, in this text.
 */
203 /* find any ldlm lock of the inode in mdc
207 int mdc_find_cbdata(struct obd_export *exp,
208 const struct lu_fid *fid,
209 ldlm_iterator_t it, void *data)
211 struct ldlm_res_id res_id;
215 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
216 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
218 if (rc == LDLM_ITER_STOP)
220 else if (rc == LDLM_ITER_CONTINUE)
/*
 * Stop holding a failed request for replay.
 *
 * When req->rq_replay is set, req->rq_lock is taken and released around an
 * update. Independently, if rc is non-zero while the request nevertheless
 * carries a non-zero rq_transno, the situation is reported through
 * DEBUG_REQ(D_ERROR, ...).
 *
 * NOTE(review): the statement executed while rq_lock is held, and
 * whatever follows the DEBUG_REQ(), are absent from this text; presumably
 * the former clears rq_replay -- confirm against the complete file.
 */
225 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
227 /* Don't hold error requests for replay. */
228 if (req->rq_replay) {
229 cfs_spin_lock(&req->rq_lock);
231 cfs_spin_unlock(&req->rq_lock);
233 if (rc && req->rq_transno != 0) {
234 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
239 /* Save a large LOV EA into the request buffer so that it is available
240 * for replay. We don't do this in the initial request because the
241 * original request doesn't need this buffer (at most it sends just the
242 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
243 * buffer and may also be difficult to allocate and save a very large
244 * request buffer for each open. (bug 5707)
246 * OOM here may cause recovery failure if lmm is needed (only for the
247 * original open if the MDS crashed just when this client also OOM'd)
248 * but this is incredibly unlikely, and questionable whether the client
249 * could do MDS recovery under OOM anyways... */
/*
 * Implementation notes: the request buffer segment at index
 * DLM_INTENT_REC_OFF + 4 is enlarged with sptlrpc_cli_enlarge_reqbuf().
 * The CERROR() branch reports that segment index together with
 * body->eadatasize, then drops OBD_MD_FLEASIZE from body->valid and
 * zeroes body->eadatasize. The function returns void, so the caller only
 * observes the outcome through the modified body.
 *
 * NOTE(review): the size argument of sptlrpc_cli_enlarge_reqbuf() and the
 * condition guarding the CERROR() branch are absent from this text;
 * presumably the size is body->eadatasize and the branch is taken on a
 * non-zero rc.
 */
250 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
251 struct mdt_body *body)
255 /* FIXME: remove this explicit offset. */
256 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
259 CERROR("Can't enlarge segment %d size to %d\n",
260 DLM_INTENT_REC_OFF + 4, body->eadatasize);
261 body->valid &= ~OBD_MD_FLEASIZE;
262 body->eadatasize = 0;
/*
 * Build the LDLM intent request (RQF_LDLM_INTENT_OPEN) for an open/create.
 *
 * Steps visible here, in order:
 *  - the file-type bits of it->it_create_mode are replaced by S_IFREG;
 *  - if op_fid2 is sane, unused locks on it are collected with
 *    mdc_resource_get_unused(); the it_flags tests (FMODE_WRITE,
 *    MDS_OPEN_TRUNC, FMODE_EXEC) select something for that call;
 *  - for IT_CREAT, unused MDS_INODELOCK_UPDATE locks on the parent
 *    (op_fid1) are added to the count;
 *  - the request is allocated; the failure path releases the collected
 *    locks with ldlm_lock_list_put() and returns ERR_PTR(-ENOMEM);
 *  - both capability fields are sized from op_capa1, the name field is
 *    op_namelen + 1 bytes, and the EA field is the larger of lmmsize and
 *    the client's cl_default_mds_easize;
 *  - ldlm_prep_enqueue_req() is given the collected cancel list;
 *  - rq_replay is set from the import's imp_replayable under rq_lock;
 *  - the intent opcode and the open body are packed;
 *  - for a remote client the reply ACL field is sized for a
 *    struct mdt_remote_perm; finally the reply length is set.
 *
 * NOTE(review): the last parameter line, the declarations of count/rc,
 * the lock-mode assignments, several call arguments, the error checks and
 * the final return are absent from this text.
 */
266 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
267 struct lookup_intent *it,
268 struct md_op_data *op_data,
269 void *lmm, int lmmsize,
272 struct ptlrpc_request *req;
273 struct obd_device *obddev = class_exp2obd(exp);
274 struct ldlm_intent *lit;
275 CFS_LIST_HEAD(cancels);
281 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
283 /* XXX: openlock is not cancelled for cross-refs. */
284 /* If inode is known, cancel conflicting OPEN locks. */
285 if (fid_is_sane(&op_data->op_fid2)) {
286 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
289 else if (it->it_flags & FMODE_EXEC)
294 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
299 /* If CREATE, cancel parent's UPDATE lock. */
300 if (it->it_op & IT_CREAT)
304 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
306 MDS_INODELOCK_UPDATE);
308 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
309 &RQF_LDLM_INTENT_OPEN);
311 ldlm_lock_list_put(&cancels, l_bl_ast, count);
312 RETURN(ERR_PTR(-ENOMEM));
315 /* parent capability */
316 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
317 /* child capability, reserve the size according to parent capa, it will
318 * be filled after we get the reply */
319 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
321 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
322 op_data->op_namelen + 1);
323 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
324 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
326 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
328 ptlrpc_request_free(req);
/* Mark the request for replay only if the import supports replay. */
332 cfs_spin_lock(&req->rq_lock);
333 req->rq_replay = req->rq_import->imp_replayable;
334 cfs_spin_unlock(&req->rq_lock);
336 /* pack the intent */
337 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
338 lit->opc = (__u64)it->it_op;
340 /* pack the intended request */
341 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
344 /* for remote client, fetch remote perm for current user */
345 if (client_is_remote(exp))
346 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
347 sizeof(struct mdt_remote_perm));
348 ptlrpc_request_set_replen(req);
/*
 * Build the LDLM intent request (RQF_LDLM_INTENT_UNLINK) for an unlink.
 *
 * The request is allocated (ERR_PTR(-ENOMEM) on the failure path), the
 * capability field is sized from op_capa1 and the name field is
 * op_namelen + 1 bytes. ldlm_prep_enqueue_req() is called without a
 * cancel list. The intent opcode and the unlink body are packed, then the
 * reply is sized: RMF_MDT_MD gets cl_max_mds_easize and RMF_ACL gets
 * cl_max_mds_cookiesize.
 *
 * NOTE(review): the reply field sized with the cookie size is RMF_ACL;
 * presumably that field is reused to carry cookies in the unlink reply --
 * confirm against the request format definition.
 * NOTE(review): the error checks, the declaration of rc and the final
 * return are absent from this text.
 */
352 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
353 struct lookup_intent *it,
354 struct md_op_data *op_data)
356 struct ptlrpc_request *req;
357 struct obd_device *obddev = class_exp2obd(exp);
358 struct ldlm_intent *lit;
362 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
363 &RQF_LDLM_INTENT_UNLINK);
365 RETURN(ERR_PTR(-ENOMEM));
367 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
368 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
369 op_data->op_namelen + 1);
371 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
373 ptlrpc_request_free(req);
377 /* pack the intent */
378 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
379 lit->opc = (__u64)it->it_op;
381 /* pack the intended request */
382 mdc_unlink_pack(req, op_data);
384 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
385 obddev->u.cli.cl_max_mds_easize);
386 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
387 obddev->u.cli.cl_max_mds_cookiesize);
388 ptlrpc_request_set_replen(req);
/*
 * Build the LDLM intent request (RQF_LDLM_INTENT_GETATTR) used for
 * getattr/lookup intents.
 *
 * The set of attributes requested ("valid") is fixed here; the only
 * variable part is OBD_MD_FLRMTPERM for a remote client versus
 * OBD_MD_FLACL otherwise. The request is allocated (ERR_PTR(-ENOMEM) on
 * the failure path), the capability field is sized from op_capa1, the
 * name field is op_namelen + 1 bytes, and ldlm_prep_enqueue_req() is
 * called without a cancel list. After packing the intent opcode and the
 * getattr body, the reply RMF_MDT_MD field is sized to cl_max_mds_easize
 * and, for a remote client only, RMF_ACL is sized for a
 * struct mdt_remote_perm.
 *
 * NOTE(review): the error checks, the declaration of rc and the final
 * return are absent from this text.
 */
392 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
393 struct lookup_intent *it,
394 struct md_op_data *op_data)
396 struct ptlrpc_request *req;
397 struct obd_device *obddev = class_exp2obd(exp);
398 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
399 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
400 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
401 (client_is_remote(exp) ?
402 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
403 struct ldlm_intent *lit;
407 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
408 &RQF_LDLM_INTENT_GETATTR);
410 RETURN(ERR_PTR(-ENOMEM));
412 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
413 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
414 op_data->op_namelen + 1);
416 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
418 ptlrpc_request_free(req);
422 /* pack the intent */
423 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
424 lit->opc = (__u64)it->it_op;
426 /* pack the intended request */
427 mdc_getattr_pack(req, valid, it->it_flags, op_data);
429 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
430 obddev->u.cli.cl_max_mds_easize);
431 if (client_is_remote(exp))
432 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
433 sizeof(struct mdt_remote_perm));
434 ptlrpc_request_set_replen(req);
/*
 * Build a plain LDLM enqueue request (RQF_LDLM_ENQUEUE) with no intent
 * payload.
 *
 * The request is allocated on the export's client import
 * (ERR_PTR(-ENOMEM) on the failure path), prepared with
 * ldlm_prep_enqueue_req() without a cancel list, and its reply length is
 * set.
 *
 * NOTE(review): the error checks, the declaration of rc and the final
 * return are absent from this text.
 */
438 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
440 struct ptlrpc_request *req;
444 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
446 RETURN(ERR_PTR(-ENOMEM));
448 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
450 ptlrpc_request_free(req);
454 ptlrpc_request_set_replen(req);
/*
 * Post-process the reply of an intent enqueue.
 *
 * Steps visible here, in order:
 *  - if the request has a transno or is marked for replay,
 *    LDLM_FL_INTENT_ONLY is OR-ed into the lock flags of the request's
 *    RMF_DLM_REQ buffer;
 *  - for rc == ELDLM_LOCK_ABORTED the lock handle is zeroed; otherwise
 *    the lock is looked up and, if its l_req_mode differs from
 *    einfo->ei_mode, the reference is moved to the new mode and ei_mode
 *    is updated;
 *  - disposition and status are copied from the reply's lock_policy_res1
 *    and lock_policy_res2 into the intent, along with the lock mode, the
 *    handle cookie and the request pointer (it_data);
 *  - the replay flag is cleared for a negative status, and for an IT_OPEN
 *    that did not end in a successful open;
 *  - for IT_OPEN, IT_UNLINK, IT_LOOKUP and IT_GETATTR the mdt_body is
 *    fetched, open replay data is set up for a successful open, and,
 *    depending on body->valid, the EA, remote permission and capability
 *    buffers are fetched from the reply. For a replayable open the reply
 *    EA is copied into the request's RMF_EADATA buffer.
 *
 * NOTE(review): many lines are absent from this text (the last parameter,
 * local declarations, NULL checks after each capsule lookup, error
 * returns, several call arguments, comment delimiters and the final
 * return), so the error paths cannot be described from here.
 */
458 static int mdc_finish_enqueue(struct obd_export *exp,
459 struct ptlrpc_request *req,
460 struct ldlm_enqueue_info *einfo,
461 struct lookup_intent *it,
462 struct lustre_handle *lockh,
465 struct req_capsule *pill = &req->rq_pill;
466 struct ldlm_request *lockreq;
467 struct ldlm_reply *lockrep;
471 /* Similarly, if we're going to replay this request, we don't want to
472 * actually get a lock, just perform the intent. */
473 if (req->rq_transno || req->rq_replay) {
474 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
475 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
478 if (rc == ELDLM_LOCK_ABORTED) {
480 memset(lockh, 0, sizeof(*lockh));
482 } else { /* rc = 0 */
483 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
486 /* If the server gave us back a different lock mode, we should
487 * fix up our variables. */
488 if (lock->l_req_mode != einfo->ei_mode) {
489 ldlm_lock_addref(lockh, lock->l_req_mode);
490 ldlm_lock_decref(lockh, einfo->ei_mode);
491 einfo->ei_mode = lock->l_req_mode;
/* Copy the server's verdict from the DLM reply into the intent. */
496 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
497 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
499 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
500 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
501 it->d.lustre.it_lock_mode = einfo->ei_mode;
502 it->d.lustre.it_lock_handle = lockh->cookie;
503 it->d.lustre.it_data = req;
505 if (it->d.lustre.it_status < 0 && req->rq_replay)
506 mdc_clear_replay_flag(req, it->d.lustre.it_status);
508 /* If we're doing an IT_OPEN which did not result in an actual
509 * successful open, then we need to remove the bit which saves
510 * this request for unconditional replay.
512 * It's important that we do this first! Otherwise we might exit the
513 * function without doing so, and try to replay a failed create
515 if (it->it_op & IT_OPEN && req->rq_replay &&
516 (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
517 mdc_clear_replay_flag(req, it->d.lustre.it_status);
519 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
520 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
522 /* We know what to expect, so we do any byte flipping required here */
523 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
524 struct mdt_body *body;
526 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
528 CERROR ("Can't swab mdt_body\n");
532 if (it_disposition(it, DISP_OPEN_OPEN) &&
533 !it_open_error(DISP_OPEN_OPEN, it)) {
535 * If this is a successful OPEN request, we need to set
536 * replay handler and data early, so that if replay
537 * happens immediately after swabbing below, new reply
538 * is swabbed by that handler correctly.
540 mdc_set_open_replay_data(NULL, NULL, req);
543 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
546 mdc_update_max_ea_from_body(exp, body);
549 * The eadata is opaque; just check that it is there.
550 * Eventually, obd_unpackmd() will check the contents.
552 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
558 * We save the reply LOV EA in case we have to replay a
559 * create for recovery. If we didn't allocate a large
560 * enough request buffer above we need to reallocate it
561 * here to hold the actual LOV EA.
563 * To not save LOV EA if request is not going to replay
564 * (for example error one).
566 if ((it->it_op & IT_OPEN) && req->rq_replay) {
568 if (req_capsule_get_size(pill, &RMF_EADATA,
571 mdc_realloc_openmsg(req, body);
573 req_capsule_shrink(pill, &RMF_EADATA,
577 req_capsule_set_size(pill, &RMF_EADATA,
581 lmm = req_capsule_client_get(pill, &RMF_EADATA);
583 memcpy(lmm, eadata, body->eadatasize);
/* Remote permissions are only expected from a remote client (asserted). */
587 if (body->valid & OBD_MD_FLRMTPERM) {
588 struct mdt_remote_perm *perm;
590 LASSERT(client_is_remote(exp));
591 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
592 lustre_swab_mdt_remote_perm);
596 if (body->valid & OBD_MD_FLMDSCAPA) {
597 struct lustre_capa *capa, *p;
599 capa = req_capsule_server_get(pill, &RMF_CAPA1);
603 if (it->it_op & IT_OPEN) {
604 /* client fid capa will be checked in replay */
605 p = req_capsule_client_get(pill, &RMF_CAPA2);
610 if (body->valid & OBD_MD_FLOSSCAPA) {
611 struct lustre_capa *capa;
613 capa = req_capsule_server_get(pill, &RMF_CAPA2);
/*
 * mdc_enqueue(): build the request that matches the intent, enqueue the
 * lock, and post-process the reply.
 *
 * Visible behaviour:
 *  - the resource name is derived from op_data->op_fid1;
 *  - the default policy is the LOOKUP inodebit; UNLINK, GETATTR and
 *    READDIR intents, as well as IT_OPEN, use the UPDATE inodebit;
 *  - one branch handles the flock case: lmm must be non-NULL with
 *    lmmsize == 0, the lock type must be LDLM_FLOCK, lmm is reinterpreted
 *    as the policy and res_id.name[3] is set to LDLM_FLOCK;
 *  - otherwise the request is built by mdc_intent_open_pack(),
 *    mdc_intent_unlink_pack(), mdc_intent_getattr_pack() or
 *    ldlm_enqueue_pack() depending on it->it_op; for IT_OPEN,
 *    einfo->ei_cbdata is reset to NULL;
 *  - mdc_get_rpc_lock()/mdc_enter_request() are paired with
 *    mdc_exit_request()/mdc_put_rpc_lock() around ldlm_cli_enqueue();
 *  - the failure path logs, clears the replay flag and drops the request
 *    with ptlrpc_req_finished(); the normal path ends in
 *    mdc_finish_enqueue().
 *
 * NOTE(review): the conditions that select the flock branch and guard the
 * rpc-lock calls, the error checks, several call arguments and the return
 * statements are absent from this text. The reqp parameter is not
 * referenced on any visible line.
 */
622 /* We always reserve enough space in the reply packet for a stripe MD, because
623 * we don't know in advance the file type. */
624 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
625 struct lookup_intent *it, struct md_op_data *op_data,
626 struct lustre_handle *lockh, void *lmm, int lmmsize,
627 struct ptlrpc_request **reqp, int extra_lock_flags)
629 struct obd_device *obddev = class_exp2obd(exp);
630 struct ptlrpc_request *req = NULL;
631 struct req_capsule *pill;
632 int flags = extra_lock_flags;
634 struct ldlm_res_id res_id;
635 static const ldlm_policy_data_t lookup_policy =
636 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
637 static const ldlm_policy_data_t update_policy =
638 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
639 ldlm_policy_data_t const *policy = &lookup_policy;
642 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
645 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
648 flags |= LDLM_FL_HAS_INTENT;
649 if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
650 policy = &update_policy;
656 /* The only way right now is FLOCK, in this case we hide flock
657 policy as lmm, but lmmsize is 0 */
658 LASSERT(lmm && lmmsize == 0);
659 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
661 policy = (ldlm_policy_data_t *)lmm;
662 res_id.name[3] = LDLM_FLOCK;
663 } else if (it->it_op & IT_OPEN) {
664 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
666 policy = &update_policy;
667 einfo->ei_cbdata = NULL;
669 } else if (it->it_op & IT_UNLINK)
670 req = mdc_intent_unlink_pack(exp, it, op_data);
671 else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
672 req = mdc_intent_getattr_pack(exp, it, op_data);
673 else if (it->it_op == IT_READDIR)
674 req = ldlm_enqueue_pack(exp);
681 RETURN(PTR_ERR(req));
682 pill = &req->rq_pill;
684 /* It is important to obtain rpc_lock first (if applicable), so that
685 * threads that are serialised with rpc_lock are not polluting our
686 * rpcs in flight counter. We do not do flock request limiting, though*/
688 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
689 mdc_enter_request(&obddev->u.cli);
692 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
698 mdc_exit_request(&obddev->u.cli);
699 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
702 /* For flock requests we immediately return without further
703 delay and let caller deal with the rest, since rest of
704 this function metadata processing makes no sense for flock
710 CERROR("ldlm_cli_enqueue: %d\n", rc);
711 mdc_clear_replay_flag(req, rc);
712 ptlrpc_req_finished(req);
715 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/*
 * Second stage of intent processing, run once the enqueue reply has been
 * stored in the intent.
 *
 * Visible behaviour:
 *  - if the server never executed the intent (DISP_IT_EXECD clear), the
 *    non-zero it_status is returned;
 *  - it_open_error() is consulted for the DISP_IT_EXECD and
 *    DISP_LOOKUP_EXECD phases;
 *  - when revalidating (op_fid2 sane, M_CHECK_STALE set in
 *    it_create_mode, op is not IT_GETATTR) the intent is marked
 *    DISP_ENQ_COMPLETE and the reply's fid1 is compared with op_fid2 and
 *    op_fid3; a mismatch with both is logged as stale data;
 *  - an extra request reference is taken once each for a successful
 *    create (DISP_ENQ_CREATE_REF) and a successful open
 *    (DISP_ENQ_OPEN_REF);
 *  - if the handle resolves to a lock, its resource name is asserted to
 *    match the reply fid; an already granted lock with the same policy is
 *    then searched for with ldlm_lock_match(). If one is found, the new
 *    lock is dropped with ldlm_lock_decref_and_cancel() and the handle in
 *    lockh and in the intent is switched to the existing lock.
 *
 * NOTE(review): error returns after the it_open_error() calls, the
 * return for stale data, the release of the reference taken by
 * ldlm_handle2lock() and the final return are absent from this text.
 */
720 static int mdc_finish_intent_lock(struct obd_export *exp,
721 struct ptlrpc_request *request,
722 struct md_op_data *op_data,
723 struct lookup_intent *it,
724 struct lustre_handle *lockh)
726 struct lustre_handle old_lock;
727 struct mdt_body *mdt_body;
728 struct ldlm_lock *lock;
732 LASSERT(request != NULL);
733 LASSERT(request != LP_POISON);
734 LASSERT(request->rq_repmsg != LP_POISON);
736 if (!it_disposition(it, DISP_IT_EXECD)) {
737 /* The server failed before it even started executing the
738 * intent, i.e. because it couldn't unpack the request. */
739 LASSERT(it->d.lustre.it_status != 0);
740 RETURN(it->d.lustre.it_status);
742 rc = it_open_error(DISP_IT_EXECD, it);
746 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
747 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
749 /* If we were revalidating a fid/name pair, mark the intent in
750 * case we fail and get called again from lookup */
751 if (fid_is_sane(&op_data->op_fid2) &&
752 it->it_create_mode & M_CHECK_STALE &&
753 it->it_op != IT_GETATTR) {
754 it_set_disposition(it, DISP_ENQ_COMPLETE);
756 /* Also: did we find the same inode? */
757 /* server can return one of two fids:
758 * op_fid2 - new allocated fid - if file is created.
759 * op_fid3 - existing fid - if file is only opened.
760 * op_fid3 is saved in lmv_intent_open */
761 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
762 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
/* NOTE(review): the message below prints op_fid2 twice although the
 * condition above compares both op_fid2 and op_fid3; the second value
 * was presumably meant to be op_fid3 -- confirm and fix. */
763 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
764 "\n", PFID(&op_data->op_fid2),
765 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
770 rc = it_open_error(DISP_LOOKUP_EXECD, it);
774 /* keep requests around for the multiple phases of the call
775 * this shows the DISP_XX must guarantee we make it into the call
777 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
778 it_disposition(it, DISP_OPEN_CREATE) &&
779 !it_open_error(DISP_OPEN_CREATE, it)) {
780 it_set_disposition(it, DISP_ENQ_CREATE_REF);
781 ptlrpc_request_addref(request); /* balanced in ll_create_node */
783 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
784 it_disposition(it, DISP_OPEN_OPEN) &&
785 !it_open_error(DISP_OPEN_OPEN, it)) {
786 it_set_disposition(it, DISP_ENQ_OPEN_REF);
787 ptlrpc_request_addref(request); /* balanced in ll_file_open */
788 /* BUG 11546 - eviction in the middle of open rpc processing */
789 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
792 if (it->it_op & IT_CREAT) {
793 /* XXX this belongs in ll_create_it */
794 } else if (it->it_op == IT_OPEN) {
795 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
797 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
800 /* If we already have a matching lock, then cancel the new
801 * one. We have to set the data here instead of in
802 * mdc_enqueue, because we need to use the child's inode as
803 * the l_ast_data to match, and that's not available until
804 * intent_finish has performed the iget().) */
805 lock = ldlm_handle2lock(lockh);
807 ldlm_policy_data_t policy = lock->l_policy_data;
808 LDLM_DEBUG(lock, "matching against this");
810 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
811 &lock->l_resource->lr_name),
812 "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
813 (unsigned long)lock->l_resource->lr_name.name[0],
814 (unsigned long)lock->l_resource->lr_name.name[1],
815 (unsigned long)lock->l_resource->lr_name.name[2],
816 (unsigned long)fid_seq(&mdt_body->fid1),
817 (unsigned long)fid_oid(&mdt_body->fid1),
818 (unsigned long)fid_ver(&mdt_body->fid1));
/* old_lock starts as a copy of the new handle and is handed to
 * ldlm_lock_match(); if a match is reported, the new lock is cancelled
 * and old_lock is copied back over lockh. */
821 memcpy(&old_lock, lockh, sizeof(*lockh));
822 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
823 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
824 ldlm_lock_decref_and_cancel(lockh,
825 it->d.lustre.it_lock_mode);
826 memcpy(lockh, &old_lock, sizeof(old_lock));
827 it->d.lustre.it_lock_handle = lockh->cookie;
830 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
831 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
832 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/*
 * Check whether a suitable inodebits lock on fid is already held.
 *
 * The resource name is derived from fid. The policy bits are taken from
 * *bits on one path; on the other they are MDS_INODELOCK_UPDATE for
 * IT_GETATTR and MDS_INODELOCK_LOOKUP for any other operation.
 * ldlm_lock_match() is then asked for a granted LDLM_IBITS lock in any of
 * the CR, CW, PR or PW modes. The matched handle cookie and mode are
 * stored in the intent, and the matched lock's inodebits are written back
 * through *bits.
 *
 * NOTE(review): the conditions that choose between the two policy
 * assignments and that guard the intent update and the *bits write-back
 * are absent from this text, as are the release of the reference taken by
 * ldlm_handle2lock() and the return statement. A caller in this file
 * passes bits == NULL, so those missing guards matter -- confirm against
 * the complete file.
 */
836 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
837 struct lu_fid *fid, __u32 *bits)
839 /* We could just return 1 immediately, but since we should only
840 * be called in revalidate_it if we already have a lock, let's
842 struct ldlm_res_id res_id;
843 struct lustre_handle lockh;
844 ldlm_policy_data_t policy;
848 fid_build_reg_res_name(fid, &res_id);
849 /* Firstly consider the bits */
851 policy.l_inodebits.bits = *bits;
853 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
854 MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
856 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
857 LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
858 &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
860 it->d.lustre.it_lock_handle = lockh.cookie;
861 it->d.lustre.it_lock_mode = mode;
863 struct ldlm_lock *lock = ldlm_handle2lock(&lockh);
865 LASSERT(lock != NULL);
866 *bits = lock->l_policy_data.l_inodebits.bits;
875 * This long block is all about fixing up the lock and request state
876 * so that it is correct as of the moment _before_ the operation was
877 * applied; that way, the VFS will think that everything is normal and
878 * call Lustre's regular VFS methods.
880 * If we're performing a creation, that means that unless the creation
881 * failed with EEXIST, we should fake up a negative dentry.
883 * For everything else, we want to lookup to succeed.
885 * One additional note: if CREATE or OPEN succeeded, we add an extra
886 * reference to the request because we need to keep it around until
887 * ll_create/ll_open gets called.
889 * The server will return to us, in it_disposition, an indication of
890 * exactly what d.lustre.it_status refers to.
892 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
893 * otherwise if DISP_OPEN_CREATE is set, then it status is the
894 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
895 * DISP_LOOKUP_POS will be set, indicating whether the child lookup succeeded.
898 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the lookup.
/*
 * mdc_intent_lock(): entry point for executing a lookup intent.
 *
 * Visible behaviour:
 *  - for IT_LOOKUP/IT_GETATTR with a sane op_fid2, an already held lock
 *    is tried first through mdc_revalidate_lock(), called with
 *    bits == NULL; the test on "rc || op_namelen != 0" decides whether
 *    that result ends the call;
 *  - if the intent is not yet marked DISP_ENQ_COMPLETE, a new enqueue is
 *    issued: a LDLM_IBITS einfo is built from it_to_lock_mode(it),
 *    cb_blocking and ldlm_completion_ast, a fid is allocated with
 *    mdc_fid_alloc() for IT_CREAT when op_fid2 is not sane, and
 *    mdc_enqueue() is called with a NULL request pointer argument;
 *  - otherwise, unless this is a stale check on a sane op_fid2,
 *    DISP_ENQ_COMPLETE is cleared so that the saved request is consumed;
 *  - finally *reqp is set from it->d.lustre.it_data and
 *    mdc_finish_intent_lock() is run on it.
 *
 * NOTE(review): the final CDEBUG argument, the early returns, the error
 * checks after mdc_fid_alloc()/mdc_enqueue() and the final return are
 * absent from this text. lookup_flags is not referenced on any visible
 * line.
 */
901 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
902 void *lmm, int lmmsize, struct lookup_intent *it,
903 int lookup_flags, struct ptlrpc_request **reqp,
904 ldlm_blocking_callback cb_blocking,
905 int extra_lock_flags)
907 struct lustre_handle lockh;
912 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
913 ", intent: %s flags %#o\n", op_data->op_namelen,
914 op_data->op_name, PFID(&op_data->op_fid2),
915 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
919 if (fid_is_sane(&op_data->op_fid2) &&
920 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
921 /* We could just return 1 immediately, but since we should only
922 * be called in revalidate_it if we already have a lock, let's
924 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
925 /* Only return failure if it was not GETATTR by cfid
926 (from inode_revalidate) */
927 if (rc || op_data->op_namelen != 0)
931 /* lookup_it may be called only after revalidate_it has run, because
932 * revalidate_it cannot return errors, only zero. Returning zero causes
933 * this call to lookup, which *can* return an error.
935 * We only want to execute the request associated with the intent one
936 * time, however, so don't send the request again. Instead, skip past
937 * this and use the request from revalidate. In this case, revalidate
938 * never dropped its reference, so the refcounts are all OK */
939 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
940 struct ldlm_enqueue_info einfo =
941 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
942 ldlm_completion_ast, NULL, NULL, NULL };
944 /* For case if upper layer did not alloc fid, do it now. */
945 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
946 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
948 CERROR("Can't alloc new fid, rc %d\n", rc);
952 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
953 lmm, lmmsize, NULL, extra_lock_flags);
956 } else if (!fid_is_sane(&op_data->op_fid2) ||
957 !(it->it_create_mode & M_CHECK_STALE)) {
958 /* DISP_ENQ_COMPLETE set means there is extra reference on
959 * request referenced from this intent, saved for subsequent
960 * lookup. This path is executed when we proceed to this
961 * lookup, so we clear DISP_ENQ_COMPLETE */
962 it_clear_disposition(it, DISP_ENQ_COMPLETE);
/* Hand the request saved in the intent back to the caller. */
964 *reqp = it->d.lustre.it_data;
965 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Reply interpreter for an asynchronous getattr intent.
 *
 * The export, the md_enqueue_info and the ldlm_enqueue_info are read from
 * req->rq_async_args.pointer_arg[0..2]. The in-flight accounting is
 * released with mdc_exit_request(), the enqueue is completed with
 * ldlm_cli_enqueue_fini(), and the reply is processed by
 * mdc_finish_enqueue() followed by mdc_finish_intent_lock(). At the end
 * einfo is freed and the caller-supplied callback minfo->mi_cb is invoked
 * with the request, minfo and rc.
 *
 * NOTE(review): the assignment of "it", the effect of the
 * OBD_FAIL_CHECK() branch, the error checks and jumps between the stages
 * and the return statement are absent from this text; the "unused"
 * parameter is not referenced on any visible line.
 */
969 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
970 struct ptlrpc_request *req,
971 void *unused, int rc)
973 struct obd_export *exp = req->rq_async_args.pointer_arg[0];
974 struct md_enqueue_info *minfo = req->rq_async_args.pointer_arg[1];
975 struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
976 struct lookup_intent *it;
977 struct lustre_handle *lockh;
978 struct obd_device *obddev;
979 int flags = LDLM_FL_HAS_INTENT;
983 lockh = &minfo->mi_lockh;
985 obddev = class_exp2obd(exp);
987 mdc_exit_request(&obddev->u.cli);
988 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
991 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
992 &flags, NULL, 0, lockh, rc);
994 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
995 mdc_clear_replay_flag(req, rc);
999 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1003 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1007 OBD_FREE_PTR(einfo);
1008 minfo->mi_cb(req, minfo, rc);
/*
 * Issue a getattr intent enqueue whose reply is handled later by
 * mdc_intent_getattr_async_interpret().
 *
 * The operation data and the intent are taken from minfo (mi_data and
 * mi_it). The resource name is derived from op_fid1, the request is built
 * by mdc_intent_getattr_pack(), and the policy asks for both the LOOKUP
 * and the UPDATE inodebits. After mdc_enter_request(), ldlm_cli_enqueue()
 * is called with minfo->mi_lockh as the handle and 1 as its last
 * argument (presumably the "async" flag -- confirm). The
 * mdc_exit_request() call that follows belongs to the failure path. On
 * the normal path exp, minfo and einfo are stored in
 * req->rq_async_args.pointer_arg[0..2], the interpreter is installed and
 * the request is queued with ptlrpcd_add_req(req, PSCOPE_OTHER).
 *
 * NOTE(review): the error checks after mdc_intent_getattr_pack() and
 * ldlm_cli_enqueue(), the declaration of rc and the return statements are
 * absent from this text.
 */
1012 int mdc_intent_getattr_async(struct obd_export *exp,
1013 struct md_enqueue_info *minfo,
1014 struct ldlm_enqueue_info *einfo)
1016 struct md_op_data *op_data = &minfo->mi_data;
1017 struct lookup_intent *it = &minfo->mi_it;
1018 struct ptlrpc_request *req;
1019 struct obd_device *obddev = class_exp2obd(exp);
1020 struct ldlm_res_id res_id;
1021 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1022 * for statahead currently. Consider CMD in future, such two bits
1023 * maybe managed by different MDS, should be adjusted then. */
1024 ldlm_policy_data_t policy = {
1025 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1026 MDS_INODELOCK_UPDATE }
1029 int flags = LDLM_FL_HAS_INTENT;
1032 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1033 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1034 ldlm_it2str(it->it_op), it->it_flags);
1036 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1037 req = mdc_intent_getattr_pack(exp, it, op_data);
1041 mdc_enter_request(&obddev->u.cli);
1042 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1043 0, &minfo->mi_lockh, 1);
1045 mdc_exit_request(&obddev->u.cli);
/* Stash the context that the reply interpreter reads back. */
1049 req->rq_async_args.pointer_arg[0] = exp;
1050 req->rq_async_args.pointer_arg[1] = minfo;
1051 req->rq_async_args.pointer_arg[2] = einfo;
1052 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1053 ptlrpcd_add_req(req, PSCOPE_OTHER);