Whamcloud - gitweb
LU-6047 mdt: remove Size on MDS support
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
index fc4461c..bbb20c3 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2013, Intel Corporation.
+ * Copyright (c) 2011, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -46,6 +46,7 @@
 
 #define DEBUG_SUBSYSTEM S_MDS
 
+#include <lprocfs_status.h>
 #include "mdt_internal.h"
 #include <lustre_lmv.h>
 
@@ -219,9 +220,10 @@ int mdt_version_get_check_save(struct mdt_thread_info *info,
  * This checks version of 'name'. Many reint functions uses 'name' for child not
  * FID, therefore we need to get object by name and check its version.
  */
-int mdt_lookup_version_check(struct mdt_thread_info *info,
-                            struct mdt_object *p, const struct lu_name *lname,
-                            struct lu_fid *fid, int idx)
+static int mdt_lookup_version_check(struct mdt_thread_info *info,
+                                   struct mdt_object *p,
+                                   const struct lu_name *lname,
+                                   struct lu_fid *fid, int idx)
 {
         int rc, vbrc;
 
@@ -312,8 +314,13 @@ static int mdt_remote_permission(struct mdt_thread_info *info,
                return -ENOTSUPP;
 
        if (S_ISDIR(attr->la_mode) && spec->u.sp_ea.eadata != NULL &&
-           spec->u.sp_ea.eadatalen != 0 && !mdt_is_striped_client(exp))
-               return -ENOTSUPP;
+           spec->u.sp_ea.eadatalen != 0) {
+               const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
+
+               if (le32_to_cpu(lum->lum_stripe_count) > 1 &&
+                   !mdt_is_striped_client(exp))
+                       return -ENOTSUPP;
+       }
 
        return 0;
 }
@@ -423,18 +430,19 @@ static int mdt_md_create(struct mdt_thread_info *info)
                if (rc == 0)
                        rc = mdt_attr_get_complex(info, child, ma);
 
-                if (rc == 0) {
-                        /* Return fid & attr to client. */
-                        if (ma->ma_valid & MA_INODE)
-                                mdt_pack_attr2body(info, repbody, &ma->ma_attr,
-                                                   mdt_object_fid(child));
-                }
+               if (rc == 0) {
+                       /* Return fid & attr to client. */
+                       if (ma->ma_valid & MA_INODE)
+                               mdt_pack_attr2body(info, repbody, &ma->ma_attr,
+                                                  mdt_object_fid(child));
+               }
 out_put_child:
-                mdt_object_put(info->mti_env, child);
-        } else {
-                rc = PTR_ERR(child);
-        }
-        mdt_create_pack_capa(info, rc, child, repbody);
+               mdt_create_pack_capa(info, rc, child, repbody);
+               mdt_object_put(info->mti_env, child);
+       } else {
+               rc = PTR_ERR(child);
+               mdt_create_pack_capa(info, rc, NULL, repbody);
+       }
 unlock_parent:
        mdt_object_unlock(info, parent, lh, rc);
 put_parent:
@@ -534,8 +542,8 @@ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
        RETURN(rc);
 }
 
-int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
-                 struct md_attr *ma, int flags)
+static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
+                       struct md_attr *ma)
 {
        struct mdt_lock_handle  *lh;
        int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID|LA_FLAGS);
@@ -546,9 +554,6 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
        int rc;
        ENTRY;
 
-       /* attr shouldn't be set on remote object */
-       LASSERT(!mdt_object_remote(mo));
-
         lh = &info->mti_lh[MDT_LH_PARENT];
         mdt_lock_reg_init(lh, LCK_PW);
 
@@ -564,14 +569,11 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
                 RETURN(rc);
 
        s0_lh = &info->mti_lh[MDT_LH_LOCAL];
-       mdt_lock_reg_init(s0_lh, LCK_EX);
+       mdt_lock_reg_init(s0_lh, LCK_PW);
        rc = mdt_lock_slaves(info, mo, LCK_PW, lockpart, s0_lh, &s0_obj, einfo);
        if (rc != 0)
                GOTO(out_unlock, rc);
 
-        if (mdt_object_exists(mo) == 0)
-                GOTO(out_unlock, rc = -ENOENT);
-
         /* all attrs are packed into mti_attr in unpack_setattr */
         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
@@ -659,93 +661,48 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
         struct md_attr          *ma = &info->mti_attr;
         struct mdt_reint_record *rr = &info->mti_rr;
         struct ptlrpc_request   *req = mdt_info_req(info);
-        struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
-        struct mdt_file_data    *mfd;
         struct mdt_object       *mo;
         struct mdt_body         *repbody;
-        int                      som_au, rc, rc2;
+       int                      rc, rc2;
         ENTRY;
 
         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
                   (unsigned int)ma->ma_attr.la_valid);
 
-        if (info->mti_dlm_req)
-                ldlm_request_cancel(req, info->mti_dlm_req, 0);
+       if (info->mti_dlm_req)
+               ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
 
        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
         if (IS_ERR(mo))
                 GOTO(out, rc = PTR_ERR(mo));
 
-        /* start a log jounal handle if needed */
-        if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM)) {
-                if ((ma->ma_attr.la_valid & LA_SIZE) ||
-                    (rr->rr_flags & MRF_OPEN_TRUNC)) {
-                        /* Check write access for the O_TRUNC case */
-                        if (mdt_write_read(mo) < 0)
-                                GOTO(out_put, rc = -ETXTBSY);
-                }
-        } else if (info->mti_ioepoch &&
-                   (info->mti_ioepoch->flags & MF_EPOCH_OPEN)) {
-                /* Truncate case. IOEpoch is opened. */
-                rc = mdt_write_get(mo);
-                if (rc)
-                        GOTO(out_put, rc);
+       if (!mdt_object_exists(mo))
+               GOTO(out_put, rc = -ENOENT);
 
-               mfd = mdt_mfd_new(med);
-                if (mfd == NULL) {
-                        mdt_write_put(mo);
-                        GOTO(out_put, rc = -ENOMEM);
-                }
-
-                mdt_ioepoch_open(info, mo, 0);
-               repbody->mbo_ioepoch = mo->mot_ioepoch;
-
-                mdt_object_get(info->mti_env, mo);
-                mdt_mfd_set_mode(mfd, MDS_FMODE_TRUNC);
-                mfd->mfd_object = mo;
-                mfd->mfd_xid = req->rq_xid;
-
-               spin_lock(&med->med_open_lock);
-               cfs_list_add(&mfd->mfd_list, &med->med_open_head);
-               spin_unlock(&med->med_open_lock);
-               repbody->mbo_handle.cookie = mfd->mfd_handle.h_cookie;
-        }
+       if (mdt_object_remote(mo))
+               GOTO(out_put, rc = -EREMOTE);
 
-        som_au = info->mti_ioepoch && info->mti_ioepoch->flags & MF_SOM_CHANGE;
-        if (som_au) {
-                /* SOM Attribute update case. Find the proper mfd and update
-                 * SOM attributes on the proper object. */
-                LASSERT(mdt_conn_flags(info) & OBD_CONNECT_SOM);
-                LASSERT(info->mti_ioepoch);
-
-               spin_lock(&med->med_open_lock);
-               mfd = mdt_handle2mfd(med, &info->mti_ioepoch->handle,
-                                    req_is_replay(req));
-               if (mfd == NULL) {
-                       spin_unlock(&med->med_open_lock);
-                        CDEBUG(D_INODE, "no handle for file close: "
-                               "fid = "DFID": cookie = "LPX64"\n",
-                               PFID(info->mti_rr.rr_fid1),
-                               info->mti_ioepoch->handle.cookie);
-                        GOTO(out_put, rc = -ESTALE);
-                }
-                LASSERT(mfd->mfd_mode == MDS_FMODE_SOM);
-                LASSERT(!(info->mti_ioepoch->flags & MF_EPOCH_CLOSE));
+       if ((ma->ma_attr.la_valid & LA_SIZE) ||
+           (rr->rr_flags & MRF_OPEN_TRUNC)) {
+               /* Check write access for the O_TRUNC case */
+               if (mdt_write_read(mo) < 0)
+                       GOTO(out_put, rc = -ETXTBSY);
+       }
 
-                class_handle_unhash(&mfd->mfd_handle);
-                cfs_list_del_init(&mfd->mfd_list);
-               spin_unlock(&med->med_open_lock);
+       if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
+               if (ma->ma_valid & MA_LOV)
+                       GOTO(out_put, rc = -EPROTO);
 
-                mdt_mfd_close(info, mfd);
-       } else if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
-               LASSERT((ma->ma_valid & MA_LOV) == 0);
-                rc = mdt_attr_set(info, mo, ma, rr->rr_flags);
+               rc = mdt_attr_set(info, mo, ma);
                 if (rc)
                         GOTO(out_put, rc);
        } else if ((ma->ma_valid & MA_LOV) && (ma->ma_valid & MA_INODE)) {
                struct lu_buf *buf  = &info->mti_buf;
-               LASSERT(ma->ma_attr.la_valid == 0);
+
+               if (ma->ma_attr.la_valid != 0)
+                       GOTO(out_put, rc = -EPROTO);
+
                buf->lb_buf = ma->ma_lmm;
                buf->lb_len = ma->ma_lmm_size;
                rc = mo_xattr_set(info->mti_env, mdt_object_child(mo),
@@ -754,16 +711,31 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                        GOTO(out_put, rc);
        } else if ((ma->ma_valid & MA_LMV) && (ma->ma_valid & MA_INODE)) {
                struct lu_buf *buf  = &info->mti_buf;
+               struct mdt_lock_handle  *lh;
+
+               if (ma->ma_attr.la_valid != 0)
+                       GOTO(out_put, rc = -EPROTO);
+
+               lh = &info->mti_lh[MDT_LH_PARENT];
+               mdt_lock_reg_init(lh, LCK_PW);
+
+               rc = mdt_object_lock(info, mo, lh,
+                                    MDS_INODELOCK_XATTR,
+                                    MDT_LOCAL_LOCK);
+               if (rc != 0)
+                       GOTO(out_put, rc);
 
-               LASSERT(ma->ma_attr.la_valid == 0);
                buf->lb_buf = ma->ma_lmv;
                buf->lb_len = ma->ma_lmv_size;
                rc = mo_xattr_set(info->mti_env, mdt_object_child(mo),
                                  buf, XATTR_NAME_DEFAULT_LMV, 0);
+
+               mdt_object_unlock(info, mo, lh, rc);
                if (rc)
                        GOTO(out_put, rc);
-       } else
-               LBUG();
+       } else {
+               GOTO(out_put, rc = -EPROTO);
+       }
 
        /* If file data is modified, add the dirty flag */
        if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
@@ -780,7 +752,7 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
        if (info->mti_mdt->mdt_lut.lut_oss_capa &&
            exp_connect_flags(info->mti_exp) & OBD_CONNECT_OSS_CAPA &&
            S_ISREG(lu_object_attr(&mo->mot_obj)) &&
-           (ma->ma_attr.la_valid & LA_SIZE) && !som_au) {
+           (ma->ma_attr.la_valid & LA_SIZE)) {
                 struct lustre_capa *capa;
 
                 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA2);
@@ -816,8 +788,9 @@ static int mdt_reint_create(struct mdt_thread_info *info,
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
                 RETURN(err_serious(-ESTALE));
 
-        if (info->mti_dlm_req)
-                ldlm_request_cancel(mdt_info_req(info), info->mti_dlm_req, 0);
+       if (info->mti_dlm_req)
+               ldlm_request_cancel(mdt_info_req(info),
+                                   info->mti_dlm_req, 0, LATF_SKIP);
 
        if (!lu_name_is_valid(&info->mti_rr.rr_name))
                RETURN(-EPROTO);
@@ -871,8 +844,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
        DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
                  PNAME(&rr->rr_name));
 
-        if (info->mti_dlm_req)
-                ldlm_request_cancel(req, info->mti_dlm_req, 0);
+       if (info->mti_dlm_req)
+               ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
                 RETURN(err_serious(-ENOENT));
@@ -1094,8 +1067,8 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
                 RETURN(err_serious(-ENOENT));
 
-        if (info->mti_dlm_req)
-                ldlm_request_cancel(req, info->mti_dlm_req, 0);
+       if (info->mti_dlm_req)
+               ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
 
         /* Invalid case so return error immediately instead of
          * processing it */
@@ -1290,52 +1263,36 @@ static void mdt_rename_unlock(struct lustre_handle *lh)
  * target. Source should not be ancestor of target dir. May be other rename
  * checks can be moved here later.
  */
-static int mdt_rename_sanity(struct mdt_thread_info *info,
-                            const struct lu_fid *dir_fid,
-                            const struct lu_fid *fid)
+static int mdt_is_subdir(struct mdt_thread_info *info,
+                        struct mdt_object *dir,
+                        const struct lu_fid *fid)
 {
-        struct mdt_object *dst;
-       struct lu_fid dst_fid = *dir_fid;
+       struct lu_fid dir_fid = dir->mot_header.loh_fid;
         int rc = 0;
         ENTRY;
 
        /* If the source and target are in the same directory, they can not
         * be parent/child relationship, so subdir check is not needed */
-       if (lu_fid_eq(dir_fid, fid))
+       if (lu_fid_eq(&dir_fid, fid))
                return 0;
 
-       do {
-               LASSERT(fid_is_sane(&dst_fid));
-               dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid);
-               if (!IS_ERR(dst)) {
-                       /* XXX: this object might not be protected by LDLM lock
-                        * here, (see mdt_rename_parents_lock), but LOHA_EXISTS
-                        * will not change once it is being set, but LFSCK might
-                        * change this later.(LU-5069) */
-                       if (!mdt_object_exists(dst))
-                               RETURN(-ESTALE);
-
-                       rc = mdo_is_subdir(info->mti_env,
-                                          mdt_object_child(dst), fid,
-                                          &dst_fid);
-                       mdt_object_put(info->mti_env, dst);
-                       if (rc != -EREMOTE && rc < 0) {
-                               CERROR("%s: failed subdir check in "DFID" for "
-                                      DFID": rc = %d\n",
-                                      mdt_obd_name(info->mti_mdt),
-                                      PFID(dir_fid), PFID(fid), rc);
-                               /* Return EINVAL only if a parent is the @fid */
-                               if (rc == -EINVAL)
-                                       rc = -EIO;
-                       } else {
-                               /* check the found fid */
-                               if (lu_fid_eq(&dst_fid, fid))
-                                       rc = -EINVAL;
-                       }
-               } else {
-                       rc = PTR_ERR(dst);
-               }
-       } while (rc == -EREMOTE);
+       if (!mdt_object_exists(dir))
+               RETURN(-ENOENT);
+
+       rc = mdo_is_subdir(info->mti_env, mdt_object_child(dir),
+                          fid, &dir_fid);
+       if (rc < 0) {
+               CERROR("%s: failed subdir check in "DFID" for "DFID
+                      ": rc = %d\n", mdt_obd_name(info->mti_mdt),
+                      PFID(&dir_fid), PFID(fid), rc);
+               /* Return EINVAL only if a parent is the @fid */
+               if (rc == -EINVAL)
+                       rc = -EIO;
+       } else {
+               /* check the found fid */
+               if (lu_fid_eq(&dir_fid, fid))
+                       rc = -EINVAL;
+       }
 
         RETURN(rc);
 }
@@ -1366,7 +1323,7 @@ static int mdt_lock_objects_in_linkea(struct mdt_thread_info *info,
                                      struct list_head *lock_list)
 {
        struct lu_buf           *buf = &info->mti_big_buf;
-       struct linkea_data      ldata = { 0 };
+       struct linkea_data      ldata = { NULL };
        int                     count;
        int                     rc;
        ENTRY;
@@ -1438,7 +1395,7 @@ static int mdt_lock_objects_in_linkea(struct mdt_thread_info *info,
                        GOTO(out, rc);
                }
 
-               CFS_INIT_LIST_HEAD(&mll->mll_list);
+               INIT_LIST_HEAD(&mll->mll_list);
                mll->mll_obj = mdt_pobj;
                list_add_tail(&mll->mll_list, lock_list);
        }
@@ -1527,7 +1484,7 @@ static int mdt_reint_migrate_internal(struct mdt_thread_info *info,
                GOTO(out_put_child, rc);
 
        /* 3: iterate the linkea of the object and lock all of the objects */
-       CFS_INIT_LIST_HEAD(&lock_list);
+       INIT_LIST_HEAD(&lock_list);
        rc = mdt_lock_objects_in_linkea(info, mold, msrcdir, &lock_list);
        if (rc != 0)
                GOTO(out_put_child, rc);
@@ -1688,18 +1645,6 @@ static int mdt_rename_parents_lock(struct mdt_thread_info *info,
        int                      rc;
        ENTRY;
 
-       /* Check if the @src is not a child of the @tgt, otherwise a
-        * reverse locking must take place.
-        *
-        * Note: cannot be called after object_find, because if the object
-        * is destroyed in between it gets stuck in lu_object_find_at(),
-        * waiting for the last ref. */
-       rc = mdt_rename_sanity(info, fid_src, fid_tgt);
-       if (rc == -EINVAL)
-               reverse = 1;
-       else if (rc)
-               RETURN(rc);
-
        /* find both parents. */
        src = mdt_object_find_check(info, fid_src, 0);
        if (IS_ERR(src))
@@ -1711,6 +1656,14 @@ static int mdt_rename_parents_lock(struct mdt_thread_info *info,
                tgt = src;
                mdt_object_get(info->mti_env, tgt);
        } else {
+               /* Check if the @src is not a child of the @tgt, otherwise a
+                * reverse locking must take place. */
+               rc = mdt_is_subdir(info, src, fid_tgt);
+               if (rc == -EINVAL)
+                       reverse = 1;
+               else if (rc)
+                       GOTO(err_src_put, rc);
+
                tgt = mdt_object_find_check(info, fid_tgt, 1);
                if (IS_ERR(tgt))
                        GOTO(err_src_put, rc = PTR_ERR(tgt));
@@ -1723,6 +1676,8 @@ static int mdt_rename_parents_lock(struct mdt_thread_info *info,
                }
        }
 
+       OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
+
        /* lock parents in the proper order. */
        if (reverse) {
                rc = mdt_object_lock_save(info, tgt, lh_tgt, 1);
@@ -1750,8 +1705,7 @@ static int mdt_rename_parents_lock(struct mdt_thread_info *info,
        if (rc)
                GOTO(err_unlock, rc);
 
-       if (lu_object_is_dying(&tgt->mot_header))
-               GOTO(err_unlock, rc = -ENOENT);
+       OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
 
        *srcp = src;
        *tgtp = tgt;
@@ -1838,7 +1792,7 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info,
 
        /* Check if @mtgtdir is subdir of @mold, before locking child
         * to avoid reverse locking. */
-       rc = mdt_rename_sanity(info, rr->rr_fid2, old_fid);
+       rc = mdt_is_subdir(info, mtgtdir, old_fid);
        if (rc)
                GOTO(out_put_old, rc);
 
@@ -1881,6 +1835,10 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info,
                        GOTO(out_put_new, rc = -EXDEV);
                }
 
+               /* Before locking the target dir, check we do not replace
+                * a dir with a non-dir, otherwise it may deadlock with
+                * link op which tries to create a link in this dir
+                * back to this non-dir. */
                if (S_ISDIR(lu_object_attr(&mnew->mot_obj)) &&
                    !S_ISDIR(lu_object_attr(&mold->mot_obj)))
                        GOTO(out_put_new, rc = -EISDIR);
@@ -1894,7 +1852,7 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info,
 
                /* Check if @msrcdir is subdir of @mnew, before locking child
                 * to avoid reverse locking. */
-               rc = mdt_rename_sanity(info, rr->rr_fid1, new_fid);
+               rc = mdt_is_subdir(info, msrcdir, new_fid);
                if (rc)
                        GOTO(out_unlock_old, rc);
 
@@ -1990,10 +1948,10 @@ static int mdt_reint_rename_or_migrate(struct mdt_thread_info *info,
        ENTRY;
 
        if (info->mti_dlm_req)
-               ldlm_request_cancel(req, info->mti_dlm_req, 0);
+               ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
 
-       if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1) ||
-           fid_is_obf(rr->rr_fid2) || fid_is_dot_lustre(rr->rr_fid2))
+       if (!fid_is_md_operative(rr->rr_fid1) ||
+           !fid_is_md_operative(rr->rr_fid2))
                RETURN(-EPERM);
 
        rc = mdt_rename_lock(info, &rename_lh, rename_lock);
@@ -2026,28 +1984,68 @@ static int mdt_reint_migrate(struct mdt_thread_info *info,
        return mdt_reint_rename_or_migrate(info, lhc, MRL_MIGRATE);
 }
 
-typedef int (*mdt_reinter)(struct mdt_thread_info *info,
-                           struct mdt_lock_handle *lhc);
-
-static mdt_reinter reinters[REINT_MAX] = {
-       [REINT_SETATTR]  = mdt_reint_setattr,
-       [REINT_CREATE]   = mdt_reint_create,
-       [REINT_LINK]     = mdt_reint_link,
-       [REINT_UNLINK]   = mdt_reint_unlink,
-       [REINT_RENAME]   = mdt_reint_rename,
-       [REINT_OPEN]     = mdt_reint_open,
-       [REINT_SETXATTR] = mdt_reint_setxattr,
-       [REINT_RMENTRY]  = mdt_reint_unlink,
-       [REINT_MIGRATE]   = mdt_reint_migrate,
+struct mdt_reinter {
+       int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
+       enum lprocfs_extra_opc mr_extra_opc;
+};
+
+static const struct mdt_reinter mdt_reinters[] = {
+       [REINT_SETATTR] = {
+               .mr_handler = &mdt_reint_setattr,
+               .mr_extra_opc = MDS_REINT_SETATTR,
+       },
+       [REINT_CREATE] = {
+               .mr_handler = &mdt_reint_create,
+               .mr_extra_opc = MDS_REINT_CREATE,
+       },
+       [REINT_LINK] = {
+               .mr_handler = &mdt_reint_link,
+               .mr_extra_opc = MDS_REINT_LINK,
+       },
+       [REINT_UNLINK] = {
+               .mr_handler = &mdt_reint_unlink,
+               .mr_extra_opc = MDS_REINT_UNLINK,
+       },
+       [REINT_RENAME] = {
+               .mr_handler = &mdt_reint_rename,
+               .mr_extra_opc = MDS_REINT_RENAME,
+       },
+       [REINT_OPEN] = {
+               .mr_handler = &mdt_reint_open,
+               .mr_extra_opc = MDS_REINT_OPEN,
+       },
+       [REINT_SETXATTR] = {
+               .mr_handler = &mdt_reint_setxattr,
+               .mr_extra_opc = MDS_REINT_SETXATTR,
+       },
+       [REINT_RMENTRY] = {
+               .mr_handler = &mdt_reint_unlink,
+               .mr_extra_opc = MDS_REINT_UNLINK,
+       },
+       [REINT_MIGRATE] = {
+               .mr_handler = &mdt_reint_migrate,
+               .mr_extra_opc = MDS_REINT_RENAME,
+       },
 };
 
 int mdt_reint_rec(struct mdt_thread_info *info,
-                  struct mdt_lock_handle *lhc)
+                 struct mdt_lock_handle *lhc)
 {
-        int rc;
-        ENTRY;
+       const struct mdt_reinter *mr;
+       int rc;
+       ENTRY;
 
-        rc = reinters[info->mti_rr.rr_opcode](info, lhc);
+       if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters)))
+               RETURN(-EPROTO);
 
-        RETURN(rc);
+       mr = &mdt_reinters[info->mti_rr.rr_opcode];
+       if (mr->mr_handler == NULL)
+               RETURN(-EPROTO);
+
+       rc = (*mr->mr_handler)(info, lhc);
+
+       lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats,
+                            PTLRPC_LAST_CNTR + mr->mr_extra_opc);
+
+       RETURN(rc);
 }