X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdt%2Fmdt_reint.c;h=62538bcce68dbe841b1d3dcb6ed8f2a7f27a7668;hb=ad1810a2dbea1eed5e8b5feb55bdf915a545feb3;hp=6141ec945887fec25052b62d412f636d1dd564b7;hpb=9c255853668ed64dfdd8eec9b157f05c2b7832ed;p=fs%2Flustre-release.git diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 6141ec9..62538bc 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -27,7 +27,7 @@ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2013, Intel Corporation. + * Copyright (c) 2011, 2014, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -46,6 +46,7 @@ #define DEBUG_SUBSYSTEM S_MDS +#include #include "mdt_internal.h" #include @@ -64,7 +65,7 @@ static int mdt_create_pack_capa(struct mdt_thread_info *info, int rc, /* for cross-ref mkdir, mds capa has been fetched from remote obj, then * we won't go to below*/ - if (repbody->valid & OBD_MD_FLMDSCAPA) + if (repbody->mbo_valid & OBD_MD_FLMDSCAPA) RETURN(rc); if (rc == 0 && info->mti_mdt->mdt_lut.lut_mds_capa && @@ -77,7 +78,7 @@ static int mdt_create_pack_capa(struct mdt_thread_info *info, int rc, rc = mo_capa_get(info->mti_env, mdt_object_child(object), capa, 0); if (rc == 0) - repbody->valid |= OBD_MD_FLMDSCAPA; + repbody->mbo_valid |= OBD_MD_FLMDSCAPA; } RETURN(rc); @@ -219,9 +220,10 @@ int mdt_version_get_check_save(struct mdt_thread_info *info, * This checks version of 'name'. Many reint functions uses 'name' for child not * FID, therefore we need to get object by name and check its version. */ -int mdt_lookup_version_check(struct mdt_thread_info *info, - struct mdt_object *p, const struct lu_name *lname, - struct lu_fid *fid, int idx) +static int mdt_lookup_version_check(struct mdt_thread_info *info, + struct mdt_object *p, + const struct lu_name *lname, + struct lu_fid *fid, int idx) { int rc, vbrc; @@ -312,8 +314,13 @@ static int mdt_remote_permission(struct mdt_thread_info *info, return -ENOTSUPP; if (S_ISDIR(attr->la_mode) && spec->u.sp_ea.eadata != NULL && - spec->u.sp_ea.eadatalen != 0 && !mdt_is_striped_client(exp)) - return -ENOTSUPP; + spec->u.sp_ea.eadatalen != 0) { + const struct lmv_user_md *lum = spec->u.sp_ea.eadata; + + if (le32_to_cpu(lum->lum_stripe_count) > 1 && + !mdt_is_striped_client(exp)) + return -ENOTSUPP; + } return 0; } @@ -423,18 +430,19 @@ static int mdt_md_create(struct mdt_thread_info *info) if (rc == 0) rc = mdt_attr_get_complex(info, child, ma); - if (rc == 0) { - /* Return fid & attr to client. */ - if (ma->ma_valid & MA_INODE) - mdt_pack_attr2body(info, repbody, &ma->ma_attr, - mdt_object_fid(child)); - } + if (rc == 0) { + /* Return fid & attr to client. */ + if (ma->ma_valid & MA_INODE) + mdt_pack_attr2body(info, repbody, &ma->ma_attr, + mdt_object_fid(child)); + } out_put_child: - mdt_object_put(info->mti_env, child); - } else { - rc = PTR_ERR(child); - } - mdt_create_pack_capa(info, rc, child, repbody); + mdt_create_pack_capa(info, rc, child, repbody); + mdt_object_put(info->mti_env, child); + } else { + rc = PTR_ERR(child); + mdt_create_pack_capa(info, rc, NULL, repbody); + } unlock_parent: mdt_object_unlock(info, parent, lh, rc); put_parent: @@ -534,8 +542,8 @@ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj, RETURN(rc); } -int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, - struct md_attr *ma, int flags) +static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, + struct md_attr *ma) { struct mdt_lock_handle *lh; int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID|LA_FLAGS); @@ -546,9 +554,6 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int rc; ENTRY; - /* attr shouldn't be set on remote object */ - LASSERT(!mdt_object_remote(mo)); - lh = &info->mti_lh[MDT_LH_PARENT]; mdt_lock_reg_init(lh, LCK_PW); @@ -564,14 +569,11 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, RETURN(rc); s0_lh = &info->mti_lh[MDT_LH_LOCAL]; - mdt_lock_reg_init(s0_lh, LCK_EX); + mdt_lock_reg_init(s0_lh, LCK_PW); rc = mdt_lock_slaves(info, mo, LCK_PW, lockpart, s0_lh, &s0_obj, einfo); if (rc != 0) GOTO(out_unlock, rc); - if (mdt_object_exists(mo) == 0) - GOTO(out_unlock, rc = -ENOENT); - /* all attrs are packed into mti_attr in unpack_setattr */ mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, OBD_FAIL_MDS_REINT_SETATTR_WRITE); @@ -669,14 +671,20 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1), (unsigned int)ma->ma_attr.la_valid); - if (info->mti_dlm_req) - ldlm_request_cancel(req, info->mti_dlm_req, 0); + if (info->mti_dlm_req) + ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP); repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1); if (IS_ERR(mo)) GOTO(out, rc = PTR_ERR(mo)); + if (!mdt_object_exists(mo)) + GOTO(out_put, rc = -ENOENT); + + if (mdt_object_remote(mo)) + GOTO(out_put, rc = -EREMOTE); + /* start a log jounal handle if needed */ if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM)) { if ((ma->ma_attr.la_valid & LA_SIZE) || @@ -699,7 +707,7 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, } mdt_ioepoch_open(info, mo, 0); - repbody->ioepoch = mo->mot_ioepoch; + repbody->mbo_ioepoch = mo->mot_ioepoch; mdt_object_get(info->mti_env, mo); mdt_mfd_set_mode(mfd, MDS_FMODE_TRUNC); @@ -707,17 +715,17 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, mfd->mfd_xid = req->rq_xid; spin_lock(&med->med_open_lock); - cfs_list_add(&mfd->mfd_list, &med->med_open_head); + list_add(&mfd->mfd_list, &med->med_open_head); spin_unlock(&med->med_open_lock); - repbody->handle.cookie = mfd->mfd_handle.h_cookie; + repbody->mbo_handle.cookie = mfd->mfd_handle.h_cookie; } som_au = info->mti_ioepoch && info->mti_ioepoch->flags & MF_SOM_CHANGE; if (som_au) { /* SOM Attribute update case. Find the proper mfd and update * SOM attributes on the proper object. */ - LASSERT(mdt_conn_flags(info) & OBD_CONNECT_SOM); - LASSERT(info->mti_ioepoch); + if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM)) + GOTO(out_put, rc = -EPROTO); spin_lock(&med->med_open_lock); mfd = mdt_handle2mfd(med, &info->mti_ioepoch->handle, @@ -730,22 +738,29 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, info->mti_ioepoch->handle.cookie); GOTO(out_put, rc = -ESTALE); } - LASSERT(mfd->mfd_mode == MDS_FMODE_SOM); - LASSERT(!(info->mti_ioepoch->flags & MF_EPOCH_CLOSE)); - class_handle_unhash(&mfd->mfd_handle); - cfs_list_del_init(&mfd->mfd_list); + if (mfd->mfd_mode != MDS_FMODE_SOM || + (info->mti_ioepoch->flags & MF_EPOCH_CLOSE)) + GOTO(out_put, rc = -EPROTO); + + class_handle_unhash(&mfd->mfd_handle); + list_del_init(&mfd->mfd_list); spin_unlock(&med->med_open_lock); mdt_mfd_close(info, mfd); } else if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) { - LASSERT((ma->ma_valid & MA_LOV) == 0); - rc = mdt_attr_set(info, mo, ma, rr->rr_flags); + if (ma->ma_valid & MA_LOV) + GOTO(out_put, rc = -EPROTO); + + rc = mdt_attr_set(info, mo, ma); if (rc) GOTO(out_put, rc); } else if ((ma->ma_valid & MA_LOV) && (ma->ma_valid & MA_INODE)) { struct lu_buf *buf = &info->mti_buf; - LASSERT(ma->ma_attr.la_valid == 0); + + if (ma->ma_attr.la_valid != 0) + GOTO(out_put, rc = -EPROTO); + buf->lb_buf = ma->ma_lmm; buf->lb_len = ma->ma_lmm_size; rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), @@ -755,15 +770,18 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, } else if ((ma->ma_valid & MA_LMV) && (ma->ma_valid & MA_INODE)) { struct lu_buf *buf = &info->mti_buf; - LASSERT(ma->ma_attr.la_valid == 0); + if (ma->ma_attr.la_valid != 0) + GOTO(out_put, rc = -EPROTO); + buf->lb_buf = ma->ma_lmv; buf->lb_len = ma->ma_lmv_size; rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf, XATTR_NAME_DEFAULT_LMV, 0); if (rc) GOTO(out_put, rc); - } else - LBUG(); + } else { + GOTO(out_put, rc = -EPROTO); + } /* If file data is modified, add the dirty flag */ if (ma->ma_attr_flags & MDS_DATA_MODIFIED) @@ -789,7 +807,7 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, rc = mo_capa_get(info->mti_env, mdt_object_child(mo), capa, 0); if (rc) GOTO(out_put, rc); - repbody->valid |= OBD_MD_FLOSSCAPA; + repbody->mbo_valid |= OBD_MD_FLOSSCAPA; } EXIT; @@ -816,8 +834,9 @@ static int mdt_reint_create(struct mdt_thread_info *info, if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE)) RETURN(err_serious(-ESTALE)); - if (info->mti_dlm_req) - ldlm_request_cancel(mdt_info_req(info), info->mti_dlm_req, 0); + if (info->mti_dlm_req) + ldlm_request_cancel(mdt_info_req(info), + info->mti_dlm_req, 0, LATF_SKIP); if (!lu_name_is_valid(&info->mti_rr.rr_name)) RETURN(-EPROTO); @@ -871,8 +890,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1), PNAME(&rr->rr_name)); - if (info->mti_dlm_req) - ldlm_request_cancel(req, info->mti_dlm_req, 0); + if (info->mti_dlm_req) + ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP); if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK)) RETURN(err_serious(-ENOENT)); @@ -985,8 +1004,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, MDT_CROSS_LOCK); repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); LASSERT(repbody != NULL); - repbody->fid1 = *mdt_object_fid(mc); - repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS); + repbody->mbo_fid1 = *mdt_object_fid(mc); + repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS); GOTO(unlock_child, rc = -EREMOTE); } else if (info->mti_spec.sp_rm_entry) { rc = -EPERM; @@ -1094,8 +1113,8 @@ static int mdt_reint_link(struct mdt_thread_info *info, if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK)) RETURN(err_serious(-ENOENT)); - if (info->mti_dlm_req) - ldlm_request_cancel(req, info->mti_dlm_req, 0); + if (info->mti_dlm_req) + ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP); /* Invalid case so return error immediately instead of * processing it */ @@ -1118,6 +1137,8 @@ static int mdt_reint_link(struct mdt_thread_info *info, if (rc) GOTO(out_unlock_parent, rc); + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5); + /* step 2: find & lock the source */ lhs = &info->mti_lh[MDT_LH_CHILD]; mdt_lock_reg_init(lhs, LCK_EX); @@ -1288,52 +1309,36 @@ static void mdt_rename_unlock(struct lustre_handle *lh) * target. Source should not be ancestor of target dir. May be other rename * checks can be moved here later. */ -static int mdt_rename_sanity(struct mdt_thread_info *info, - const struct lu_fid *dir_fid, - const struct lu_fid *fid) +static int mdt_is_subdir(struct mdt_thread_info *info, + struct mdt_object *dir, + const struct lu_fid *fid) { - struct mdt_object *dst; - struct lu_fid dst_fid = *dir_fid; + struct lu_fid dir_fid = dir->mot_header.loh_fid; int rc = 0; ENTRY; /* If the source and target are in the same directory, they can not * be parent/child relationship, so subdir check is not needed */ - if (lu_fid_eq(dir_fid, fid)) + if (lu_fid_eq(&dir_fid, fid)) return 0; - do { - LASSERT(fid_is_sane(&dst_fid)); - dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid); - if (!IS_ERR(dst)) { - /* XXX: this object might not be protected by LDLM lock - * here, (see mdt_rename_parents_lock), but LOHA_EXISTS - * will not change once it is being set, but LFSCK might - * change this later.(LU-5069) */ - if (!mdt_object_exists(dst)) - RETURN(-ESTALE); - - rc = mdo_is_subdir(info->mti_env, - mdt_object_child(dst), fid, - &dst_fid); - mdt_object_put(info->mti_env, dst); - if (rc != -EREMOTE && rc < 0) { - CERROR("%s: failed subdir check in "DFID" for " - DFID": rc = %d\n", - mdt_obd_name(info->mti_mdt), - PFID(dir_fid), PFID(fid), rc); - /* Return EINVAL only if a parent is the @fid */ - if (rc == -EINVAL) - rc = -EIO; - } else { - /* check the found fid */ - if (lu_fid_eq(&dst_fid, fid)) - rc = -EINVAL; - } - } else { - rc = PTR_ERR(dst); - } - } while (rc == -EREMOTE); + if (!mdt_object_exists(dir)) + RETURN(-ENOENT); + + rc = mdo_is_subdir(info->mti_env, mdt_object_child(dir), + fid, &dir_fid); + if (rc < 0) { + CERROR("%s: failed subdir check in "DFID" for "DFID + ": rc = %d\n", mdt_obd_name(info->mti_mdt), + PFID(&dir_fid), PFID(fid), rc); + /* Return EINVAL only if a parent is the @fid */ + if (rc == -EINVAL) + rc = -EIO; + } else { + /* check the found fid */ + if (lu_fid_eq(&dir_fid, fid)) + rc = -EINVAL; + } RETURN(rc); } @@ -1436,7 +1441,7 @@ static int mdt_lock_objects_in_linkea(struct mdt_thread_info *info, GOTO(out, rc); } - CFS_INIT_LIST_HEAD(&mll->mll_list); + INIT_LIST_HEAD(&mll->mll_list); mll->mll_obj = mdt_pobj; list_add_tail(&mll->mll_list, lock_list); } @@ -1525,7 +1530,7 @@ static int mdt_reint_migrate_internal(struct mdt_thread_info *info, GOTO(out_put_child, rc); /* 3: iterate the linkea of the object and lock all of the objects */ - CFS_INIT_LIST_HEAD(&lock_list); + INIT_LIST_HEAD(&lock_list); rc = mdt_lock_objects_in_linkea(info, mold, msrcdir, &lock_list); if (rc != 0) GOTO(out_put_child, rc); @@ -1686,18 +1691,6 @@ static int mdt_rename_parents_lock(struct mdt_thread_info *info, int rc; ENTRY; - /* Check if the @src is not a child of the @tgt, otherwise a - * reverse locking must take place. - * - * Note: cannot be called after object_find, because if the object - * is destroyed in between it gets stuck in lu_object_find_at(), - * waiting for the last ref. */ - rc = mdt_rename_sanity(info, fid_src, fid_tgt); - if (rc == -EINVAL) - reverse = 1; - else if (rc) - RETURN(rc); - /* find both parents. */ src = mdt_object_find_check(info, fid_src, 0); if (IS_ERR(src)) @@ -1709,6 +1702,14 @@ static int mdt_rename_parents_lock(struct mdt_thread_info *info, tgt = src; mdt_object_get(info->mti_env, tgt); } else { + /* Check if the @src is not a child of the @tgt, otherwise a + * reverse locking must take place. */ + rc = mdt_is_subdir(info, src, fid_tgt); + if (rc == -EINVAL) + reverse = 1; + else if (rc) + GOTO(err_src_put, rc); + tgt = mdt_object_find_check(info, fid_tgt, 1); if (IS_ERR(tgt)) GOTO(err_src_put, rc = PTR_ERR(tgt)); @@ -1721,6 +1722,8 @@ static int mdt_rename_parents_lock(struct mdt_thread_info *info, } } + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5); + /* lock parents in the proper order. */ if (reverse) { rc = mdt_object_lock_save(info, tgt, lh_tgt, 1); @@ -1748,8 +1751,7 @@ static int mdt_rename_parents_lock(struct mdt_thread_info *info, if (rc) GOTO(err_unlock, rc); - if (lu_object_is_dying(&tgt->mot_header)) - GOTO(err_unlock, rc = -ENOENT); + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5); *srcp = src; *tgtp = tgt; @@ -1836,7 +1838,7 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info, /* Check if @mtgtdir is subdir of @mold, before locking child * to avoid reverse locking. */ - rc = mdt_rename_sanity(info, rr->rr_fid2, old_fid); + rc = mdt_is_subdir(info, mtgtdir, old_fid); if (rc) GOTO(out_put_old, rc); @@ -1879,6 +1881,14 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info, GOTO(out_put_new, rc = -EXDEV); } + /* Before locking the target dir, check we do not replace + * a dir with a non-dir, otherwise it may deadlock with + * link op which tries to create a link in this dir + * back to this non-dir. */ + if (S_ISDIR(lu_object_attr(&mnew->mot_obj)) && + !S_ISDIR(lu_object_attr(&mold->mot_obj))) + GOTO(out_put_new, rc = -EISDIR); + lh_oldp = &info->mti_lh[MDT_LH_OLD]; mdt_lock_reg_init(lh_oldp, LCK_EX); rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP | @@ -1888,7 +1898,7 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info, /* Check if @msrcdir is subdir of @mnew, before locking child * to avoid reverse locking. */ - rc = mdt_rename_sanity(info, rr->rr_fid1, new_fid); + rc = mdt_is_subdir(info, msrcdir, new_fid); if (rc) GOTO(out_unlock_old, rc); @@ -1984,10 +1994,10 @@ static int mdt_reint_rename_or_migrate(struct mdt_thread_info *info, ENTRY; if (info->mti_dlm_req) - ldlm_request_cancel(req, info->mti_dlm_req, 0); + ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP); - if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1) || - fid_is_obf(rr->rr_fid2) || fid_is_dot_lustre(rr->rr_fid2)) + if (!fid_is_md_operative(rr->rr_fid1) || + !fid_is_md_operative(rr->rr_fid2)) RETURN(-EPERM); rc = mdt_rename_lock(info, &rename_lh, rename_lock); @@ -2020,28 +2030,68 @@ static int mdt_reint_migrate(struct mdt_thread_info *info, return mdt_reint_rename_or_migrate(info, lhc, MRL_MIGRATE); } -typedef int (*mdt_reinter)(struct mdt_thread_info *info, - struct mdt_lock_handle *lhc); - -static mdt_reinter reinters[REINT_MAX] = { - [REINT_SETATTR] = mdt_reint_setattr, - [REINT_CREATE] = mdt_reint_create, - [REINT_LINK] = mdt_reint_link, - [REINT_UNLINK] = mdt_reint_unlink, - [REINT_RENAME] = mdt_reint_rename, - [REINT_OPEN] = mdt_reint_open, - [REINT_SETXATTR] = mdt_reint_setxattr, - [REINT_RMENTRY] = mdt_reint_unlink, - [REINT_MIGRATE] = mdt_reint_migrate, +struct mdt_reinter { + int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *); + enum lprocfs_extra_opc mr_extra_opc; +}; + +static const struct mdt_reinter mdt_reinters[] = { + [REINT_SETATTR] = { + .mr_handler = &mdt_reint_setattr, + .mr_extra_opc = MDS_REINT_SETATTR, + }, + [REINT_CREATE] = { + .mr_handler = &mdt_reint_create, + .mr_extra_opc = MDS_REINT_CREATE, + }, + [REINT_LINK] = { + .mr_handler = &mdt_reint_link, + .mr_extra_opc = MDS_REINT_LINK, + }, + [REINT_UNLINK] = { + .mr_handler = &mdt_reint_unlink, + .mr_extra_opc = MDS_REINT_UNLINK, + }, + [REINT_RENAME] = { + .mr_handler = &mdt_reint_rename, + .mr_extra_opc = MDS_REINT_RENAME, + }, + [REINT_OPEN] = { + .mr_handler = &mdt_reint_open, + .mr_extra_opc = MDS_REINT_OPEN, + }, + [REINT_SETXATTR] = { + .mr_handler = &mdt_reint_setxattr, + .mr_extra_opc = MDS_REINT_SETXATTR, + }, + [REINT_RMENTRY] = { + .mr_handler = &mdt_reint_unlink, + .mr_extra_opc = MDS_REINT_UNLINK, + }, + [REINT_MIGRATE] = { + .mr_handler = &mdt_reint_migrate, + .mr_extra_opc = MDS_REINT_RENAME, + }, }; int mdt_reint_rec(struct mdt_thread_info *info, - struct mdt_lock_handle *lhc) + struct mdt_lock_handle *lhc) { - int rc; - ENTRY; + const struct mdt_reinter *mr; + int rc; + ENTRY; + + if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters))) + RETURN(-EPROTO); - rc = reinters[info->mti_rr.rr_opcode](info, lhc); + mr = &mdt_reinters[info->mti_rr.rr_opcode]; + if (mr->mr_handler == NULL) + RETURN(-EPROTO); - RETURN(rc); + rc = (*mr->mr_handler)(info, lhc); + + lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats, + PTLRPC_LAST_CNTR + mr->mr_extra_opc); + + RETURN(rc); }