Whamcloud - gitweb
LU-7074 mdd: validate the linkea before packing 35/16235/13
authorwang di <di.wang@intel.com>
Wed, 2 Sep 2015 10:51:28 +0000 (03:51 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 7 Oct 2015 17:40:12 +0000 (17:40 +0000)
During migration, let's validate linkea entry before
packing the updates into the buffer and sending to
the remote MDT.

And also move retrieving linkea before transaction
start to avoiding sending RPC inside the transaction.

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: I36f235274d39560f6654fd76967e45400e8187ce
Reviewed-on: http://review.whamcloud.com/16235
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/mdd/mdd_dir.c

index 17b654d..8cb25d4 100644 (file)
@@ -1045,7 +1045,11 @@ out:
                int error = 1;
                if (rc == -EOVERFLOW || rc == -ENOSPC)
                        error = 0;
-               if (oldpfid == NULL)
+               if (newlname == NULL)
+                       CDEBUG(error ? D_ERROR : D_OTHER,
+                              "link_ea add failed %d "DFID"\n",
+                              rc, PFID(mdd_object_fid(mdd_obj)));
+               else if (oldpfid == NULL)
                        CDEBUG(error ? D_ERROR : D_OTHER,
                               "link_ea add '%.*s' failed %d "DFID"\n",
                               newlname->ln_namelen, newlname->ln_name,
@@ -3078,22 +3082,18 @@ static int mdd_update_linkea_internal(const struct lu_env *env,
                                      struct mdd_object *mdd_sobj,
                                      struct mdd_object *mdd_tobj,
                                      const struct lu_name *child_name,
+                                     struct linkea_data *ldata,
                                      struct thandle *handle,
                                      int declare)
 {
        struct mdd_thread_info  *info = mdd_env_info(env);
-       struct linkea_data      *ldata = &info->mti_link_data;
        int                     count;
        int                     rc = 0;
        ENTRY;
 
-       rc = mdd_links_read(env, mdd_sobj, ldata);
-       if (rc != 0) {
-               if (rc == -ENOENT || rc == -ENODATA)
-                       rc = 0;
-               RETURN(rc);
-       }
+       LASSERT(ldata->ld_buf != NULL);
 
+again:
        /* If it is mulitple links file, we need update the name entry for
         * all parent */
        LASSERT(ldata->ld_leh != NULL);
@@ -3106,20 +3106,21 @@ static int mdd_update_linkea_internal(const struct lu_env *env,
 
                linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
                                    &lname, &fid);
-               ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
-                                                        ldata->ld_reclen);
                pobj = mdd_object_find(env, mdd, &fid);
                if (IS_ERR(pobj)) {
                        CWARN("%s: cannot find obj "DFID": rc = %ld\n",
                              mdd2obd_dev(mdd)->obd_name, PFID(&fid),
                              PTR_ERR(pobj));
-                       continue;
+                       linkea_del_buf(ldata, &lname);
+                       goto again;
                }
 
                if (!mdd_object_exists(pobj)) {
                        CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
                              mdd2obd_dev(mdd)->obd_name, PFID(&fid));
-                       GOTO(next_put, rc);
+                       linkea_del_buf(ldata, &lname);
+                       mdd_object_put(env, pobj);
+                       goto again;
                }
 
                if (pobj == mdd_pobj &&
@@ -3129,7 +3130,9 @@ static int mdd_update_linkea_internal(const struct lu_env *env,
                        CDEBUG(D_INFO, "%s: skip its own %s: "DFID"\n",
                              mdd2obd_dev(mdd)->obd_name, child_name->ln_name,
                              PFID(&fid));
-                       GOTO(next_put, rc);
+                       linkea_del_buf(ldata, &lname);
+                       mdd_object_put(env, pobj);
+                       goto again;
                }
 
                CDEBUG(D_INFO, "%s: update "DFID" with "DNAME":"DFID"\n",
@@ -3159,15 +3162,45 @@ static int mdd_update_linkea_internal(const struct lu_env *env,
                        if (rc)
                                GOTO(next_put, rc);
                } else {
-                       rc = __mdd_index_delete(env, pobj, lname.ln_name,
-                                               0, handle);
-                       if (rc)
+                       char *tmp_name = info->mti_key;
+
+                       if (lname.ln_namelen >= sizeof(info->mti_key)) {
+                               /* lnamelen is too big(> NAME_MAX + 16),
+                                * something wrong about this linkea, let's
+                                * skip it */
+                               linkea_del_buf(ldata, &lname);
+                               mdd_object_put(env, pobj);
+                               goto again;
+                       }
+
+                       /* Note: lname might be without \0 at the end, see
+                        * linkea_entry_unpack(), let's add extra \0 by
+                        * snprintf */
+                       snprintf(tmp_name, sizeof(info->mti_key), "%.*s",
+                                lname.ln_namelen, lname.ln_name);
+                       lname.ln_name = tmp_name;
+
+                       /* Let's check if this linkEA still valid, before
+                        * it might be packed into the RPC buffer. */
+                       rc = mdd_lookup(env, &pobj->mod_obj, &lname,
+                                       &info->mti_fid, NULL);
+                       if (rc < 0 ||
+                           !lu_fid_eq(&info->mti_fid,
+                                       mdd_object_fid(mdd_sobj))) {
+                               /* skip invalid linkea entry */
+                               linkea_del_buf(ldata, &lname);
+                               mdd_object_put(env, pobj);
+                               goto again;
+                       }
+
+                       rc = __mdd_index_delete(env, pobj, tmp_name, 0, handle);
+                       if (rc != 0)
                                GOTO(next_put, rc);
 
                        rc = __mdd_index_insert(env, pobj,
                                        mdd_object_fid(mdd_tobj),
                                        mdd_object_type(mdd_tobj),
-                                       lname.ln_name, handle);
+                                       tmp_name, handle);
                        if (rc != 0)
                                GOTO(next_put, rc);
 
@@ -3185,6 +3218,9 @@ next_put:
                mdd_object_put(env, pobj);
                if (rc != 0)
                        break;
+
+               ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
+                                                        ldata->ld_reclen);
        }
 
        RETURN(rc);
@@ -3696,22 +3732,24 @@ static int mdd_declare_update_linkea(const struct lu_env *env,
                                     struct mdd_object *mdd_pobj,
                                     struct mdd_object *mdd_sobj,
                                     struct mdd_object *mdd_tobj,
-                                    struct thandle *handle,
-                                    const struct lu_name *child_name)
+                                    const struct lu_name *child_name,
+                                    struct linkea_data *ldata,
+                                    struct thandle *handle)
 {
        return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
-                                         child_name, handle, 1);
+                                         child_name, ldata, handle, 1);
 }
 
 static int mdd_update_linkea(const struct lu_env *env,
                             struct mdd_object *mdd_pobj,
                             struct mdd_object *mdd_sobj,
                             struct mdd_object *mdd_tobj,
-                            struct thandle *handle,
-                            const struct lu_name *child_name)
+                            const struct lu_name *child_name,
+                            struct linkea_data *ldata,
+                            struct thandle *handle)
 {
        return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
-                                         child_name, handle, 0);
+                                         child_name, ldata, handle, 0);
 }
 
 static int mdd_declare_migrate_update_name(const struct lu_env *env,
@@ -3721,6 +3759,7 @@ static int mdd_declare_migrate_update_name(const struct lu_env *env,
                                           const struct lu_name *lname,
                                           struct lu_attr *la,
                                           struct lu_attr *parent_la,
+                                          struct linkea_data *ldata,
                                           struct thandle *handle)
 {
        struct lu_attr  *la_flag = MDD_ENV_VAR(env, tattr);
@@ -3738,10 +3777,12 @@ static int mdd_declare_migrate_update_name(const struct lu_env *env,
        if (rc != 0)
                return rc;
 
-       rc = mdd_declare_update_linkea(env, mdd_pobj, mdd_sobj,
-                                      mdd_tobj, handle, lname);
-       if (rc != 0)
-               return rc;
+       if (ldata->ld_buf != NULL) {
+               rc = mdd_declare_update_linkea(env, mdd_pobj, mdd_sobj,
+                                              mdd_tobj, lname, ldata, handle);
+               if (rc != 0)
+                       return rc;
+       }
 
        if (S_ISREG(mdd_object_type(mdd_sobj))) {
                rc = mdo_declare_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
@@ -3816,6 +3857,7 @@ static int mdd_migrate_update_name(const struct lu_env *env,
        struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
        struct lu_attr          *la_flag = MDD_ENV_VAR(env, tattr);
        struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
+       struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
        struct thandle          *handle;
        int                     is_dir = S_ISDIR(mdd_object_type(mdd_sobj));
        const char              *name = lname->ln_name;
@@ -3831,12 +3873,18 @@ static int mdd_migrate_update_name(const struct lu_env *env,
        if (rc != 0)
                RETURN(rc);
 
+       ldata->ld_buf = NULL;
+       rc = mdd_links_read(env, mdd_sobj, ldata);
+       if (rc != 0 && rc != -ENOENT && rc != -ENODATA)
+               RETURN(rc);
+
        handle = mdd_trans_create(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));
 
        rc = mdd_declare_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj,
-                                            lname, so_attr, p_la, handle);
+                                            lname, so_attr, p_la, ldata,
+                                            handle);
        if (rc != 0) {
                /* If the migration can not be fit in one transaction, just
                 * leave it in the original MDT */
@@ -3867,10 +3915,12 @@ static int mdd_migrate_update_name(const struct lu_env *env,
        if (rc != 0)
                GOTO(stop_trans, rc);
 
-       rc = mdd_update_linkea(env, mdd_pobj, mdd_sobj, mdd_tobj,
-                              handle, lname);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
+       if (ldata->ld_buf != NULL) {
+               rc = mdd_update_linkea(env, mdd_pobj, mdd_sobj, mdd_tobj,
+                                      lname, ldata, handle);
+               if (rc != 0)
+                       GOTO(stop_trans, rc);
+       }
 
        if (S_ISREG(so_attr->la_mode)) {
                if (so_attr->la_nlink == 1) {
@@ -3893,8 +3943,9 @@ static int mdd_migrate_update_name(const struct lu_env *env,
        if (rc != 0)
                GOTO(stop_trans, rc);
 
+       linkea_add_buf(ldata, lname, mdd_object_fid(mdd_pobj));
        rc = mdd_links_add(env, mdd_tobj, mdo2fid(mdd_pobj), lname, handle,
-                          NULL, 1);
+                          ldata, 1);
        if (rc != 0)
                GOTO(stop_trans, rc);