Whamcloud - gitweb
LU-4843 mdt: disallow old clients access striped dir
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index 66c60ee..8ae3794 100644 (file)
@@ -53,6 +53,7 @@
  * struct OBD_{ALLOC,FREE}*()
  */
 #include <obd_support.h>
+#include <lustre_ioctl.h>
 /* struct ptlrpc_request */
 #include <lustre_net.h>
 /* struct obd_export */
@@ -507,8 +508,8 @@ out:
        return rc;
 }
 
-static int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
-                            const char *name)
+int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
+                     const char *name)
 {
        const struct lu_env *env = info->mti_env;
        int rc;
@@ -784,9 +785,9 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
 
        if (mdt_object_remote(o)) {
                /* This object is located on remote node.*/
-               /* Return -EIO for old client */
+               /* Return -ENOTSUPP for old client */
                if (!mdt_is_dne_client(req->rq_export))
-                       GOTO(out, rc = -EIO);
+                       GOTO(out, rc = -ENOTSUPP);
 
                repbody->fid1 = *mdt_object_fid(o);
                repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
@@ -896,12 +897,19 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                         mdt_dump_lmm(D_INFO, ma->ma_lmm, repbody->valid);
                 }
                if (ma->ma_valid & MA_LMV) {
+                       /* Return -ENOTSUPP for old client */
+                       if (!mdt_is_striped_client(req->rq_export))
+                               RETURN(-ENOTSUPP);
+
                        LASSERT(S_ISDIR(la->la_mode));
                        mdt_dump_lmv(D_INFO, ma->ma_lmv);
                        repbody->eadatasize = ma->ma_lmv_size;
                        repbody->valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
                }
                if (ma->ma_valid & MA_LMV_DEF) {
+                       /* Return -ENOTSUPP for old client */
+                       if (!mdt_is_striped_client(req->rq_export))
+                               RETURN(-ENOTSUPP);
                        LASSERT(S_ISDIR(la->la_mode));
                        repbody->eadatasize = ma->ma_lmv_size;
                        repbody->valid |= (OBD_MD_FLDIREA|OBD_MD_DEFAULT_MEA);
@@ -1078,6 +1086,10 @@ static int mdt_getattr(struct tgt_session_info *tsi)
         LASSERT(obj != NULL);
        LASSERT(lu_object_assert_exists(&obj->mot_obj));
 
+       /* Unlike the intent case, where the output buffers must be pre-filled
+        * early in the intent policy for ldlm reasons, here we can get a much
+        * better estimate of the EA size by just reading it from disk.
+        * The exceptions are readdir and (missing) directory striping. */
        /* Readlink */
        if (reqbody->valid & OBD_MD_LINKNAME) {
                /* No easy way to know how long is the symlink, but it cannot
@@ -1096,15 +1108,13 @@ static int mdt_getattr(struct tgt_session_info *tsi)
                 * will reallocate */
                rc = DEF_REP_MD_SIZE;
        } else {
-               /* Hopefully no race in EA change for either file or directory?
-                */
+               /* Read the actual EA size from disk */
                rc = mdt_attr_get_eabuf_size(info, obj);
        }
 
        if (rc < 0)
                GOTO(out_shrink, rc);
 
-       /* old clients may not report needed easize, use max value then */
        req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, rc);
 
        rc = req_capsule_server_pack(pill);
@@ -4365,12 +4375,7 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
 
        mdt_quota_fini(env, m);
 
-        cfs_free_nidlist(&m->mdt_nosquash_nids);
-        if (m->mdt_nosquash_str) {
-                OBD_FREE(m->mdt_nosquash_str, m->mdt_nosquash_strlen);
-                m->mdt_nosquash_str = NULL;
-                m->mdt_nosquash_strlen = 0;
-        }
+       cfs_free_nidlist(&m->mdt_squash.rsi_nosquash_nids);
 
         mdt_seq_fini(env, m);
         mdt_fld_fini(env, m);
@@ -4386,7 +4391,7 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
 
        LASSERT(atomic_read(&d->ld_ref) == 0);
 
-       server_put_mount(mdt_obd_name(m));
+       server_put_mount(mdt_obd_name(m), true);
 
        EXIT;
 }
@@ -4456,12 +4461,10 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         m->mdt_capa_timeout = CAPA_TIMEOUT;
         m->mdt_capa_alg = CAPA_HMAC_ALG_SHA1;
         m->mdt_ck_timeout = CAPA_KEY_TIMEOUT;
-        m->mdt_squash_uid = 0;
-        m->mdt_squash_gid = 0;
-        CFS_INIT_LIST_HEAD(&m->mdt_nosquash_nids);
-        m->mdt_nosquash_str = NULL;
-        m->mdt_nosquash_strlen = 0;
-       init_rwsem(&m->mdt_squash_sem);
+       m->mdt_squash.rsi_uid = 0;
+       m->mdt_squash.rsi_gid = 0;
+       INIT_LIST_HEAD(&m->mdt_squash.rsi_nosquash_nids);
+       init_rwsem(&m->mdt_squash.rsi_sem);
        spin_lock_init(&m->mdt_osfs_lock);
        m->mdt_osfs_age = cfs_time_shift_64(-1000);
        m->mdt_enable_remote_dir = 0;
@@ -4642,7 +4645,7 @@ err_fini_stack:
        mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
 err_lmi:
        if (lmi)
-               server_put_mount(dev);
+               server_put_mount(dev, true);
        return(rc);
 }
 
@@ -4670,7 +4673,6 @@ static int mdt_process_config(const struct lu_env *env,
 
        switch (cfg->lcfg_command) {
        case LCFG_PARAM: {
-               struct lprocfs_static_vars  lvars;
                struct obd_device          *obd = d->ld_obd;
 
                /* For interoperability */
@@ -4705,14 +4707,13 @@ static int mdt_process_config(const struct lu_env *env,
                        }
                }
 
-               lprocfs_mdt_init_vars(&lvars);
-               rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars,
-                                             cfg, obd);
+               rc = class_process_proc_seq_param(PARAM_MDT, obd->obd_vars,
+                                                       cfg, obd);
                if (rc > 0 || rc == -ENOSYS) {
                        /* is it an HSM var ? */
-                       rc = class_process_proc_param(PARAM_HSM,
-                                                     hsm_cdt_get_proc_vars(),
-                                                     cfg, obd);
+                       rc = class_process_proc_seq_param(PARAM_HSM,
+                                                       hsm_cdt_get_proc_vars(),
+                                                       cfg, obd);
                        if (rc > 0 || rc == -ENOSYS)
                                /* we don't understand; pass it on */
                                rc = next->ld_ops->ldo_process_config(env, next,
@@ -4818,7 +4819,6 @@ static int mdt_prepare(const struct lu_env *env,
        struct mdt_device *mdt = mdt_dev(cdev);
        struct lu_device *next = &mdt->mdt_child->md_lu_dev;
        struct obd_device *obd = cdev->ld_obd;
-       struct lfsck_start_param lsp;
        int rc;
 
        ENTRY;
@@ -4842,17 +4842,6 @@ static int mdt_prepare(const struct lu_env *env,
         * register the namespace to such instance. */
        LASSERTF(rc == 0, "register namespace failed: rc = %d\n", rc);
 
-       lsp.lsp_start = NULL;
-       lsp.lsp_index_valid = 0;
-       rc = mdt->mdt_child->md_ops->mdo_iocontrol(env, mdt->mdt_child,
-                                                  OBD_IOC_START_LFSCK,
-                                                  0, &lsp);
-       if (rc != 0) {
-               CWARN("%s: auto trigger paused LFSCK failed: rc = %d\n",
-                     mdt_obd_name(mdt), rc);
-               rc = 0;
-       }
-
        if (mdt->mdt_seq_site.ss_node_id == 0) {
                rc = mdt->mdt_child->md_ops->mdo_root_get(env, mdt->mdt_child,
                                                         &mdt->mdt_md_root_fid);
@@ -5356,19 +5345,46 @@ static int mdt_path_current(struct mdt_thread_info *info,
        --ptr;
        pli->pli_fidcount = 0;
        pli->pli_fids[0] = *(struct lu_fid *)mdt_object_fid(pli->pli_mdt_obj);
-
+       *tmpfid = pli->pli_fids[0];
        /* root FID only exists on MDT0, and fid2path should also ends at MDT0,
         * so checking root_fid can only happen on MDT0. */
        while (!lu_fid_eq(&mdt->mdt_md_root_fid,
                          &pli->pli_fids[pli->pli_fidcount])) {
-               mdt_obj = mdt_object_find(info->mti_env, mdt,
-                                         &pli->pli_fids[pli->pli_fidcount]);
+               struct lu_buf           lmv_buf;
+
+               mdt_obj = mdt_object_find(info->mti_env, mdt, tmpfid);
                if (IS_ERR(mdt_obj))
                        GOTO(out, rc = PTR_ERR(mdt_obj));
+
                if (mdt_object_remote(mdt_obj)) {
                        mdt_object_put(info->mti_env, mdt_obj);
                        GOTO(remote_out, rc = -EREMOTE);
                }
+
+               lmv_buf.lb_buf = info->mti_xattr_buf;
+               lmv_buf.lb_len = sizeof(info->mti_xattr_buf);
+
+               /* Check whether this object is a slave stripe */
+               rc = mo_xattr_get(info->mti_env, mdt_object_child(mdt_obj),
+                                 &lmv_buf, XATTR_NAME_LMV);
+               if (rc > 0) {
+                       union lmv_mds_md *lmm = lmv_buf.lb_buf;
+
+                       /* For a slave stripe, get its master */
+                       if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_STRIPE) {
+                               struct lmv_mds_md_v1 *lmm1 = &lmm->lmv_md_v1;
+
+                               fid_le_to_cpu(tmpfid, &lmm1->lmv_master_fid);
+                               if (!fid_is_sane(tmpfid)) {
+                                       mdt_object_put(info->mti_env, mdt_obj);
+                                       GOTO(out, rc = -EINVAL);
+                               }
+                               mdt_object_put(info->mti_env, mdt_obj);
+                               pli->pli_fids[pli->pli_fidcount] = *tmpfid;
+                               continue;
+                       }
+               }
+
                if (!mdt_object_exists(mdt_obj)) {
                        mdt_object_put(info->mti_env, mdt_obj);
                        GOTO(out, rc = -ENOENT);
@@ -5753,12 +5769,22 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
 
 static int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt)
 {
-        struct lu_device *ld = md2lu_dev(mdt->mdt_child);
-        int rc;
-        ENTRY;
+       struct lu_device *ld = md2lu_dev(mdt->mdt_child);
+       struct lfsck_start_param lsp;
+       int rc;
+       ENTRY;
 
-        rc = ld->ld_ops->ldo_recovery_complete(env, ld);
-        RETURN(rc);
+       lsp.lsp_start = NULL;
+       lsp.lsp_index_valid = 0;
+       rc = mdt->mdt_child->md_ops->mdo_iocontrol(env, mdt->mdt_child,
+                                                  OBD_IOC_START_LFSCK,
+                                                  0, &lsp);
+       if (rc != 0 && rc != -EALREADY) /* -EALREADY is not warned about */
+               CWARN("%s: auto trigger paused LFSCK failed: rc = %d\n",
+                     mdt_obd_name(mdt), rc);
+
+       rc = ld->ld_ops->ldo_recovery_complete(env, ld); /* drops LFSCK rc */
+       RETURN(rc);
 }
 
 static int mdt_obd_postrecov(struct obd_device *obd)
@@ -5911,7 +5937,6 @@ static struct lu_device_type mdt_device_type = {
 
 static int __init mdt_mod_init(void)
 {
-       struct lprocfs_static_vars lvars;
        int rc;
 
        CLASSERT(sizeof("0x0123456789ABCDEF:0x01234567:0x01234567") ==
@@ -5926,10 +5951,9 @@ static int __init mdt_mod_init(void)
        if (rc)
                GOTO(lu_fini, rc);
 
-       lprocfs_mdt_init_vars(&lvars);
        rc = class_register_type(&mdt_obd_device_ops, NULL, true, NULL,
 #ifndef HAVE_ONLY_PROCFS_SEQ
-                                lvars.module_vars,
+                                NULL,
 #endif
                                 LUSTRE_MDT_NAME, &mdt_device_type);
        if (rc)