4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
40 # include <linux/module.h>
41 # include <linux/pagemap.h>
42 # include <linux/miscdevice.h>
43 # include <linux/init.h>
45 # include <liblustre.h>
48 #include <lustre_acl.h>
49 #include <obd_class.h>
50 #include <lustre_dlm.h>
51 /* fid_res_name_eq() */
52 #include <lustre_fid.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
/* Context carried across an asynchronous getattr-by-intent RPC; stashed in
 * the request's async args and unpacked again in
 * mdc_intent_getattr_async_interpret().
 * NOTE(review): the extraction had dropped the closing brace; restored. */
struct mdc_getattr_args {
        struct obd_export       *ga_exp;   /* export the RPC was sent on */
        struct md_enqueue_info  *ga_minfo; /* caller state + completion callback */
        struct ldlm_enqueue_info *ga_einfo; /* enqueue params; freed by interpret */
};
62 int it_disposition(struct lookup_intent *it, int flag)
64 return it->d.lustre.it_disposition & flag;
66 EXPORT_SYMBOL(it_disposition);
68 void it_set_disposition(struct lookup_intent *it, int flag)
70 it->d.lustre.it_disposition |= flag;
72 EXPORT_SYMBOL(it_set_disposition);
74 void it_clear_disposition(struct lookup_intent *it, int flag)
76 it->d.lustre.it_disposition &= ~flag;
78 EXPORT_SYMBOL(it_clear_disposition);
80 int it_open_error(int phase, struct lookup_intent *it)
82 if (it_disposition(it, DISP_OPEN_OPEN)) {
83 if (phase >= DISP_OPEN_OPEN)
84 return it->d.lustre.it_status;
89 if (it_disposition(it, DISP_OPEN_CREATE)) {
90 if (phase >= DISP_OPEN_CREATE)
91 return it->d.lustre.it_status;
96 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
97 if (phase >= DISP_LOOKUP_EXECD)
98 return it->d.lustre.it_status;
103 if (it_disposition(it, DISP_IT_EXECD)) {
104 if (phase >= DISP_IT_EXECD)
105 return it->d.lustre.it_status;
109 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
110 it->d.lustre.it_status);
114 EXPORT_SYMBOL(it_open_error);
116 /* this must be called on a lockh that is known to have a referenced lock */
117 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
120 struct ldlm_lock *lock;
129 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
131 LASSERT(lock != NULL);
132 lock_res_and_lock(lock);
134 if (lock->l_ast_data && lock->l_ast_data != data) {
135 struct inode *new_inode = data;
136 struct inode *old_inode = lock->l_ast_data;
137 LASSERTF(old_inode->i_state & I_FREEING,
138 "Found existing inode %p/%lu/%u state %lu in lock: "
139 "setting data to %p/%lu/%u\n", old_inode,
140 old_inode->i_ino, old_inode->i_generation,
142 new_inode, new_inode->i_ino, new_inode->i_generation);
145 lock->l_ast_data = data;
147 *bits = lock->l_policy_data.l_inodebits.bits;
149 unlock_res_and_lock(lock);
155 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
156 const struct lu_fid *fid, ldlm_type_t type,
157 ldlm_policy_data_t *policy, ldlm_mode_t mode,
158 struct lustre_handle *lockh)
160 struct ldlm_res_id res_id;
164 fid_build_reg_res_name(fid, &res_id);
165 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
166 &res_id, type, policy, mode, lockh, 0);
170 int mdc_cancel_unused(struct obd_export *exp,
171 const struct lu_fid *fid,
172 ldlm_policy_data_t *policy,
174 ldlm_cancel_flags_t flags,
177 struct ldlm_res_id res_id;
178 struct obd_device *obd = class_exp2obd(exp);
183 fid_build_reg_res_name(fid, &res_id);
184 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
185 policy, mode, flags, opaque);
189 int mdc_change_cbdata(struct obd_export *exp,
190 const struct lu_fid *fid,
191 ldlm_iterator_t it, void *data)
193 struct ldlm_res_id res_id;
196 fid_build_reg_res_name(fid, &res_id);
197 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
204 /* find any ldlm lock of the inode in mdc
208 int mdc_find_cbdata(struct obd_export *exp,
209 const struct lu_fid *fid,
210 ldlm_iterator_t it, void *data)
212 struct ldlm_res_id res_id;
216 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
217 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
219 if (rc == LDLM_ITER_STOP)
221 else if (rc == LDLM_ITER_CONTINUE)
226 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
228 /* Don't hold error requests for replay. */
229 if (req->rq_replay) {
230 spin_lock(&req->rq_lock);
232 spin_unlock(&req->rq_lock);
234 if (rc && req->rq_transno != 0) {
235 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
240 /* Save a large LOV EA into the request buffer so that it is available
241 * for replay. We don't do this in the initial request because the
242 * original request doesn't need this buffer (at most it sends just the
243 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
244 * buffer and may also be difficult to allocate and save a very large
245 * request buffer for each open. (bug 5707)
247 * OOM here may cause recovery failure if lmm is needed (only for the
248 * original open if the MDS crashed just when this client also OOM'd)
249 * but this is incredibly unlikely, and questionable whether the client
250 * could do MDS recovery under OOM anyways... */
251 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
252 struct mdt_body *body)
256 /* FIXME: remove this explicit offset. */
257 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
260 CERROR("Can't enlarge segment %d size to %d\n",
261 DLM_INTENT_REC_OFF + 4, body->eadatasize);
262 body->valid &= ~OBD_MD_FLEASIZE;
263 body->eadatasize = 0;
/*
 * Build an LDLM_INTENT_OPEN request: collect early-cancellations for locks
 * the open will invalidate (child OPEN locks, and the parent's UPDATE lock
 * for a create), then pack the ldlm intent plus the actual open RPC body.
 *
 * NOTE(review): this extraction is missing source lines (unbalanced braces,
 * dropped declarations such as 'count', 'rc', and mode selection / GOTO
 * paths); comments below describe only the visible statements.
 */
267 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
268 struct lookup_intent *it,
269 struct md_op_data *op_data,
270 void *lmm, int lmmsize,
273 struct ptlrpc_request *req;
274 struct obd_device *obddev = class_exp2obd(exp);
275 struct ldlm_intent *lit;
276 CFS_LIST_HEAD(cancels);
/* opening a regular file: force S_IFREG into the create mode bits */
282 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
284 /* XXX: openlock is not cancelled for cross-refs. */
285 /* If inode is known, cancel conflicting OPEN locks. */
286 if (fid_is_sane(&op_data->op_fid2)) {
287 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
290 else if (it->it_flags & FMODE_EXEC)
295 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
300 /* If CREATE, cancel parent's UPDATE lock. */
301 if (it->it_op & IT_CREAT)
305 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
307 MDS_INODELOCK_UPDATE);
309 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
310 &RQF_LDLM_INTENT_OPEN);
/* allocation failure path: release the collected cancel list first */
312 ldlm_lock_list_put(&cancels, l_bl_ast, count);
313 RETURN(ERR_PTR(-ENOMEM));
316 /* parent capability */
317 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
318 /* child capability, reserve the size according to parent capa, it will
319 * be filled after we get the reply */
320 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
/* size client buffers: name (+NUL) and layout EA sized to the larger of
 * the caller-supplied lmm and the default MDS EA size */
322 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
323 op_data->op_namelen + 1);
324 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
325 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
327 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
329 ptlrpc_request_free(req);
/* open requests are retained for replay iff the import is replayable */
333 spin_lock(&req->rq_lock);
334 req->rq_replay = req->rq_import->imp_replayable;
335 spin_unlock(&req->rq_lock);
337 /* pack the intent */
338 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
339 lit->opc = (__u64)it->it_op;
341 /* pack the intended request */
342 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
345 /* for remote client, fetch remote perm for current user */
346 if (client_is_remote(exp))
347 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
348 sizeof(struct mdt_remote_perm));
349 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_UNLINK request: pack the intent opcode and the
 * unlink RPC body, and reserve reply space for the victim's EA and unlink
 * cookies (sized to the client's current maxima).
 *
 * NOTE(review): extraction dropped lines (braces, 'rc' declaration,
 * error-path returns); comments describe visible statements only.
 */
353 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
354 struct lookup_intent *it,
355 struct md_op_data *op_data)
357 struct ptlrpc_request *req;
358 struct obd_device *obddev = class_exp2obd(exp);
359 struct ldlm_intent *lit;
363 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
364 &RQF_LDLM_INTENT_UNLINK);
366 RETURN(ERR_PTR(-ENOMEM));
368 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
/* name buffer includes the trailing NUL */
369 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
370 op_data->op_namelen + 1);
372 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
374 ptlrpc_request_free(req);
378 /* pack the intent */
379 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
380 lit->opc = (__u64)it->it_op;
382 /* pack the intended request */
383 mdc_unlink_pack(req, op_data);
/* reply buffers: striping MD and unlink llog cookies, worst-case sized */
385 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
386 obddev->u.cli.cl_max_mds_easize);
387 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
388 obddev->u.cli.cl_max_mds_cookiesize);
389 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_GETATTR request (used for both getattr and lookup
 * intents): ask for attributes, EA and capability bits, plus remote perms
 * for remote clients (ACL otherwise), and size the reply accordingly.
 *
 * NOTE(review): extraction dropped lines ('rc' declaration, braces,
 * error-path returns); comments describe visible statements only.
 */
393 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
394 struct lookup_intent *it,
395 struct md_op_data *op_data)
397 struct ptlrpc_request *req;
398 struct obd_device *obddev = class_exp2obd(exp);
/* the attribute mask depends on whether the client is remote */
399 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
400 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
401 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
402 (client_is_remote(exp) ?
403 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
404 struct ldlm_intent *lit;
408 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
409 &RQF_LDLM_INTENT_GETATTR);
411 RETURN(ERR_PTR(-ENOMEM));
413 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
414 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
415 op_data->op_namelen + 1);
417 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
419 ptlrpc_request_free(req);
423 /* pack the intent */
424 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
425 lit->opc = (__u64)it->it_op;
427 /* pack the intended request */
428 mdc_getattr_pack(req, valid, it->it_flags, op_data,
429 obddev->u.cli.cl_max_mds_easize);
/* reply EA buffer sized to the current worst case */
431 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
432 obddev->u.cli.cl_max_mds_easize);
433 if (client_is_remote(exp))
434 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
435 sizeof(struct mdt_remote_perm));
436 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_LAYOUT request asking the MDT for the file layout;
 * the layout itself comes back in the lock's LVB, so reply LVB space is
 * reserved at the client's max EA size. The op_data argument is unused.
 *
 * NOTE(review): extraction dropped lines ('rc' declaration, braces,
 * error-path returns); comments describe visible statements only.
 */
440 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
441 struct lookup_intent *it,
442 struct md_op_data *unused)
444 struct obd_device *obd = class_exp2obd(exp);
445 struct ptlrpc_request *req;
446 struct ldlm_intent *lit;
447 struct layout_intent *layout;
451 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
452 &RQF_LDLM_INTENT_LAYOUT);
454 RETURN(ERR_PTR(-ENOMEM));
/* no client-side EA payload is sent with a layout intent */
456 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
457 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
459 ptlrpc_request_free(req);
463 /* pack the intent */
464 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
465 lit->opc = (__u64)it->it_op;
467 /* pack the layout intent request */
468 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
469 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
470 * set for replication */
471 layout->li_opc = LAYOUT_INTENT_ACCESS;
/* layout is returned in the lock LVB; reserve worst-case reply space */
473 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
474 obd->u.cli.cl_max_mds_easize);
475 ptlrpc_request_set_replen(req);
/*
 * Build a plain LDLM_ENQUEUE request (no intent), reserving \a lvb_len
 * bytes of reply space for the lock value block. Used e.g. for readdir.
 *
 * NOTE(review): extraction dropped lines ('rc' declaration, braces,
 * returns); comments describe visible statements only.
 */
479 static struct ptlrpc_request *
480 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
482 struct ptlrpc_request *req;
486 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
488 RETURN(ERR_PTR(-ENOMEM));
490 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
492 ptlrpc_request_free(req);
496 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
497 ptlrpc_request_set_replen(req);
/*
 * Post-process an intent enqueue reply: fix up the lock mode if the server
 * granted a different one, copy the server's disposition/status into the
 * intent, manage the request's replay flag, swab and validate the reply
 * body, save a large LOV EA for open replay, and install the layout LVB
 * on a layout lock.
 *
 * NOTE(review): this extraction is missing many source lines (dropped
 * declarations such as 'lvb_len', 'lmm', 'eadata', GOTO/RETURN paths and
 * closing braces); comments describe only the visible statements.
 */
501 static int mdc_finish_enqueue(struct obd_export *exp,
502 struct ptlrpc_request *req,
503 struct ldlm_enqueue_info *einfo,
504 struct lookup_intent *it,
505 struct lustre_handle *lockh,
508 struct req_capsule *pill = &req->rq_pill;
509 struct ldlm_request *lockreq;
510 struct ldlm_reply *lockrep;
511 struct lustre_intent_data *intent = &it->d.lustre;
512 struct ldlm_lock *lock;
513 void *lvb_data = NULL;
518 /* Similarly, if we're going to replay this request, we don't want to
519 * actually get a lock, just perform the intent. */
520 if (req->rq_transno || req->rq_replay) {
521 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
522 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
/* the server executed the intent but refused to grant the lock */
525 if (rc == ELDLM_LOCK_ABORTED) {
527 memset(lockh, 0, sizeof(*lockh));
529 } else { /* rc = 0 */
530 lock = ldlm_handle2lock(lockh);
531 LASSERT(lock != NULL);
533 /* If the server gave us back a different lock mode, we should
534 * fix up our variables. */
535 if (lock->l_req_mode != einfo->ei_mode) {
536 ldlm_lock_addref(lockh, lock->l_req_mode);
537 ldlm_lock_decref(lockh, einfo->ei_mode);
538 einfo->ei_mode = lock->l_req_mode;
543 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
544 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* propagate the server's verdict into the caller's intent */
546 intent->it_disposition = (int)lockrep->lock_policy_res1;
547 intent->it_status = (int)lockrep->lock_policy_res2;
548 intent->it_lock_mode = einfo->ei_mode;
549 intent->it_lock_handle = lockh->cookie;
550 intent->it_data = req;
552 /* Technically speaking rq_transno must already be zero if
553 * it_status is in error, so the check is a bit redundant */
554 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
555 mdc_clear_replay_flag(req, intent->it_status);
557 /* If we're doing an IT_OPEN which did not result in an actual
558 * successful open, then we need to remove the bit which saves
559 * this request for unconditional replay.
561 * It's important that we do this first! Otherwise we might exit the
562 * function without doing so, and try to replay a failed create
564 if (it->it_op & IT_OPEN && req->rq_replay &&
565 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
566 mdc_clear_replay_flag(req, intent->it_status);
568 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
569 it->it_op, intent->it_disposition, intent->it_status);
571 /* We know what to expect, so we do any byte flipping required here */
572 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
573 struct mdt_body *body;
575 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
577 CERROR ("Can't swab mdt_body\n");
581 if (it_disposition(it, DISP_OPEN_OPEN) &&
582 !it_open_error(DISP_OPEN_OPEN, it)) {
584 * If this is a successful OPEN request, we need to set
585 * replay handler and data early, so that if replay
586 * happens immediately after swabbing below, new reply
587 * is swabbed by that handler correctly.
589 mdc_set_open_replay_data(NULL, NULL, req);
591 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
595 mdc_update_max_ea_from_body(exp, body);
598 * The eadata is opaque; just check that it is there.
599 * Eventually, obd_unpackmd() will check the contents.
601 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
606 /* save lvb data and length in case this is for layout
609 lvb_len = body->eadatasize;
612 * We save the reply LOV EA in case we have to replay a
613 * create for recovery. If we didn't allocate a large
614 * enough request buffer above we need to reallocate it
615 * here to hold the actual LOV EA.
617 * To not save LOV EA if request is not going to replay
618 * (for example error one).
620 if ((it->it_op & IT_OPEN) && req->rq_replay) {
622 if (req_capsule_get_size(pill, &RMF_EADATA,
625 mdc_realloc_openmsg(req, body);
627 req_capsule_shrink(pill, &RMF_EADATA,
631 req_capsule_set_size(pill, &RMF_EADATA,
635 lmm = req_capsule_client_get(pill, &RMF_EADATA);
637 memcpy(lmm, eadata, body->eadatasize);
/* remote clients get their permissions in the reply ACL buffer */
641 if (body->valid & OBD_MD_FLRMTPERM) {
642 struct mdt_remote_perm *perm;
644 LASSERT(client_is_remote(exp));
645 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
646 lustre_swab_mdt_remote_perm);
650 if (body->valid & OBD_MD_FLMDSCAPA) {
651 struct lustre_capa *capa, *p;
653 capa = req_capsule_server_get(pill, &RMF_CAPA1);
657 if (it->it_op & IT_OPEN) {
658 /* client fid capa will be checked in replay */
659 p = req_capsule_client_get(pill, &RMF_CAPA2);
664 if (body->valid & OBD_MD_FLOSSCAPA) {
665 struct lustre_capa *capa;
667 capa = req_capsule_server_get(pill, &RMF_CAPA2);
671 } else if (it->it_op & IT_LAYOUT) {
672 /* maybe the lock was granted right away and layout
673 * is packed into RMF_DLM_LVB of req */
674 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
676 lvb_data = req_capsule_server_sized_get(pill,
677 &RMF_DLM_LVB, lvb_len);
678 if (lvb_data == NULL)
683 /* fill in stripe data for layout lock */
684 lock = ldlm_handle2lock(lockh);
685 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
688 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
689 ldlm_it2str(it->it_op), lvb_len);
691 OBD_ALLOC_LARGE(lmm, lvb_len);
696 memcpy(lmm, lvb_data, lvb_len);
698 /* install lvb_data */
699 lock_res_and_lock(lock);
/* only install our copy if no LVB was installed concurrently;
 * otherwise free the now-redundant allocation below */
700 if (lock->l_lvb_data == NULL) {
701 lock->l_lvb_data = lmm;
702 lock->l_lvb_len = lvb_len;
705 unlock_res_and_lock(lock);
707 OBD_FREE_LARGE(lmm, lvb_len);
715 /* We always reserve enough space in the reply packet for a stripe MD, because
716 * we don't know in advance the file type. */
/*
 * Enqueue an LDLM lock with an optional intent: pick the inodebits policy
 * from the intent op, pack the matching intent request, rate-limit via the
 * rpc_lock and the mdc rpcs-in-flight counter, send the enqueue, retry
 * forever on -EINPROGRESS for creates, and finish via mdc_finish_enqueue().
 *
 * NOTE(review): this extraction is missing many source lines (the 'rc'
 * declaration, flock fast-path return, resend 'goto' target, closing
 * braces); comments describe only the visible statements.
 */
717 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
718 struct lookup_intent *it, struct md_op_data *op_data,
719 struct lustre_handle *lockh, void *lmm, int lmmsize,
720 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
722 struct obd_device *obddev = class_exp2obd(exp);
723 struct ptlrpc_request *req = NULL;
724 __u64 flags, saved_flags = extra_lock_flags;
726 struct ldlm_res_id res_id;
/* fixed inodebits policies, selected by intent op below */
727 static const ldlm_policy_data_t lookup_policy =
728 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
729 static const ldlm_policy_data_t update_policy =
730 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
731 static const ldlm_policy_data_t layout_policy =
732 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
733 ldlm_policy_data_t const *policy = &lookup_policy;
734 int generation, resends = 0;
735 struct ldlm_reply *lockrep;
736 enum lvb_type lvb_type = 0;
739 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
742 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
745 saved_flags |= LDLM_FL_HAS_INTENT;
746 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
747 policy = &update_policy;
748 else if (it->it_op & IT_LAYOUT)
749 policy = &layout_policy;
752 LASSERT(reqp == NULL);
/* remember the import generation so a resend can detect eviction */
754 generation = obddev->u.cli.cl_import->imp_generation;
758 /* The only way right now is FLOCK, in this case we hide flock
759 policy as lmm, but lmmsize is 0 */
760 LASSERT(lmm && lmmsize == 0);
761 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
763 policy = (ldlm_policy_data_t *)lmm;
764 res_id.name[3] = LDLM_FLOCK;
765 } else if (it->it_op & IT_OPEN) {
766 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
768 policy = &update_policy;
769 einfo->ei_cbdata = NULL;
771 } else if (it->it_op & IT_UNLINK) {
772 req = mdc_intent_unlink_pack(exp, it, op_data);
773 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
774 req = mdc_intent_getattr_pack(exp, it, op_data);
775 } else if (it->it_op & IT_READDIR) {
776 req = mdc_enqueue_pack(exp, 0);
777 } else if (it->it_op & IT_LAYOUT) {
/* layout intents need a server that understands LVB types */
778 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
781 req = mdc_intent_layout_pack(exp, it, op_data);
782 lvb_type = LVB_T_LAYOUT;
789 RETURN(PTR_ERR(req));
791 if (req != NULL && it && it->it_op & IT_CREAT)
792 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
794 req->rq_no_retry_einprogress = 1;
/* on a resend, pin the request to the remembered import generation */
797 req->rq_generation_set = 1;
798 req->rq_import_generation = generation;
799 req->rq_sent = cfs_time_current_sec() + resends;
802 /* It is important to obtain rpc_lock first (if applicable), so that
803 * threads that are serialised with rpc_lock are not polluting our
804 * rpcs in flight counter. We do not do flock request limiting, though*/
806 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
807 rc = mdc_enter_request(&obddev->u.cli);
809 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
810 mdc_clear_replay_flag(req, 0);
811 ptlrpc_req_finished(req);
816 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
817 0, lvb_type, lockh, 0);
819 /* For flock requests we immediatelly return without further
820 delay and let caller deal with the rest, since rest of
821 this function metadata processing makes no sense for flock
/* release the in-flight slot and rpc_lock taken above */
826 mdc_exit_request(&obddev->u.cli);
827 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
830 CERROR("ldlm_cli_enqueue: %d\n", rc);
831 mdc_clear_replay_flag(req, rc);
832 ptlrpc_req_finished(req);
836 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
837 LASSERT(lockrep != NULL);
839 /* Retry the create infinitely when we get -EINPROGRESS from
840 * server. This is required by the new quota design. */
841 if (it && it->it_op & IT_CREAT &&
842 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
843 mdc_clear_replay_flag(req, rc);
844 ptlrpc_req_finished(req);
847 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
848 obddev->obd_name, resends, it->it_op,
849 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
/* only resend within the same import generation; a generation
 * bump means we were evicted in between */
851 if (generation == obddev->u.cli.cl_import->imp_generation) {
854 CDEBUG(D_HA, "resend cross eviction\n");
859 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* on failure, drop any lock reference we still hold */
861 if (lustre_handle_is_used(lockh)) {
862 ldlm_lock_decref(lockh, einfo->ei_mode);
863 memset(lockh, 0, sizeof(*lockh));
865 ptlrpc_req_finished(req);
/*
 * Translate a completed intent request into VFS-visible state: check the
 * intent disposition/status, detect stale revalidations, take extra request
 * references for successful create/open (balanced in llite), and collapse
 * onto an already-held matching lock if one exists.
 *
 * NOTE(review): this extraction is missing source lines (RETURN paths,
 * closing braces, 'rc' declaration); comments describe only the visible
 * statements.
 */
870 static int mdc_finish_intent_lock(struct obd_export *exp,
871 struct ptlrpc_request *request,
872 struct md_op_data *op_data,
873 struct lookup_intent *it,
874 struct lustre_handle *lockh)
876 struct lustre_handle old_lock;
877 struct mdt_body *mdt_body;
878 struct ldlm_lock *lock;
882 LASSERT(request != NULL);
883 LASSERT(request != LP_POISON);
884 LASSERT(request->rq_repmsg != LP_POISON);
886 if (!it_disposition(it, DISP_IT_EXECD)) {
887 /* The server failed before it even started executing the
888 * intent, i.e. because it couldn't unpack the request. */
889 LASSERT(it->d.lustre.it_status != 0);
890 RETURN(it->d.lustre.it_status);
892 rc = it_open_error(DISP_IT_EXECD, it);
896 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
897 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
899 /* If we were revalidating a fid/name pair, mark the intent in
900 * case we fail and get called again from lookup */
901 if (fid_is_sane(&op_data->op_fid2) &&
902 it->it_create_mode & M_CHECK_STALE &&
903 it->it_op != IT_GETATTR) {
904 it_set_disposition(it, DISP_ENQ_COMPLETE);
906 /* Also: did we find the same inode? */
907 /* sever can return one of two fids:
908 * op_fid2 - new allocated fid - if file is created.
909 * op_fid3 - existent fid - if file only open.
910 * op_fid3 is saved in lmv_intent_open */
911 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
912 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
913 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
914 "\n", PFID(&op_data->op_fid2),
915 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
920 rc = it_open_error(DISP_LOOKUP_EXECD, it);
924 /* keep requests around for the multiple phases of the call
925 * this shows the DISP_XX must guarantee we make it into the call
927 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
928 it_disposition(it, DISP_OPEN_CREATE) &&
929 !it_open_error(DISP_OPEN_CREATE, it)) {
930 it_set_disposition(it, DISP_ENQ_CREATE_REF);
931 ptlrpc_request_addref(request); /* balanced in ll_create_node */
933 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
934 it_disposition(it, DISP_OPEN_OPEN) &&
935 !it_open_error(DISP_OPEN_OPEN, it)) {
936 it_set_disposition(it, DISP_ENQ_OPEN_REF);
937 ptlrpc_request_addref(request); /* balanced in ll_file_open */
938 /* BUG 11546 - eviction in the middle of open rpc processing */
939 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
942 if (it->it_op & IT_CREAT) {
943 /* XXX this belongs in ll_create_it */
944 } else if (it->it_op == IT_OPEN) {
945 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
947 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
950 /* If we already have a matching lock, then cancel the new
951 * one. We have to set the data here instead of in
952 * mdc_enqueue, because we need to use the child's inode as
953 * the l_ast_data to match, and that's not available until
954 * intent_finish has performed the iget().) */
955 lock = ldlm_handle2lock(lockh);
957 ldlm_policy_data_t policy = lock->l_policy_data;
958 LDLM_DEBUG(lock, "matching against this");
/* sanity: the reply FID must name the same resource the lock is on */
960 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
961 &lock->l_resource->lr_name),
962 "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
963 (unsigned long)lock->l_resource->lr_name.name[0],
964 (unsigned long)lock->l_resource->lr_name.name[1],
965 (unsigned long)lock->l_resource->lr_name.name[2],
966 (unsigned long)fid_seq(&mdt_body->fid1),
967 (unsigned long)fid_oid(&mdt_body->fid1),
968 (unsigned long)fid_ver(&mdt_body->fid1));
971 memcpy(&old_lock, lockh, sizeof(*lockh));
972 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
973 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* an older lock matched: cancel the fresh one and hand the
 * old handle back through the intent */
974 ldlm_lock_decref_and_cancel(lockh,
975 it->d.lustre.it_lock_mode);
976 memcpy(lockh, &old_lock, sizeof(old_lock));
977 it->d.lustre.it_lock_handle = lockh->cookie;
980 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
981 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
982 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/*
 * Check whether a usable lock is already held for \a fid: first try the
 * handle cached in the intent, then fall back to matching against the
 * namespace with an inodebits policy chosen by the intent op. On success
 * the intent's lock handle/mode are filled in; on failure they are zeroed.
 *
 * NOTE(review): the switch's 'case' labels and the final return were
 * dropped by the extraction — which it_op maps to which bits below is
 * inferred from line order; verify against upstream before relying on it.
 */
986 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
987 struct lu_fid *fid, __u64 *bits)
989 /* We could just return 1 immediately, but since we should only
990 * be called in revalidate_it if we already have a lock, let's
992 struct ldlm_res_id res_id;
993 struct lustre_handle lockh;
994 ldlm_policy_data_t policy;
/* fast path: revalidate the handle the intent already carries */
998 if (it->d.lustre.it_lock_handle) {
999 lockh.cookie = it->d.lustre.it_lock_handle;
1000 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1002 fid_build_reg_res_name(fid, &res_id);
1003 switch (it->it_op) {
1005 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1008 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1011 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* accept any read/write mode already granted on this resource */
1014 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1015 LDLM_FL_BLOCK_GRANTED, &res_id,
1016 LDLM_IBITS, &policy,
1017 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1021 it->d.lustre.it_lock_handle = lockh.cookie;
1022 it->d.lustre.it_lock_mode = mode;
1024 it->d.lustre.it_lock_handle = 0;
1025 it->d.lustre.it_lock_mode = 0;
1032 * This long block is all about fixing up the lock and request state
1033 * so that it is correct as of the moment _before_ the operation was
1034 * applied; that way, the VFS will think that everything is normal and
1035 * call Lustre's regular VFS methods.
1037 * If we're performing a creation, that means that unless the creation
1038 * failed with EEXIST, we should fake up a negative dentry.
1040 * For everything else, we want to lookup to succeed.
1042 * One additional note: if CREATE or OPEN succeeded, we add an extra
1043 * reference to the request because we need to keep it around until
1044 * ll_create/ll_open gets called.
1046 * The server will return to us, in it_disposition, an indication of
1047 * exactly what d.lustre.it_status refers to.
1049 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1050 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1051 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1052 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1055 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/*
 * Top-level intent lock entry point: try to revalidate an existing lock
 * for lookup/getattr intents, otherwise build and send the intent enqueue
 * (allocating a FID for creates if needed), then translate the result via
 * mdc_finish_intent_lock().
 *
 * NOTE(review): this extraction is missing source lines ('rc' declaration,
 * several RETURN paths, closing braces); comments describe only the
 * visible statements.
 */
1058 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1059 void *lmm, int lmmsize, struct lookup_intent *it,
1060 int lookup_flags, struct ptlrpc_request **reqp,
1061 ldlm_blocking_callback cb_blocking,
1062 __u64 extra_lock_flags)
1064 struct lustre_handle lockh;
1069 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1070 ", intent: %s flags %#o\n", op_data->op_namelen,
1071 op_data->op_name, PFID(&op_data->op_fid2),
1072 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* revalidate path: child FID known and this is a lookup/getattr */
1076 if (fid_is_sane(&op_data->op_fid2) &&
1077 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1078 /* We could just return 1 immediately, but since we should only
1079 * be called in revalidate_it if we already have a lock, let's
1081 it->d.lustre.it_lock_handle = 0;
1082 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1083 /* Only return failure if it was not GETATTR by cfid
1084 (from inode_revalidate) */
1085 if (rc || op_data->op_namelen != 0)
1089 /* lookup_it may be called only after revalidate_it has run, because
1090 * revalidate_it cannot return errors, only zero. Returning zero causes
1091 * this call to lookup, which *can* return an error.
1093 * We only want to execute the request associated with the intent one
1094 * time, however, so don't send the request again. Instead, skip past
1095 * this and use the request from revalidate. In this case, revalidate
1096 * never dropped its reference, so the refcounts are all OK */
1097 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1098 struct ldlm_enqueue_info einfo =
1099 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
1100 ldlm_completion_ast, NULL, NULL, NULL };
1102 /* For case if upper layer did not alloc fid, do it now. */
1103 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1104 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1106 CERROR("Can't alloc new fid, rc %d\n", rc);
1110 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1111 lmm, lmmsize, NULL, extra_lock_flags);
1114 } else if (!fid_is_sane(&op_data->op_fid2) ||
1115 !(it->it_create_mode & M_CHECK_STALE)) {
1116 /* DISP_ENQ_COMPLETE set means there is extra reference on
1117 * request referenced from this intent, saved for subsequent
1118 * lookup. This path is executed when we proceed to this
1119 * lookup, so we clear DISP_ENQ_COMPLETE */
1120 it_clear_disposition(it, DISP_ENQ_COMPLETE);
/* hand the saved request back to the caller and finish the intent */
1122 *reqp = it->d.lustre.it_data;
1123 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * ptlrpc interpret callback for mdc_intent_getattr_async(): release the
 * in-flight slot, finish the enqueue on this thread, run the normal
 * intent-lock post-processing, then free the enqueue info and invoke the
 * caller's completion callback with the final rc.
 *
 * NOTE(review): extraction dropped lines (the args parameter, 'rc'
 * declaration, GOTO labels); comments describe visible statements only.
 */
1127 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1128 struct ptlrpc_request *req,
1131 struct mdc_getattr_args *ga = args;
1132 struct obd_export *exp = ga->ga_exp;
1133 struct md_enqueue_info *minfo = ga->ga_minfo;
1134 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1135 struct lookup_intent *it;
1136 struct lustre_handle *lockh;
1137 struct obd_device *obddev;
1138 __u64 flags = LDLM_FL_HAS_INTENT;
1142 lockh = &minfo->mi_lockh;
1144 obddev = class_exp2obd(exp);
/* release the rpcs-in-flight slot taken in mdc_intent_getattr_async() */
1146 mdc_exit_request(&obddev->u.cli);
1147 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1150 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1151 &flags, NULL, 0, lockh, rc);
1153 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1154 mdc_clear_replay_flag(req, rc);
1158 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1162 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* einfo was allocated by the statahead caller; freed here in all cases */
1166 OBD_FREE_PTR(einfo);
1167 minfo->mi_cb(req, minfo, rc);
/*
 * Fire-and-forget getattr-by-intent for statahead: pack the getattr intent,
 * take an rpcs-in-flight slot, start an async ldlm enqueue, stash the
 * completion context in the request's async args and queue it to ptlrpcd.
 * Completion is handled by mdc_intent_getattr_async_interpret().
 *
 * NOTE(review): extraction dropped lines ('rc' declaration, error RETURNs,
 * `ga->ga_exp = exp;`, final RETURN); comments describe visible
 * statements only.
 */
1171 int mdc_intent_getattr_async(struct obd_export *exp,
1172 struct md_enqueue_info *minfo,
1173 struct ldlm_enqueue_info *einfo)
1175 struct md_op_data *op_data = &minfo->mi_data;
1176 struct lookup_intent *it = &minfo->mi_it;
1177 struct ptlrpc_request *req;
1178 struct mdc_getattr_args *ga;
1179 struct obd_device *obddev = class_exp2obd(exp);
1180 struct ldlm_res_id res_id;
1181 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1182 * for statahead currently. Consider CMD in future, such two bits
1183 * maybe managed by different MDS, should be adjusted then. */
1184 ldlm_policy_data_t policy = {
1185 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1186 MDS_INODELOCK_UPDATE }
1189 __u64 flags = LDLM_FL_HAS_INTENT;
1192 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1193 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1194 ldlm_it2str(it->it_op), it->it_flags);
1196 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1197 req = mdc_intent_getattr_pack(exp, it, op_data);
/* reserve an rpcs-in-flight slot; released in the interpret callback */
1201 rc = mdc_enter_request(&obddev->u.cli);
1203 ptlrpc_req_finished(req);
/* async=1: returns once the request is queued, not when it completes */
1207 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1208 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1210 mdc_exit_request(&obddev->u.cli);
1211 ptlrpc_req_finished(req);
/* the context must fit in the request's fixed-size async-args area */
1215 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1216 ga = ptlrpc_req_async_args(req);
1218 ga->ga_minfo = minfo;
1219 ga->ga_einfo = einfo;
1221 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1222 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);