From: Lai Siyao Date: Sat, 8 Dec 2018 22:08:14 +0000 (+0800) Subject: LU-11753 obdclass: index_page support variable length rec X-Git-Tag: 2.12.0-RC3~2 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=a13e4991a1350f54f97c6ba13686d33c7a3eeb57;ds=sidebyside LU-11753 obdclass: index_page support variable length rec mdd_dir_is_empty() may readdir from other MDT if directory is striped or remote, in this case, it will issue OBD_IDX_READ RPC to fetch dir page, and on remote MDT dt_index_page_build() is called to build page, but this function doesn't support variable length record, so it may miscalculate offset in reading, which may cause crash. Signed-off-by: Lai Siyao Signed-off-by: Alex Zhuravlev Change-Id: Ia25a0aca52fb1323ea64a7ff72bf6022754af32c Reviewed-on: https://review.whamcloud.com/33837 Tested-by: Jenkins Reviewed-by: Andreas Dilger Reviewed-by: Stephan Thiell Tested-by: Maloo Reviewed-by: Oleg Drokin --- diff --git a/lustre/obdclass/dt_object.c b/lustre/obdclass/dt_object.c index 95a9070..faff31f 100644 --- a/lustre/obdclass/dt_object.c +++ b/lustre/obdclass/dt_object.c @@ -721,8 +721,12 @@ static int dt_index_page_build(const struct lu_env *env, union lu_page *lp, struct idx_info *ii = (struct idx_info *)arg; struct lu_idxpage *lip = &lp->lp_idx; char *entry; - size_t size; + __u64 hash; + __u16 hashsize = 0; + __u16 keysize = 0; + __u16 recsize; int rc; + ENTRY; if (nob < LIP_HDR_SIZE) @@ -733,20 +737,12 @@ static int dt_index_page_build(const struct lu_env *env, union lu_page *lp, lip->lip_magic = LIP_MAGIC; nob -= LIP_HDR_SIZE; - /* compute size needed to store a key/record pair */ - size = ii->ii_recsize + ii->ii_keysize; - if ((ii->ii_flags & II_FL_NOHASH) == 0) - /* add hash if the client wants it */ - size += sizeof(__u64); + /* client wants to the 64-bit hash value associated with each record */ + if (!(ii->ii_flags & II_FL_NOHASH)) + hashsize = sizeof(hash); entry = lip->lip_entries; do { - char *tmp_entry = entry; - struct dt_key *key; - __u64 hash; - __u16 keysize; - __u16 recsize; - /* fetch 64-bit hash value */ hash = iops->store(env, it); ii->ii_hash_end = hash; @@ -756,58 +752,54 @@ static int dt_index_page_build(const struct lu_env *env, union lu_page *lp, GOTO(out, rc = 0); } - if (nob < size) { - if (lip->lip_nr == 0) + if (!(ii->ii_flags & II_FL_NOKEY)) { + keysize = iops->key_size(env, it); + if (!(ii->ii_flags & II_FL_VARKEY) && + keysize != ii->ii_keysize) { + CERROR("keysize mismatch %hu != %hu.\n", + keysize, ii->ii_keysize); GOTO(out, rc = -EINVAL); - GOTO(out, rc = 0); - } - - if (!(ii->ii_flags & II_FL_NOHASH)) { - /* - * client wants to the 64-bit hash value associated - * with each record - */ - memcpy(tmp_entry, &hash, sizeof(hash)); - tmp_entry += sizeof(hash); + } } - if (ii->ii_flags & II_FL_VARKEY) - keysize = iops->key_size(env, it); + /* and finally the record */ + if (ii->ii_flags & II_FL_VARREC) + recsize = iops->rec_size(env, it, attr); else - keysize = ii->ii_keysize; + recsize = ii->ii_recsize; - if (!(ii->ii_flags & II_FL_NOKEY)) { - /* then the key value */ - key = iops->key(env, it); - memcpy(tmp_entry, key, keysize); - tmp_entry += keysize; + if (nob < hashsize + keysize + recsize) { + if (lip->lip_nr == 0) + GOTO(out, rc = -E2BIG); + GOTO(out, rc = 0); } - /* and finally the record */ - rc = iops->rec(env, it, (struct dt_rec *)tmp_entry, attr); - if (rc != -ESTALE) { - if (rc != 0) - GOTO(out, rc); - + rc = iops->rec(env, it, + (struct dt_rec *)(entry + hashsize + keysize), + attr); + if (!rc) { + if (hashsize) + memcpy(entry, &hash, hashsize); + if (keysize) { + struct dt_key *key; + + key = iops->key(env, it); + memcpy(entry + hashsize, key, keysize); + } /* hash/key/record successfully copied! */ lip->lip_nr++; if (unlikely(lip->lip_nr == 1 && ii->ii_count == 0)) ii->ii_hash_start = hash; - - if (ii->ii_flags & II_FL_VARREC) - recsize = iops->rec_size(env, it, attr); - else - recsize = ii->ii_recsize; - - entry = tmp_entry + recsize; - nob -= size; + entry += hashsize + keysize + recsize; + nob -= hashsize + keysize + recsize; + } else if (rc != -ESTALE) { + GOTO(out, rc); } /* move on to the next record */ do { rc = iops->next(env, it); } while (rc == -ESTALE); - } while (rc == 0); GOTO(out, rc);