From 4d4771b8efbecf530f3079f0681b896aede0e353 Mon Sep 17 00:00:00 2001 From: wang di Date: Wed, 2 Sep 2015 03:51:28 -0700 Subject: [PATCH] LU-7074 mdd: validate the linkea before packing During migration, let's validate linkea entry before packing the updates into the buffer and sending to the remote MDT. And also move retrieving linkea before transaction start to avoiding sending RPC inside the transaction. Signed-off-by: wang di Change-Id: I36f235274d39560f6654fd76967e45400e8187ce Reviewed-on: http://review.whamcloud.com/16235 Reviewed-by: John L. Hammond Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Andreas Dilger Reviewed-by: Oleg Drokin --- lustre/mdd/mdd_dir.c | 117 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 84 insertions(+), 33 deletions(-) diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index 17b654d..8cb25d4 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -1045,7 +1045,11 @@ out: int error = 1; if (rc == -EOVERFLOW || rc == -ENOSPC) error = 0; - if (oldpfid == NULL) + if (newlname == NULL) + CDEBUG(error ? D_ERROR : D_OTHER, + "link_ea add failed %d "DFID"\n", + rc, PFID(mdd_object_fid(mdd_obj))); + else if (oldpfid == NULL) CDEBUG(error ? D_ERROR : D_OTHER, "link_ea add '%.*s' failed %d "DFID"\n", newlname->ln_namelen, newlname->ln_name, @@ -3078,22 +3082,18 @@ static int mdd_update_linkea_internal(const struct lu_env *env, struct mdd_object *mdd_sobj, struct mdd_object *mdd_tobj, const struct lu_name *child_name, + struct linkea_data *ldata, struct thandle *handle, int declare) { struct mdd_thread_info *info = mdd_env_info(env); - struct linkea_data *ldata = &info->mti_link_data; int count; int rc = 0; ENTRY; - rc = mdd_links_read(env, mdd_sobj, ldata); - if (rc != 0) { - if (rc == -ENOENT || rc == -ENODATA) - rc = 0; - RETURN(rc); - } + LASSERT(ldata->ld_buf != NULL); +again: /* If it is mulitple links file, we need update the name entry for * all parent */ LASSERT(ldata->ld_leh != NULL); @@ -3106,20 +3106,21 @@ static int mdd_update_linkea_internal(const struct lu_env *env, linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &lname, &fid); - ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee + - ldata->ld_reclen); pobj = mdd_object_find(env, mdd, &fid); if (IS_ERR(pobj)) { CWARN("%s: cannot find obj "DFID": rc = %ld\n", mdd2obd_dev(mdd)->obd_name, PFID(&fid), PTR_ERR(pobj)); - continue; + linkea_del_buf(ldata, &lname); + goto again; } if (!mdd_object_exists(pobj)) { CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n", mdd2obd_dev(mdd)->obd_name, PFID(&fid)); - GOTO(next_put, rc); + linkea_del_buf(ldata, &lname); + mdd_object_put(env, pobj); + goto again; } if (pobj == mdd_pobj && @@ -3129,7 +3130,9 @@ static int mdd_update_linkea_internal(const struct lu_env *env, CDEBUG(D_INFO, "%s: skip its own %s: "DFID"\n", mdd2obd_dev(mdd)->obd_name, child_name->ln_name, PFID(&fid)); - GOTO(next_put, rc); + linkea_del_buf(ldata, &lname); + mdd_object_put(env, pobj); + goto again; } CDEBUG(D_INFO, "%s: update "DFID" with "DNAME":"DFID"\n", @@ -3159,15 +3162,45 @@ static int mdd_update_linkea_internal(const struct lu_env *env, if (rc) GOTO(next_put, rc); } else { - rc = __mdd_index_delete(env, pobj, lname.ln_name, - 0, handle); - if (rc) + char *tmp_name = info->mti_key; + + if (lname.ln_namelen >= sizeof(info->mti_key)) { + /* lnamelen is too big(> NAME_MAX + 16), + * something wrong about this linkea, let's + * skip it */ + linkea_del_buf(ldata, &lname); + mdd_object_put(env, pobj); + goto again; + } + + /* Note: lname might be without \0 at the end, see + * linkea_entry_unpack(), let's add extra \0 by + * snprintf */ + snprintf(tmp_name, sizeof(info->mti_key), "%.*s", + lname.ln_namelen, lname.ln_name); + lname.ln_name = tmp_name; + + /* Let's check if this linkEA still valid, before + * it might be packed into the RPC buffer. */ + rc = mdd_lookup(env, &pobj->mod_obj, &lname, + &info->mti_fid, NULL); + if (rc < 0 || + !lu_fid_eq(&info->mti_fid, + mdd_object_fid(mdd_sobj))) { + /* skip invalid linkea entry */ + linkea_del_buf(ldata, &lname); + mdd_object_put(env, pobj); + goto again; + } + + rc = __mdd_index_delete(env, pobj, tmp_name, 0, handle); + if (rc != 0) GOTO(next_put, rc); rc = __mdd_index_insert(env, pobj, mdd_object_fid(mdd_tobj), mdd_object_type(mdd_tobj), - lname.ln_name, handle); + tmp_name, handle); if (rc != 0) GOTO(next_put, rc); @@ -3185,6 +3218,9 @@ next_put: mdd_object_put(env, pobj); if (rc != 0) break; + + ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee + + ldata->ld_reclen); } RETURN(rc); @@ -3696,22 +3732,24 @@ static int mdd_declare_update_linkea(const struct lu_env *env, struct mdd_object *mdd_pobj, struct mdd_object *mdd_sobj, struct mdd_object *mdd_tobj, - struct thandle *handle, - const struct lu_name *child_name) + const struct lu_name *child_name, + struct linkea_data *ldata, + struct thandle *handle) { return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj, - child_name, handle, 1); + child_name, ldata, handle, 1); } static int mdd_update_linkea(const struct lu_env *env, struct mdd_object *mdd_pobj, struct mdd_object *mdd_sobj, struct mdd_object *mdd_tobj, - struct thandle *handle, - const struct lu_name *child_name) + const struct lu_name *child_name, + struct linkea_data *ldata, + struct thandle *handle) { return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj, - child_name, handle, 0); + child_name, ldata, handle, 0); } static int mdd_declare_migrate_update_name(const struct lu_env *env, @@ -3721,6 +3759,7 @@ static int mdd_declare_migrate_update_name(const struct lu_env *env, const struct lu_name *lname, struct lu_attr *la, struct lu_attr *parent_la, + struct linkea_data *ldata, struct thandle *handle) { struct lu_attr *la_flag = MDD_ENV_VAR(env, tattr); @@ -3738,10 +3777,12 @@ static int mdd_declare_migrate_update_name(const struct lu_env *env, if (rc != 0) return rc; - rc = mdd_declare_update_linkea(env, mdd_pobj, mdd_sobj, - mdd_tobj, handle, lname); - if (rc != 0) - return rc; + if (ldata->ld_buf != NULL) { + rc = mdd_declare_update_linkea(env, mdd_pobj, mdd_sobj, + mdd_tobj, lname, ldata, handle); + if (rc != 0) + return rc; + } if (S_ISREG(mdd_object_type(mdd_sobj))) { rc = mdo_declare_xattr_del(env, mdd_sobj, XATTR_NAME_LOV, @@ -3816,6 +3857,7 @@ static int mdd_migrate_update_name(const struct lu_env *env, struct lu_attr *so_attr = MDD_ENV_VAR(env, cattr); struct lu_attr *la_flag = MDD_ENV_VAR(env, tattr); struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj); + struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data; struct thandle *handle; int is_dir = S_ISDIR(mdd_object_type(mdd_sobj)); const char *name = lname->ln_name; @@ -3831,12 +3873,18 @@ static int mdd_migrate_update_name(const struct lu_env *env, if (rc != 0) RETURN(rc); + ldata->ld_buf = NULL; + rc = mdd_links_read(env, mdd_sobj, ldata); + if (rc != 0 && rc != -ENOENT && rc != -ENODATA) + RETURN(rc); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) RETURN(PTR_ERR(handle)); rc = mdd_declare_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj, - lname, so_attr, p_la, handle); + lname, so_attr, p_la, ldata, + handle); if (rc != 0) { /* If the migration can not be fit in one transaction, just * leave it in the original MDT */ @@ -3867,10 +3915,12 @@ static int mdd_migrate_update_name(const struct lu_env *env, if (rc != 0) GOTO(stop_trans, rc); - rc = mdd_update_linkea(env, mdd_pobj, mdd_sobj, mdd_tobj, - handle, lname); - if (rc != 0) - GOTO(stop_trans, rc); + if (ldata->ld_buf != NULL) { + rc = mdd_update_linkea(env, mdd_pobj, mdd_sobj, mdd_tobj, + lname, ldata, handle); + if (rc != 0) + GOTO(stop_trans, rc); + } if (S_ISREG(so_attr->la_mode)) { if (so_attr->la_nlink == 1) { @@ -3893,8 +3943,9 @@ static int mdd_migrate_update_name(const struct lu_env *env, if (rc != 0) GOTO(stop_trans, rc); + linkea_add_buf(ldata, lname, mdd_object_fid(mdd_pobj)); rc = mdd_links_add(env, mdd_tobj, mdo2fid(mdd_pobj), lname, handle, - NULL, 1); + ldata, 1); if (rc != 0) GOTO(stop_trans, rc); -- 1.8.3.1