Whamcloud - gitweb
LU-5557 mdt: track reint operations in MDS service stats
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
index d2c36c2..5a762e1 100644 (file)
@@ -46,6 +46,7 @@
 
 #define DEBUG_SUBSYSTEM S_MDS
 
+#include <lprocfs_status.h>
 #include "mdt_internal.h"
 #include <lustre_lmv.h>
 
@@ -535,8 +536,8 @@ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
        RETURN(rc);
 }
 
-int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
-                 struct md_attr *ma, int flags)
+static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
+                       struct md_attr *ma)
 {
        struct mdt_lock_handle  *lh;
        int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID|LA_FLAGS);
@@ -547,9 +548,6 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
        int rc;
        ENTRY;
 
-       /* attr shouldn't be set on remote object */
-       LASSERT(!mdt_object_remote(mo));
-
         lh = &info->mti_lh[MDT_LH_PARENT];
         mdt_lock_reg_init(lh, LCK_PW);
 
@@ -565,14 +563,11 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
                 RETURN(rc);
 
        s0_lh = &info->mti_lh[MDT_LH_LOCAL];
-       mdt_lock_reg_init(s0_lh, LCK_EX);
+       mdt_lock_reg_init(s0_lh, LCK_PW);
        rc = mdt_lock_slaves(info, mo, LCK_PW, lockpart, s0_lh, &s0_obj, einfo);
        if (rc != 0)
                GOTO(out_unlock, rc);
 
-        if (mdt_object_exists(mo) == 0)
-                GOTO(out_unlock, rc = -ENOENT);
-
         /* all attrs are packed into mti_attr in unpack_setattr */
         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
@@ -678,6 +673,12 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
         if (IS_ERR(mo))
                 GOTO(out, rc = PTR_ERR(mo));
 
+       if (!mdt_object_exists(mo))
+               GOTO(out_put, rc = -ENOENT);
+
+       if (mdt_object_remote(mo))
+               GOTO(out_put, rc = -EREMOTE);
+
         /* start a log jounal handle if needed */
         if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM)) {
                 if ((ma->ma_attr.la_valid & LA_SIZE) ||
@@ -717,8 +718,8 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
         if (som_au) {
                 /* SOM Attribute update case. Find the proper mfd and update
                  * SOM attributes on the proper object. */
-                LASSERT(mdt_conn_flags(info) & OBD_CONNECT_SOM);
-                LASSERT(info->mti_ioepoch);
+               if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM))
+                       GOTO(out_put, rc = -EPROTO);
 
                spin_lock(&med->med_open_lock);
                mfd = mdt_handle2mfd(med, &info->mti_ioepoch->handle,
@@ -731,8 +732,10 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                                info->mti_ioepoch->handle.cookie);
                         GOTO(out_put, rc = -ESTALE);
                 }
-                LASSERT(mfd->mfd_mode == MDS_FMODE_SOM);
-                LASSERT(!(info->mti_ioepoch->flags & MF_EPOCH_CLOSE));
+
+               if (mfd->mfd_mode != MDS_FMODE_SOM ||
+                   (info->mti_ioepoch->flags & MF_EPOCH_CLOSE))
+                       GOTO(out_put, rc = -EPROTO);
 
                class_handle_unhash(&mfd->mfd_handle);
                list_del_init(&mfd->mfd_list);
@@ -740,13 +743,18 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
 
                 mdt_mfd_close(info, mfd);
        } else if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
-               LASSERT((ma->ma_valid & MA_LOV) == 0);
-                rc = mdt_attr_set(info, mo, ma, rr->rr_flags);
+               if (ma->ma_valid & MA_LOV)
+                       GOTO(out_put, rc = -EPROTO);
+
+               rc = mdt_attr_set(info, mo, ma);
                 if (rc)
                         GOTO(out_put, rc);
        } else if ((ma->ma_valid & MA_LOV) && (ma->ma_valid & MA_INODE)) {
                struct lu_buf *buf  = &info->mti_buf;
-               LASSERT(ma->ma_attr.la_valid == 0);
+
+               if (ma->ma_attr.la_valid != 0)
+                       GOTO(out_put, rc = -EPROTO);
+
                buf->lb_buf = ma->ma_lmm;
                buf->lb_len = ma->ma_lmm_size;
                rc = mo_xattr_set(info->mti_env, mdt_object_child(mo),
@@ -756,15 +764,18 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
        } else if ((ma->ma_valid & MA_LMV) && (ma->ma_valid & MA_INODE)) {
                struct lu_buf *buf  = &info->mti_buf;
 
-               LASSERT(ma->ma_attr.la_valid == 0);
+               if (ma->ma_attr.la_valid != 0)
+                       GOTO(out_put, rc = -EPROTO);
+
                buf->lb_buf = ma->ma_lmv;
                buf->lb_len = ma->ma_lmv_size;
                rc = mo_xattr_set(info->mti_env, mdt_object_child(mo),
                                  buf, XATTR_NAME_DEFAULT_LMV, 0);
                if (rc)
                        GOTO(out_put, rc);
-       } else
-               LBUG();
+       } else {
+               GOTO(out_put, rc = -EPROTO);
+       }
 
        /* If file data is modified, add the dirty flag */
        if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
@@ -1978,8 +1989,8 @@ static int mdt_reint_rename_or_migrate(struct mdt_thread_info *info,
        if (info->mti_dlm_req)
                ldlm_request_cancel(req, info->mti_dlm_req, 0);
 
-       if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1) ||
-           fid_is_obf(rr->rr_fid2) || fid_is_dot_lustre(rr->rr_fid2))
+       if (!fid_is_md_operative(rr->rr_fid1) ||
+           !fid_is_md_operative(rr->rr_fid2))
                RETURN(-EPERM);
 
        rc = mdt_rename_lock(info, &rename_lh, rename_lock);
@@ -2012,28 +2023,68 @@ static int mdt_reint_migrate(struct mdt_thread_info *info,
        return mdt_reint_rename_or_migrate(info, lhc, MRL_MIGRATE);
 }
 
-typedef int (*mdt_reinter)(struct mdt_thread_info *info,
-                           struct mdt_lock_handle *lhc);
-
-static mdt_reinter reinters[REINT_MAX] = {
-       [REINT_SETATTR]  = mdt_reint_setattr,
-       [REINT_CREATE]   = mdt_reint_create,
-       [REINT_LINK]     = mdt_reint_link,
-       [REINT_UNLINK]   = mdt_reint_unlink,
-       [REINT_RENAME]   = mdt_reint_rename,
-       [REINT_OPEN]     = mdt_reint_open,
-       [REINT_SETXATTR] = mdt_reint_setxattr,
-       [REINT_RMENTRY]  = mdt_reint_unlink,
-       [REINT_MIGRATE]   = mdt_reint_migrate,
+struct mdt_reinter {
+       int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
+       enum lprocfs_extra_opc mr_extra_opc;
+};
+
+static const struct mdt_reinter mdt_reinters[] = {
+       [REINT_SETATTR] = {
+               .mr_handler = &mdt_reint_setattr,
+               .mr_extra_opc = MDS_REINT_SETATTR,
+       },
+       [REINT_CREATE] = {
+               .mr_handler = &mdt_reint_create,
+               .mr_extra_opc = MDS_REINT_CREATE,
+       },
+       [REINT_LINK] = {
+               .mr_handler = &mdt_reint_link,
+               .mr_extra_opc = MDS_REINT_LINK,
+       },
+       [REINT_UNLINK] = {
+               .mr_handler = &mdt_reint_unlink,
+               .mr_extra_opc = MDS_REINT_UNLINK,
+       },
+       [REINT_RENAME] = {
+               .mr_handler = &mdt_reint_rename,
+               .mr_extra_opc = MDS_REINT_RENAME,
+       },
+       [REINT_OPEN] = {
+               .mr_handler = &mdt_reint_open,
+               .mr_extra_opc = MDS_REINT_OPEN,
+       },
+       [REINT_SETXATTR] = {
+               .mr_handler = &mdt_reint_setxattr,
+               .mr_extra_opc = MDS_REINT_SETXATTR,
+       },
+       [REINT_RMENTRY] = {
+               .mr_handler = &mdt_reint_unlink,
+               .mr_extra_opc = MDS_REINT_UNLINK,
+       },
+       [REINT_MIGRATE] = {
+               .mr_handler = &mdt_reint_migrate,
+               .mr_extra_opc = MDS_REINT_RENAME,
+       },
 };
 
 int mdt_reint_rec(struct mdt_thread_info *info,
-                  struct mdt_lock_handle *lhc)
+                 struct mdt_lock_handle *lhc)
 {
-        int rc;
-        ENTRY;
+       const struct mdt_reinter *mr;
+       int rc;
+       ENTRY;
 
-        rc = reinters[info->mti_rr.rr_opcode](info, lhc);
+       if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters)))
+               RETURN(-EPROTO);
 
-        RETURN(rc);
+       mr = &mdt_reinters[info->mti_rr.rr_opcode];
+       if (mr->mr_handler == NULL)
+               RETURN(-EPROTO);
+
+       rc = (*mr->mr_handler)(info, lhc);
+
+       lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats,
+                            PTLRPC_LAST_CNTR + mr->mr_extra_opc);
+
+       RETURN(rc);
 }