4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_MDC
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
48 # include <liblustre.h>
51 #include <lustre_acl.h>
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 /* fid_res_name_eq() */
55 #include <lustre_fid.h>
56 #include <lprocfs_status.h>
57 #include "mdc_internal.h"
/* State carried with an asynchronous intent getattr request.  An instance
 * is obtained from the request's async args in mdc_intent_getattr_async()
 * and read back in mdc_intent_getattr_async_interpret(). */
59 struct mdc_getattr_args {
60 struct obd_export *ga_exp; /* export the interpret callback works on */
61 struct md_enqueue_info *ga_minfo; /* supplies lock handle, op data and the mi_cb completion callback */
62 struct ldlm_enqueue_info *ga_einfo; /* enqueue parameters; freed by the interpret callback */
65 int it_disposition(struct lookup_intent *it, int flag)
67 return it->d.lustre.it_disposition & flag;
69 EXPORT_SYMBOL(it_disposition);
71 void it_set_disposition(struct lookup_intent *it, int flag)
73 it->d.lustre.it_disposition |= flag;
75 EXPORT_SYMBOL(it_set_disposition);
77 void it_clear_disposition(struct lookup_intent *it, int flag)
79 it->d.lustre.it_disposition &= ~flag;
81 EXPORT_SYMBOL(it_clear_disposition);
/* Report the status recorded in an intent for a given phase of an open.
 *
 * The disposition bits are tested in the order DISP_OPEN_OPEN,
 * DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD.  For the first bit
 * found set, d.lustre.it_status is returned when phase is at least that
 * bit.  If none of the bits is set, the disposition and status are logged
 * with CERROR.
 *
 * NOTE(review): the branch taken when phase is below the matching bit, and
 * whatever follows the CERROR, are not visible in this listing - confirm
 * against the full source before relying on the return value there. */
83 int it_open_error(int phase, struct lookup_intent *it)
85 if (it_disposition(it, DISP_OPEN_OPEN)) {
86 if (phase >= DISP_OPEN_OPEN)
87 return it->d.lustre.it_status;
92 if (it_disposition(it, DISP_OPEN_CREATE)) {
93 if (phase >= DISP_OPEN_CREATE)
94 return it->d.lustre.it_status;
99 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
100 if (phase >= DISP_LOOKUP_EXECD)
101 return it->d.lustre.it_status;
106 if (it_disposition(it, DISP_IT_EXECD)) {
107 if (phase >= DISP_IT_EXECD)
108 return it->d.lustre.it_status;
112 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
113 it->d.lustre.it_status);
117 EXPORT_SYMBOL(it_open_error);
/* Attach caller data to a lock and report the lock's inodebits.
 *
 * lockh is converted to a lock with ldlm_handle2lock(), which is asserted
 * to succeed.  Between lock_res_and_lock() and unlock_res_and_lock(), data
 * is stored in l_ast_data and the lock's l_inodebits.bits value is written
 * to *bits.  If the lock already carries different data, the old value is
 * treated as an inode and asserted to be in I_FREEING state.
 *
 * NOTE(review): exp is not referenced in the visible lines, and any guard
 * around the *bits store is not visible in this listing - confirm. */
119 /* this must be called on a lockh that is known to have a referenced lock */
120 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
123 struct ldlm_lock *lock;
132 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
134 LASSERT(lock != NULL);
135 lock_res_and_lock(lock);
137 if (lock->l_ast_data && lock->l_ast_data != data) {
138 struct inode *new_inode = data;
139 struct inode *old_inode = lock->l_ast_data;
140 LASSERTF(old_inode->i_state & I_FREEING,
141 "Found existing inode %p/%lu/%u state %lu in lock: "
142 "setting data to %p/%lu/%u\n", old_inode,
143 old_inode->i_ino, old_inode->i_generation,
145 new_inode, new_inode->i_ino, new_inode->i_generation);
148 lock->l_ast_data = data;
150 *bits = lock->l_policy_data.l_inodebits.bits;
152 unlock_res_and_lock(lock);
/* Look for an existing lock on the resource of a fid.
 *
 * The resource name is built from fid with fid_build_reg_res_name() and
 * passed, together with flags, type, policy and mode, to ldlm_lock_match()
 * on the namespace of the obd device behind exp.  lockh is handed to
 * ldlm_lock_match() unchanged; the result of that call is kept in rc. */
158 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
159 const struct lu_fid *fid, ldlm_type_t type,
160 ldlm_policy_data_t *policy, ldlm_mode_t mode,
161 struct lustre_handle *lockh)
163 struct ldlm_res_id res_id;
167 fid_build_reg_res_name(fid, &res_id);
168 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
169 &res_id, type, policy, mode, lockh, 0);
/* Cancel unused locks on the resource of a fid.
 *
 * Builds the resource name from fid and calls
 * ldlm_cli_cancel_unused_resource() on the obd namespace with the policy,
 * mode, flags and opaque values; the result of the call is kept in rc. */
173 int mdc_cancel_unused(struct obd_export *exp,
174 const struct lu_fid *fid,
175 ldlm_policy_data_t *policy,
177 ldlm_cancel_flags_t flags,
180 struct ldlm_res_id res_id;
181 struct obd_device *obd = class_exp2obd(exp);
186 fid_build_reg_res_name(fid, &res_id);
187 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
188 policy, mode, flags, opaque);
/* Iterate the ldlm resource named by a fid.
 *
 * The resource name is built with fid_build_reg_res_name() and
 * ldlm_resource_iterate() is called on the namespace of the obd device
 * behind exp.  The result of the iterate call is not captured in the
 * visible lines.
 *
 * NOTE(review): the trailing arguments of the iterate call are cut off in
 * this listing; presumably res_id, it and data - confirm. */
192 int mdc_change_cbdata(struct obd_export *exp,
193 const struct lu_fid *fid,
194 ldlm_iterator_t it, void *data)
196 struct ldlm_res_id res_id;
199 fid_build_reg_res_name(fid, &res_id);
200 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
/* Iterate the ldlm resource of a fid and classify the outcome.
 *
 * Same call sequence as mdc_change_cbdata(), but the result of
 * ldlm_resource_iterate() is stored in rc and compared against
 * LDLM_ITER_STOP and LDLM_ITER_CONTINUE.  The value produced for each
 * case is not visible in this listing.
 *
 * NOTE(review): fid is cast to a non-const pointer for
 * fid_build_reg_res_name() here, while mdc_change_cbdata() passes the same
 * kind of const pointer without a cast - the cast looks unnecessary;
 * confirm. */
207 /* find any ldlm lock of the inode in mdc
211 int mdc_find_cbdata(struct obd_export *exp,
212 const struct lu_fid *fid,
213 ldlm_iterator_t it, void *data)
215 struct ldlm_res_id res_id;
219 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
220 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
222 if (rc == LDLM_ITER_STOP)
224 else if (rc == LDLM_ITER_CONTINUE)
229 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
231 /* Don't hold error requests for replay. */
/* Called with a request and the rc of the operation it carried.  When
 * rq_replay is set, rq_lock is taken around an update that is not visible
 * in this listing (NOTE(review): presumably rq_replay is cleared there -
 * confirm). */
232 if (req->rq_replay) {
233 cfs_spin_lock(&req->rq_lock);
235 cfs_spin_unlock(&req->rq_lock);
/* A non-zero rc together with a non-zero rq_transno is reported at
 * D_ERROR level. */
237 if (rc && req->rq_transno != 0) {
238 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
243 /* Save a large LOV EA into the request buffer so that it is available
244 * for replay. We don't do this in the initial request because the
245 * original request doesn't need this buffer (at most it sends just the
246 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
247 * buffer and may also be difficult to allocate and save a very large
248 * request buffer for each open. (bug 5707)
250 * OOM here may cause recovery failure if lmm is needed (only for the
251 * original open if the MDS crashed just when this client also OOM'd)
252 * but this is incredibly unlikely, and questionable whether the client
253 * could do MDS recovery under OOM anyways... */
/* req is the request whose buffer segment DLM_INTENT_REC_OFF + 4 is
 * enlarged with sptlrpc_cli_enlarge_reqbuf(); body is the mdt_body whose
 * eadatasize is the size reported in the failure message. */
254 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
255 struct mdt_body *body)
259 /* FIXME: remove this explicit offset. */
260 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
/* Failure handling: log, then drop the EA from the body by clearing
 * OBD_MD_FLEASIZE and zeroing eadatasize.  NOTE(review): the test on rc
 * that guards these lines is not visible in this listing. */
263 CERROR("Can't enlarge segment %d size to %d\n",
264 DLM_INTENT_REC_OFF + 4, body->eadatasize);
265 body->valid &= ~OBD_MD_FLEASIZE;
266 body->eadatasize = 0;
/* Build an LDLM intent-open request (RQF_LDLM_INTENT_OPEN).
 *
 * Steps visible here: force S_IFREG into it->it_create_mode; collect
 * unused locks to cancel on the child (op_fid2, when sane) and, for
 * IT_CREAT, the parent's UPDATE lock (op_fid1); allocate the request; size
 * the capability, name and EA fields; prepare the enqueue with the
 * collected cancels; set rq_replay from the import's imp_replayable; pack
 * the intent opcode and the open body; size the reply RMF_ACL field for a
 * remote client; and set the reply length.
 *
 * Failure paths visible here release the collected cancel list and return
 * ERR_PTR(-ENOMEM), or free the request after ldlm_prep_enqueue_req().
 * NOTE(review): the conditions guarding those paths and the final return
 * are not visible in this listing. */
270 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
271 struct lookup_intent *it,
272 struct md_op_data *op_data,
273 void *lmm, int lmmsize,
276 struct ptlrpc_request *req;
277 struct obd_device *obddev = class_exp2obd(exp);
278 struct ldlm_intent *lit;
279 CFS_LIST_HEAD(cancels);
285 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
287 /* XXX: openlock is not cancelled for cross-refs. */
288 /* If inode is known, cancel conflicting OPEN locks. */
289 if (fid_is_sane(&op_data->op_fid2)) {
290 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
293 else if (it->it_flags & FMODE_EXEC)
298 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
303 /* If CREATE, cancel parent's UPDATE lock. */
304 if (it->it_op & IT_CREAT)
308 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
310 MDS_INODELOCK_UPDATE);
312 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
313 &RQF_LDLM_INTENT_OPEN);
315 ldlm_lock_list_put(&cancels, l_bl_ast, count);
316 RETURN(ERR_PTR(-ENOMEM));
319 /* parent capability */
320 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
321 /* child capability, reserve the size according to parent capa, it will
322 * be filled after we get the reply */
323 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
325 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
326 op_data->op_namelen + 1);
327 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
328 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
330 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
332 ptlrpc_request_free(req);
336 cfs_spin_lock(&req->rq_lock);
337 req->rq_replay = req->rq_import->imp_replayable;
338 cfs_spin_unlock(&req->rq_lock);
340 /* pack the intent */
341 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
342 lit->opc = (__u64)it->it_op;
344 /* pack the intended request */
345 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
348 /* for remote client, fetch remote perm for current user */
349 if (client_is_remote(exp))
350 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
351 sizeof(struct mdt_remote_perm));
352 ptlrpc_request_set_replen(req);
/* Build an LDLM intent-unlink request (RQF_LDLM_INTENT_UNLINK).
 *
 * Sizes the parent capability and name fields, prepares the enqueue with
 * no cancels, packs the intent opcode and the unlink body, then sizes the
 * reply RMF_MDT_MD and RMF_ACL fields and sets the reply length.
 * ERR_PTR(-ENOMEM) is returned on one failure path, and the request is
 * freed on another; the tests guarding them are not visible in this
 * listing.
 *
 * NOTE(review): the reply RMF_ACL field is sized with
 * cl_max_mds_cookiesize here, unlike the other pack helpers in this file,
 * which size it with sizeof(struct mdt_remote_perm) - confirm this is
 * intended. */
356 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
357 struct lookup_intent *it,
358 struct md_op_data *op_data)
360 struct ptlrpc_request *req;
361 struct obd_device *obddev = class_exp2obd(exp);
362 struct ldlm_intent *lit;
366 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
367 &RQF_LDLM_INTENT_UNLINK);
369 RETURN(ERR_PTR(-ENOMEM));
371 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
372 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
373 op_data->op_namelen + 1);
375 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
377 ptlrpc_request_free(req);
381 /* pack the intent */
382 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
383 lit->opc = (__u64)it->it_op;
385 /* pack the intended request */
386 mdc_unlink_pack(req, op_data);
388 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
389 obddev->u.cli.cl_max_mds_easize);
390 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
391 obddev->u.cli.cl_max_mds_cookiesize);
392 ptlrpc_request_set_replen(req);
/* Build an LDLM intent-getattr request (RQF_LDLM_INTENT_GETATTR).
 *
 * The attribute mask asks for OBD_MD_FLGETATTR, OBD_MD_FLEASIZE,
 * OBD_MD_FLMODEASIZE, OBD_MD_FLDIREA, OBD_MD_FLMDSCAPA and OBD_MD_MEA, plus
 * OBD_MD_FLRMTPERM for a remote client or OBD_MD_FLACL otherwise.  After
 * sizing the capability and name fields and preparing the enqueue, the
 * intent opcode and the getattr body are packed; the reply RMF_MDT_MD
 * field is sized to cl_max_mds_easize and, for a remote client, RMF_ACL to
 * sizeof(struct mdt_remote_perm).
 *
 * ERR_PTR(-ENOMEM) is returned on one failure path and the request is
 * freed on another; the tests guarding them are not visible in this
 * listing. */
396 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
397 struct lookup_intent *it,
398 struct md_op_data *op_data)
400 struct ptlrpc_request *req;
401 struct obd_device *obddev = class_exp2obd(exp);
402 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
403 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
404 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
405 (client_is_remote(exp) ?
406 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
407 struct ldlm_intent *lit;
411 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
412 &RQF_LDLM_INTENT_GETATTR);
414 RETURN(ERR_PTR(-ENOMEM));
416 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
417 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
418 op_data->op_namelen + 1);
420 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
422 ptlrpc_request_free(req);
426 /* pack the intent */
427 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
428 lit->opc = (__u64)it->it_op;
430 /* pack the intended request */
431 mdc_getattr_pack(req, valid, it->it_flags, op_data,
432 obddev->u.cli.cl_max_mds_easize);
434 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
435 obddev->u.cli.cl_max_mds_easize);
436 if (client_is_remote(exp))
437 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
438 sizeof(struct mdt_remote_perm));
439 ptlrpc_request_set_replen(req);
/* Build a plain RQF_LDLM_ENQUEUE request: allocate it, prepare the
 * enqueue with no cancels, and set the reply length.  ERR_PTR(-ENOMEM) is
 * returned on one failure path and the request is freed on another; the
 * tests guarding them are not visible in this listing. */
443 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
445 struct ptlrpc_request *req;
449 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
451 RETURN(ERR_PTR(-ENOMEM));
453 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
455 ptlrpc_request_free(req);
459 ptlrpc_request_set_replen(req);
/* Post-process the reply of an intent enqueue.
 *
 * Visible steps, in order: flag the lock request LDLM_FL_INTENT_ONLY when
 * the request has a transno or is marked for replay; zero lockh when rc is
 * ELDLM_LOCK_ABORTED, otherwise move the reference to the lock's
 * l_req_mode when it differs from einfo->ei_mode; copy disposition,
 * status, lock mode, lock handle and the request pointer into
 * it->d.lustre from the DLM reply; clear the replay flag for requests with
 * a negative status and for IT_OPEN requests that did not end in a
 * successful open; then, for OPEN/UNLINK/LOOKUP/GETATTR intents, fetch the
 * mdt_body and the EA, remote-permission and capability fields announced
 * in body->valid.  For a replayable open the reply EA is copied into the
 * request's RMF_EADATA buffer.
 *
 * NOTE(review): error returns and several guards are not visible in this
 * listing; the statement order matters (see the comment on clearing the
 * replay flag first), so confirm against the full source before changing
 * anything here. */
463 static int mdc_finish_enqueue(struct obd_export *exp,
464 struct ptlrpc_request *req,
465 struct ldlm_enqueue_info *einfo,
466 struct lookup_intent *it,
467 struct lustre_handle *lockh,
470 struct req_capsule *pill = &req->rq_pill;
471 struct ldlm_request *lockreq;
472 struct ldlm_reply *lockrep;
476 /* Similarly, if we're going to replay this request, we don't want to
477 * actually get a lock, just perform the intent. */
478 if (req->rq_transno || req->rq_replay) {
479 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
480 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
483 if (rc == ELDLM_LOCK_ABORTED) {
485 memset(lockh, 0, sizeof(*lockh));
487 } else { /* rc = 0 */
488 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
491 /* If the server gave us back a different lock mode, we should
492 * fix up our variables. */
493 if (lock->l_req_mode != einfo->ei_mode) {
494 ldlm_lock_addref(lockh, lock->l_req_mode);
495 ldlm_lock_decref(lockh, einfo->ei_mode);
496 einfo->ei_mode = lock->l_req_mode;
501 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
502 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
504 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
505 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
506 it->d.lustre.it_lock_mode = einfo->ei_mode;
507 it->d.lustre.it_lock_handle = lockh->cookie;
508 it->d.lustre.it_data = req;
510 if (it->d.lustre.it_status < 0 && req->rq_replay)
511 mdc_clear_replay_flag(req, it->d.lustre.it_status);
513 /* If we're doing an IT_OPEN which did not result in an actual
514 * successful open, then we need to remove the bit which saves
515 * this request for unconditional replay.
517 * It's important that we do this first! Otherwise we might exit the
518 * function without doing so, and try to replay a failed create
520 if (it->it_op & IT_OPEN && req->rq_replay &&
521 (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
522 mdc_clear_replay_flag(req, it->d.lustre.it_status);
524 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
525 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
527 /* We know what to expect, so we do any byte flipping required here */
528 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
529 struct mdt_body *body;
531 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
533 CERROR ("Can't swab mdt_body\n");
537 if (it_disposition(it, DISP_OPEN_OPEN) &&
538 !it_open_error(DISP_OPEN_OPEN, it)) {
540 * If this is a successful OPEN request, we need to set
541 * replay handler and data early, so that if replay
542 * happens immediately after swabbing below, new reply
543 * is swabbed by that handler correctly.
545 mdc_set_open_replay_data(NULL, NULL, req);
548 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
551 mdc_update_max_ea_from_body(exp, body);
554 * The eadata is opaque; just check that it is there.
555 * Eventually, obd_unpackmd() will check the contents.
557 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
563 * We save the reply LOV EA in case we have to replay a
564 * create for recovery. If we didn't allocate a large
565 * enough request buffer above we need to reallocate it
566 * here to hold the actual LOV EA.
568 * To not save LOV EA if request is not going to replay
569 * (for example error one).
571 if ((it->it_op & IT_OPEN) && req->rq_replay) {
573 if (req_capsule_get_size(pill, &RMF_EADATA,
576 mdc_realloc_openmsg(req, body);
578 req_capsule_shrink(pill, &RMF_EADATA,
582 req_capsule_set_size(pill, &RMF_EADATA,
586 lmm = req_capsule_client_get(pill, &RMF_EADATA);
588 memcpy(lmm, eadata, body->eadatasize);
592 if (body->valid & OBD_MD_FLRMTPERM) {
593 struct mdt_remote_perm *perm;
595 LASSERT(client_is_remote(exp));
596 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
597 lustre_swab_mdt_remote_perm);
601 if (body->valid & OBD_MD_FLMDSCAPA) {
602 struct lustre_capa *capa, *p;
604 capa = req_capsule_server_get(pill, &RMF_CAPA1);
608 if (it->it_op & IT_OPEN) {
609 /* client fid capa will be checked in replay */
610 p = req_capsule_client_get(pill, &RMF_CAPA2);
615 if (body->valid & OBD_MD_FLOSSCAPA) {
616 struct lustre_capa *capa;
618 capa = req_capsule_server_get(pill, &RMF_CAPA2);
627 /* We always reserve enough space in the reply packet for a stripe MD, because
628 * we don't know in advance the file type. */
/* Enqueue a lock, with an optional intent, on the resource of
 * op_data->op_fid1.
 *
 * The request is built according to the intent: open, unlink,
 * getattr/lookup/layout or readdir.  One branch packs no request and
 * handles FLOCK, where lmm carries the flock policy and lmmsize is 0.
 * The import generation is sampled before the request is built and is
 * written into the request together with rq_sent on the resend path.
 * mdc_get_rpc_lock() and mdc_enter_request() are called before
 * ldlm_cli_enqueue(), and mdc_exit_request() and mdc_put_rpc_lock() after
 * it.  A create answered with -EINPROGRESS in lock_policy_res2 drops the
 * request and is retried while the import generation is unchanged;
 * otherwise the reply is handed to mdc_finish_enqueue().
 *
 * reqp is asserted to be NULL.
 *
 * NOTE(review): several guards, the retry jump and the return statements
 * are not visible in this listing. */
629 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
630 struct lookup_intent *it, struct md_op_data *op_data,
631 struct lustre_handle *lockh, void *lmm, int lmmsize,
632 struct ptlrpc_request **reqp, int extra_lock_flags)
634 struct obd_device *obddev = class_exp2obd(exp);
635 struct ptlrpc_request *req = NULL;
636 int flags, saved_flags = extra_lock_flags;
638 struct ldlm_res_id res_id;
639 static const ldlm_policy_data_t lookup_policy =
640 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
641 static const ldlm_policy_data_t update_policy =
642 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
643 ldlm_policy_data_t const *policy = &lookup_policy;
644 int generation, resends = 0;
645 struct ldlm_reply *lockrep;
648 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
651 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
654 saved_flags |= LDLM_FL_HAS_INTENT;
655 if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
656 policy = &update_policy;
658 LASSERT(reqp == NULL);
660 generation = obddev->u.cli.cl_import->imp_generation;
664 /* The only way right now is FLOCK, in this case we hide flock
665 policy as lmm, but lmmsize is 0 */
666 LASSERT(lmm && lmmsize == 0);
667 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
669 policy = (ldlm_policy_data_t *)lmm;
670 res_id.name[3] = LDLM_FLOCK;
671 } else if (it->it_op & IT_OPEN) {
672 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
674 policy = &update_policy;
675 einfo->ei_cbdata = NULL;
677 } else if (it->it_op & IT_UNLINK)
678 req = mdc_intent_unlink_pack(exp, it, op_data);
679 else if (it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT))
680 req = mdc_intent_getattr_pack(exp, it, op_data);
681 else if (it->it_op == IT_READDIR)
682 req = ldlm_enqueue_pack(exp);
689 RETURN(PTR_ERR(req));
692 req->rq_generation_set = 1;
693 req->rq_import_generation = generation;
694 req->rq_sent = cfs_time_current_sec() + resends;
697 /* It is important to obtain rpc_lock first (if applicable), so that
698 * threads that are serialised with rpc_lock are not polluting our
699 * rpcs in flight counter. We do not do flock request limiting, though*/
701 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
702 rc = mdc_enter_request(&obddev->u.cli);
704 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
705 mdc_clear_replay_flag(req, 0);
706 ptlrpc_req_finished(req);
711 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
714 /* For flock requests we immediatelly return without further
715 delay and let caller deal with the rest, since rest of
716 this function metadata processing makes no sense for flock
721 mdc_exit_request(&obddev->u.cli);
722 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
725 CERROR("ldlm_cli_enqueue: %d\n", rc);
726 mdc_clear_replay_flag(req, rc);
727 ptlrpc_req_finished(req);
731 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
732 LASSERT(lockrep != NULL);
734 /* Retry the create infinitely when we get -EINPROGRESS from
735 * server. This is required by the new quota design. */
736 if (it && it->it_op & IT_CREAT &&
737 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
738 mdc_clear_replay_flag(req, rc);
739 ptlrpc_req_finished(req);
742 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
743 obddev->obd_name, resends, it->it_op,
744 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
746 if (generation == obddev->u.cli.cl_import->imp_generation) {
/* NOTE(review): "resned" in the message below looks like a typo for
 * "resend"; it is runtime text, so it is left unchanged here. */
749 CDEBUG(D_HA, "resned cross eviction\n");
754 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Complete an intent after its enqueue reply has been processed.
 *
 * Returns it_status straight away when the server never executed the
 * intent (DISP_IT_EXECD clear).  Otherwise it checks the recorded errors
 * with it_open_error(), marks a revalidated fid/name pair
 * DISP_ENQ_COMPLETE and compares the fid in the reply against op_fid2 and
 * op_fid3, takes an extra request reference for a successful create
 * (DISP_ENQ_CREATE_REF) and for a successful open (DISP_ENQ_OPEN_REF), and
 * finally, for the lock behind lockh, looks for an already granted lock
 * with the same policy; when one matches, the new lock is dropped with
 * ldlm_lock_decref_and_cancel() and lockh and it_lock_handle are switched
 * to the existing lock.
 *
 * NOTE(review): the tests on the it_open_error() results, the test on the
 * handle lookup and the return statements are not visible in this
 * listing. */
759 static int mdc_finish_intent_lock(struct obd_export *exp,
760 struct ptlrpc_request *request,
761 struct md_op_data *op_data,
762 struct lookup_intent *it,
763 struct lustre_handle *lockh)
765 struct lustre_handle old_lock;
766 struct mdt_body *mdt_body;
767 struct ldlm_lock *lock;
771 LASSERT(request != NULL);
772 LASSERT(request != LP_POISON);
773 LASSERT(request->rq_repmsg != LP_POISON);
775 if (!it_disposition(it, DISP_IT_EXECD)) {
776 /* The server failed before it even started executing the
777 * intent, i.e. because it couldn't unpack the request. */
778 LASSERT(it->d.lustre.it_status != 0);
779 RETURN(it->d.lustre.it_status);
781 rc = it_open_error(DISP_IT_EXECD, it);
785 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
786 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
788 /* If we were revalidating a fid/name pair, mark the intent in
789 * case we fail and get called again from lookup */
790 if (fid_is_sane(&op_data->op_fid2) &&
791 it->it_create_mode & M_CHECK_STALE &&
792 it->it_op != IT_GETATTR) {
793 it_set_disposition(it, DISP_ENQ_COMPLETE);
795 /* Also: did we find the same inode? */
796 /* sever can return one of two fids:
797 * op_fid2 - new allocated fid - if file is created.
798 * op_fid3 - existent fid - if file only open.
799 * op_fid3 is saved in lmv_intent_open */
800 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
801 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
/* NOTE(review): op_fid2 is printed twice in the message below although
 * the test above compares against op_fid2 and op_fid3; the second value
 * was presumably meant to be op_fid3 - confirm. */
802 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
803 "\n", PFID(&op_data->op_fid2),
804 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
809 rc = it_open_error(DISP_LOOKUP_EXECD, it);
813 /* keep requests around for the multiple phases of the call
814 * this shows the DISP_XX must guarantee we make it into the call
816 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
817 it_disposition(it, DISP_OPEN_CREATE) &&
818 !it_open_error(DISP_OPEN_CREATE, it)) {
819 it_set_disposition(it, DISP_ENQ_CREATE_REF);
820 ptlrpc_request_addref(request); /* balanced in ll_create_node */
822 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
823 it_disposition(it, DISP_OPEN_OPEN) &&
824 !it_open_error(DISP_OPEN_OPEN, it)) {
825 it_set_disposition(it, DISP_ENQ_OPEN_REF);
826 ptlrpc_request_addref(request); /* balanced in ll_file_open */
827 /* BUG 11546 - eviction in the middle of open rpc processing */
828 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
831 if (it->it_op & IT_CREAT) {
832 /* XXX this belongs in ll_create_it */
833 } else if (it->it_op == IT_OPEN) {
834 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
836 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
839 /* If we already have a matching lock, then cancel the new
840 * one. We have to set the data here instead of in
841 * mdc_enqueue, because we need to use the child's inode as
842 * the l_ast_data to match, and that's not available until
843 * intent_finish has performed the iget().) */
844 lock = ldlm_handle2lock(lockh);
846 ldlm_policy_data_t policy = lock->l_policy_data;
847 LDLM_DEBUG(lock, "matching against this");
849 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
850 &lock->l_resource->lr_name),
851 "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
852 (unsigned long)lock->l_resource->lr_name.name[0],
853 (unsigned long)lock->l_resource->lr_name.name[1],
854 (unsigned long)lock->l_resource->lr_name.name[2],
855 (unsigned long)fid_seq(&mdt_body->fid1),
856 (unsigned long)fid_oid(&mdt_body->fid1),
857 (unsigned long)fid_ver(&mdt_body->fid1));
860 memcpy(&old_lock, lockh, sizeof(*lockh));
861 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
862 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
863 ldlm_lock_decref_and_cancel(lockh,
864 it->d.lustre.it_lock_mode);
865 memcpy(lockh, &old_lock, sizeof(old_lock));
866 it->d.lustre.it_lock_handle = lockh->cookie;
869 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
870 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
871 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/* Check whether a usable lock is already held for an intent.
 *
 * With a lock handle stored in the intent, the handle is revalidated with
 * ldlm_revalidate_lock_handle().  Otherwise a lock is matched on the
 * resource of fid with ldlm_lock_match(), accepting CR, CW, PR or PW mode,
 * with the inodebits policy set to one of UPDATE, LAYOUT or LOOKUP.  The
 * intent's it_lock_handle and it_lock_mode are then set to the lock found,
 * or both to 0.
 *
 * NOTE(review): the selection between the three inodebits values, the test
 * on the resulting mode and the return statement are not visible in this
 * listing. */
875 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
876 struct lu_fid *fid, __u64 *bits)
878 /* We could just return 1 immediately, but since we should only
879 * be called in revalidate_it if we already have a lock, let's
881 struct ldlm_res_id res_id;
882 struct lustre_handle lockh;
883 ldlm_policy_data_t policy;
887 if (it->d.lustre.it_lock_handle) {
888 lockh.cookie = it->d.lustre.it_lock_handle;
889 mode = ldlm_revalidate_lock_handle(&lockh, bits);
891 fid_build_reg_res_name(fid, &res_id);
894 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
897 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
900 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
903 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
904 LDLM_FL_BLOCK_GRANTED, &res_id,
906 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
910 it->d.lustre.it_lock_handle = lockh.cookie;
911 it->d.lustre.it_lock_mode = mode;
913 it->d.lustre.it_lock_handle = 0;
914 it->d.lustre.it_lock_mode = 0;
921 * This long block is all about fixing up the lock and request state
922 * so that it is correct as of the moment _before_ the operation was
923 * applied; that way, the VFS will think that everything is normal and
924 * call Lustre's regular VFS methods.
926 * If we're performing a creation, that means that unless the creation
927 * failed with EEXIST, we should fake up a negative dentry.
929 * For everything else, we want to lookup to succeed.
931 * One additional note: if CREATE or OPEN succeeded, we add an extra
932 * reference to the request because we need to keep it around until
933 * ll_create/ll_open gets called.
935 * The server will return to us, in it_disposition, an indication of
936 * exactly what d.lustre.it_status refers to.
938 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
939 * otherwise if DISP_OPEN_CREATE is set, then it status is the
940 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
941 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
944 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
947 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
948 void *lmm, int lmmsize, struct lookup_intent *it,
949 int lookup_flags, struct ptlrpc_request **reqp,
950 ldlm_blocking_callback cb_blocking,
951 int extra_lock_flags)
953 struct lustre_handle lockh;
958 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
959 ", intent: %s flags %#o\n", op_data->op_namelen,
960 op_data->op_name, PFID(&op_data->op_fid2),
961 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
965 if (fid_is_sane(&op_data->op_fid2) &&
966 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))) {
967 /* We could just return 1 immediately, but since we should only
968 * be called in revalidate_it if we already have a lock, let's
970 it->d.lustre.it_lock_handle = 0;
971 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
972 /* Only return failure if it was not GETATTR by cfid
973 (from inode_revalidate) */
974 if (rc || op_data->op_namelen != 0)
978 /* lookup_it may be called only after revalidate_it has run, because
979 * revalidate_it cannot return errors, only zero. Returning zero causes
980 * this call to lookup, which *can* return an error.
982 * We only want to execute the request associated with the intent one
983 * time, however, so don't send the request again. Instead, skip past
984 * this and use the request from revalidate. In this case, revalidate
985 * never dropped its reference, so the refcounts are all OK */
/* DISP_ENQ_COMPLETE clear: build the enqueue parameters (LDLM_IBITS, the
 * mode from it_to_lock_mode(), the caller's blocking callback and
 * ldlm_completion_ast), allocate a fid for a create whose op_fid2 is not
 * sane, and enqueue through mdc_enqueue(). */
986 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
987 struct ldlm_enqueue_info einfo =
988 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
989 ldlm_completion_ast, NULL, NULL, NULL };
991 /* For case if upper layer did not alloc fid, do it now. */
992 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
993 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
995 CERROR("Can't alloc new fid, rc %d\n", rc);
999 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1000 lmm, lmmsize, NULL, extra_lock_flags);
1003 } else if (!fid_is_sane(&op_data->op_fid2) ||
1004 !(it->it_create_mode & M_CHECK_STALE)) {
1005 /* DISP_ENQ_COMPLETE set means there is extra reference on
1006 * request referenced from this intent, saved for subsequent
1007 * lookup. This path is executed when we proceed to this
1008 * lookup, so we clear DISP_ENQ_COMPLETE */
1009 it_clear_disposition(it, DISP_ENQ_COMPLETE);
/* Hand the request stored in the intent back through *reqp and finish the
 * intent against it. */
1011 *reqp = it->d.lustre.it_data;
1012 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/* Reply callback installed by mdc_intent_getattr_async().
 *
 * args is the mdc_getattr_args saved with the request.  The in-flight
 * slot is released with mdc_exit_request(), the enqueue is completed with
 * ldlm_cli_enqueue_fini(), and the reply is run through
 * mdc_finish_enqueue() and mdc_finish_intent_lock().  einfo is freed and
 * the caller's mi_cb is invoked with the request, minfo and the final rc.
 *
 * NOTE(review): the assignment of it, the rc tests between the steps and
 * the effect of the OBD_FAIL_MDC_GETATTR_ENQUEUE check are not visible in
 * this listing. */
1016 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1017 struct ptlrpc_request *req,
1020 struct mdc_getattr_args *ga = args;
1021 struct obd_export *exp = ga->ga_exp;
1022 struct md_enqueue_info *minfo = ga->ga_minfo;
1023 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1024 struct lookup_intent *it;
1025 struct lustre_handle *lockh;
1026 struct obd_device *obddev;
1027 int flags = LDLM_FL_HAS_INTENT;
1031 lockh = &minfo->mi_lockh;
1033 obddev = class_exp2obd(exp);
1035 mdc_exit_request(&obddev->u.cli);
1036 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1039 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1040 &flags, NULL, 0, lockh, rc);
1042 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1043 mdc_clear_replay_flag(req, rc);
1047 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1051 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1055 OBD_FREE_PTR(einfo);
1056 minfo->mi_cb(req, minfo, rc);
/* Queue an intent getattr whose reply is handled by a callback.
 *
 * A getattr intent request is packed for minfo's op data and intent, an
 * in-flight slot is taken with mdc_enter_request(), and the lock is
 * enqueued with ldlm_cli_enqueue() for both the LOOKUP and UPDATE
 * inodebits, with minfo->mi_lockh as the lock handle.  minfo and einfo are
 * stored in the request's async args,
 * mdc_intent_getattr_async_interpret() is installed as rq_interpret_reply,
 * and the request is handed to ptlrpcd_add_req().
 *
 * The request is released after mdc_enter_request() and after
 * ldlm_cli_enqueue() on their failure paths; the rc tests themselves and
 * the return statements are not visible in this listing. */
1060 int mdc_intent_getattr_async(struct obd_export *exp,
1061 struct md_enqueue_info *minfo,
1062 struct ldlm_enqueue_info *einfo)
1064 struct md_op_data *op_data = &minfo->mi_data;
1065 struct lookup_intent *it = &minfo->mi_it;
1066 struct ptlrpc_request *req;
1067 struct mdc_getattr_args *ga;
1068 struct obd_device *obddev = class_exp2obd(exp);
1069 struct ldlm_res_id res_id;
1070 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1071 * for statahead currently. Consider CMD in future, such two bits
1072 * maybe managed by different MDS, should be adjusted then. */
1073 ldlm_policy_data_t policy = {
1074 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1075 MDS_INODELOCK_UPDATE }
1078 int flags = LDLM_FL_HAS_INTENT;
1081 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1082 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1083 ldlm_it2str(it->it_op), it->it_flags);
1085 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1086 req = mdc_intent_getattr_pack(exp, it, op_data);
1090 rc = mdc_enter_request(&obddev->u.cli);
1092 ptlrpc_req_finished(req);
1096 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1097 0, &minfo->mi_lockh, 1);
1099 mdc_exit_request(&obddev->u.cli);
1100 ptlrpc_req_finished(req);
1104 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1105 ga = ptlrpc_req_async_args(req);
1107 ga->ga_minfo = minfo;
1108 ga->ga_einfo = einfo;
1110 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1111 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);