X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flod%2Flod_object.c;h=2ea902587785336af859b47a54e12e2354537ba5;hb=0699c02116e76ce11a4489a8dbd4c983052f313b;hp=ef9781d1e965f6162eb0b269350b2c6ec07c0a1a;hpb=f1171da4e06b9974caf614d6eaa9e037728bab1e;p=fs%2Flustre-release.git diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index ef9781d..2ea9025 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -43,8 +43,8 @@ #include #include #include -#include #include +#include #include "lod_internal.h" @@ -125,10 +125,10 @@ static struct dt_it *lod_it_init(const struct lu_env *env, } #define LOD_CHECK_IT(env, it) \ -{ \ +do { \ LASSERT((it)->lit_obj != NULL); \ LASSERT((it)->lit_it != NULL); \ -} while(0) +} while (0) void lod_it_fini(const struct lu_env *env, struct dt_it *di) { @@ -189,7 +189,18 @@ int lod_it_rec(const struct lu_env *env, const struct dt_it *di, const struct lod_it *it = (const struct lod_it *)di; LOD_CHECK_IT(env, it); - return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr); + return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec, + attr); +} + +int lod_it_rec_size(const struct lu_env *env, const struct dt_it *di, + __u32 attr) +{ + const struct lod_it *it = (const struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.rec_size(env, it->lit_it, + attr); } __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di) @@ -209,12 +220,13 @@ int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash) } int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di, - void* key_rec) + void *key_rec) { const struct lod_it *it = (const struct lod_it *)di; LOD_CHECK_IT(env, it); - return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it, key_rec); + return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it, + key_rec); } static struct dt_index_operations lod_index_ops = { @@ -232,12 +244,679 @@ static struct dt_index_operations lod_index_ops = { .key = lod_it_key, .key_size = lod_it_key_size, .rec = lod_it_rec, + .rec_size = lod_it_rec_size, .store = lod_it_store, .load = lod_it_load, .key_rec = lod_it_key_rec, } }; +/** + * Implementation of dt_index_operations:: dio_it.init + * + * This function is to initialize the iterator for striped directory, + * basically these lod_striped_it_xxx will just locate the stripe + * and call the correspondent api of its next lower layer. + * + * \param[in] env execution environment. + * \param[in] dt the striped directory object to be iterated. + * \param[in] attr the attribute of iterator, mostly used to indicate + * the entry attribute in the object to be iterated. + * \param[in] capa capability(useless in current implementation) + * + * \retval initialized iterator(dt_it) if successful initialize the + * iteration. lit_stripe_index will be used to indicate the + * current iterate position among stripes. + * \retval ERR pointer if initialization is failed. + */ +static struct dt_it *lod_striped_it_init(const struct lu_env *env, + struct dt_object *dt, __u32 attr, + struct lustre_capa *capa) +{ + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next; + struct lod_it *it = &lod_env_info(env)->lti_it; + struct dt_it *it_next; + ENTRY; + + LASSERT(lo->ldo_stripenr > 0); + next = lo->ldo_stripe[0]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); + + it_next = next->do_index_ops->dio_it.init(env, next, attr, capa); + if (IS_ERR(it_next)) + return it_next; + + /* currently we do not use more than one iterator per thread + * so we store it in thread info. if at some point we need + * more active iterators in a single thread, we can allocate + * additional ones */ + LASSERT(it->lit_obj == NULL); + + it->lit_stripe_index = 0; + it->lit_attr = attr; + it->lit_it = it_next; + it->lit_obj = dt; + + return (struct dt_it *)it; +} + +#define LOD_CHECK_STRIPED_IT(env, it, lo) \ +do { \ + LASSERT((it)->lit_obj != NULL); \ + LASSERT((it)->lit_it != NULL); \ + LASSERT((lo)->ldo_stripenr > 0); \ + LASSERT((it)->lit_stripe_index < (lo)->ldo_stripenr); \ +} while (0) + +/** + * Implementation of dt_index_operations:: dio_it.fini + * + * This function is to finish the iterator for striped directory. + * + * \param[in] env execution environment. + * \param[in] di the iterator for the striped directory + * + */ +static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di) +{ + struct lod_it *it = (struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + + LOD_CHECK_STRIPED_IT(env, it, lo); + + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); + + next->do_index_ops->dio_it.fini(env, it->lit_it); + + /* the iterator not in use any more */ + it->lit_obj = NULL; + it->lit_it = NULL; + it->lit_stripe_index = 0; +} + +/** + * Implementation of dt_index_operations:: dio_it.get + * + * This function is to position the iterator with given key + * + * \param[in] env execution environment. + * \param[in] di the iterator for striped directory. + * \param[in] key the key the iterator will be positioned. + * + * \retval 0 if successfully position iterator by the key. + * \retval negative error if position is failed. + */ +static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di, + const struct dt_key *key) +{ + const struct lod_it *it = (const struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + ENTRY; + + LOD_CHECK_STRIPED_IT(env, it, lo); + + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); + + return next->do_index_ops->dio_it.get(env, it->lit_it, key); +} + +/** + * Implementation of dt_index_operations:: dio_it.put + * + * This function is supposed to be the pair of it_get, but currently do + * nothing. see (osd_it_ea_put or osd_index_it_put) + */ +static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di) +{ + struct lod_it *it = (struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + + LOD_CHECK_STRIPED_IT(env, it, lo); + + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); + + return next->do_index_ops->dio_it.put(env, it->lit_it); +} + +/** + * Implementation of dt_index_operations:: dio_it.next + * + * This function is to position the iterator to the next entry, if current + * stripe is finished by checking the return value of next() in current + * stripe. it will go to next stripe. In the mean time, the sub-iterator + * for next stripe needs to be initialized. + * + * \param[in] env execution environment. + * \param[in] di the iterator for striped directory. + * + * \retval 0 if successfully position iterator to the next entry. + * \retval negative error if position is failed. + */ +static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di) +{ + struct lod_it *it = (struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + struct dt_it *it_next; + int rc; + ENTRY; + + LOD_CHECK_STRIPED_IT(env, it, lo); + + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); +again: + rc = next->do_index_ops->dio_it.next(env, it->lit_it); + if (rc < 0) + RETURN(rc); + + if (rc == 0 && it->lit_stripe_index == 0) + RETURN(rc); + + if (rc == 0 && it->lit_stripe_index > 0) { + struct lu_dirent *ent; + + ent = (struct lu_dirent *)lod_env_info(env)->lti_key; + + rc = next->do_index_ops->dio_it.rec(env, it->lit_it, + (struct dt_rec *)ent, + it->lit_attr); + if (rc != 0) + RETURN(rc); + + /* skip . and .. for slave stripe */ + if ((strncmp(ent->lde_name, ".", + le16_to_cpu(ent->lde_namelen)) == 0 && + le16_to_cpu(ent->lde_namelen) == 1) || + (strncmp(ent->lde_name, "..", + le16_to_cpu(ent->lde_namelen)) == 0 && + le16_to_cpu(ent->lde_namelen) == 2)) + goto again; + + RETURN(rc); + } + + /* go to next stripe */ + if (it->lit_stripe_index + 1 >= lo->ldo_stripenr) + RETURN(1); + + it->lit_stripe_index++; + + next->do_index_ops->dio_it.put(env, it->lit_it); + next->do_index_ops->dio_it.fini(env, it->lit_it); + + rc = next->do_ops->do_index_try(env, next, &dt_directory_features); + if (rc != 0) + RETURN(rc); + + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); + + it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr, + BYPASS_CAPA); + if (!IS_ERR(it_next)) { + it->lit_it = it_next; + goto again; + } else { + rc = PTR_ERR(it_next); + } + + RETURN(rc); +} + +/** + * Implementation of dt_index_operations:: dio_it.key + * + * This function is to get the key of the iterator at current position. + * + * \param[in] env execution environment. + * \param[in] di the iterator for striped directory. + * + * \retval key(dt_key) if successfully get the key. + * \retval negative error if can not get the key. + */ +static struct dt_key *lod_striped_it_key(const struct lu_env *env, + const struct dt_it *di) +{ + const struct lod_it *it = (const struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + + LOD_CHECK_STRIPED_IT(env, it, lo); + + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); + + return next->do_index_ops->dio_it.key(env, it->lit_it); +} + +/** + * Implementation of dt_index_operations:: dio_it.key_size + * + * This function is to get the key_size of current key. + * + * \param[in] env execution environment. + * \param[in] di the iterator for striped directory. + * + * \retval key_size if successfully get the key_size. + * \retval negative error if can not get the key_size. + */ +static int lod_striped_it_key_size(const struct lu_env *env, + const struct dt_it *di) +{ + struct lod_it *it = (struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + + LOD_CHECK_STRIPED_IT(env, it, lo); + + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); + + return next->do_index_ops->dio_it.key_size(env, it->lit_it); +} + +/** + * Implementation of dt_index_operations:: dio_it.rec + * + * This function is to get the record at current position. + * + * \param[in] env execution environment. + * \param[in] di the iterator for striped directory. + * \param[in] attr the attribute of iterator, mostly used to indicate + * the entry attribute in the object to be iterated. + * \param[out] rec hold the return record. + * + * \retval 0 if successfully get the entry. + * \retval negative error if can not get entry. + */ +static int lod_striped_it_rec(const struct lu_env *env, const struct dt_it *di, + struct dt_rec *rec, __u32 attr) +{ + const struct lod_it *it = (const struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + + LOD_CHECK_STRIPED_IT(env, it, lo); + + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); + + return next->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr); +} + +/** + * Implementation of dt_index_operations:: dio_it.rec_size + * + * This function is to get the record_size at current record. + * + * \param[in] env execution environment. + * \param[in] di the iterator for striped directory. + * \param[in] attr the attribute of iterator, mostly used to indicate + * the entry attribute in the object to be iterated. + * + * \retval rec_size if successfully get the entry size. + * \retval negative error if can not get entry size. + */ +static int lod_striped_it_rec_size(const struct lu_env *env, + const struct dt_it *di, __u32 attr) +{ + struct lod_it *it = (struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + + LOD_CHECK_STRIPED_IT(env, it, lo); + + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); + + return next->do_index_ops->dio_it.rec_size(env, it->lit_it, attr); +} + +/** + * Implementation of dt_index_operations:: dio_it.store + * + * This function will a cookie for current position of the iterator head, + * so that user can use this cookie to load/start the iterator next time. + * + * \param[in] env execution environment. + * \param[in] di the iterator for striped directory. + * + * \retval the cookie. + */ +static __u64 lod_striped_it_store(const struct lu_env *env, + const struct dt_it *di) +{ + const struct lod_it *it = (const struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + + LOD_CHECK_STRIPED_IT(env, it, lo); + + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); + + return next->do_index_ops->dio_it.store(env, it->lit_it); +} + +/** + * Implementation of dt_index_operations:: dio_it.load + * + * This function will position the iterator with the given hash(usually + * get from store), + * + * \param[in] env execution environment. + * \param[in] di the iterator for striped directory. + * \param[in] hash the given hash. + * + * \retval >0 if successfuly load the iterator to the given position. + * \retval <0 if load is failed. + */ +static int lod_striped_it_load(const struct lu_env *env, + const struct dt_it *di, __u64 hash) +{ + const struct lod_it *it = (const struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + + LOD_CHECK_STRIPED_IT(env, it, lo); + + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); + + return next->do_index_ops->dio_it.load(env, it->lit_it, hash); +} + +static struct dt_index_operations lod_striped_index_ops = { + .dio_lookup = lod_index_lookup, + .dio_declare_insert = lod_declare_index_insert, + .dio_insert = lod_index_insert, + .dio_declare_delete = lod_declare_index_delete, + .dio_delete = lod_index_delete, + .dio_it = { + .init = lod_striped_it_init, + .fini = lod_striped_it_fini, + .get = lod_striped_it_get, + .put = lod_striped_it_put, + .next = lod_striped_it_next, + .key = lod_striped_it_key, + .key_size = lod_striped_it_key_size, + .rec = lod_striped_it_rec, + .rec_size = lod_striped_it_rec_size, + .store = lod_striped_it_store, + .load = lod_striped_it_load, + } +}; + +/** + * Append the FID for each shard of the striped directory after the + * given LMV EA header. + * + * To simplify striped directory and the consistency verification, + * we only store the LMV EA header on disk, for both master object + * and slave objects. When someone wants to know the whole LMV EA, + * such as client readdir(), we can build the entrie LMV EA on the + * MDT side (in RAM) via iterating the sub-directory entries that + * are contained in the master object of the stripe directory. + * + * For the master object of the striped directroy, the valid name + * for each shard is composed of the ${shard_FID}:${shard_idx}. + * + * There may be holes in the LMV EA if some shards' name entries + * are corrupted or lost. + * + * \param[in] env pointer to the thread context + * \param[in] lo pointer to the master object of the striped directory + * \param[in] buf pointer to the lu_buf which will hold the LMV EA + * \param[in] resize whether re-allocate the buffer if it is not big enough + * + * \retval positive size of the LMV EA + * \retval 0 for nothing to be loaded + * \retval negative error number on failure + */ +int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo, + struct lu_buf *buf, bool resize) +{ + struct lu_dirent *ent = + (struct lu_dirent *)lod_env_info(env)->lti_key; + struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + struct dt_object *obj = dt_object_child(&lo->ldo_obj); + struct lmv_mds_md_v1 *lmv1 = buf->lb_buf; + struct dt_it *it; + const struct dt_it_ops *iops; + __u32 stripes; + __u32 magic = le32_to_cpu(lmv1->lmv_magic); + int size; + int rc; + ENTRY; + + /* If it is not a striped directory, then load nothing. */ + if (magic != LMV_MAGIC_V1) + RETURN(0); + + /* If it is in migration (or failure), then load nothing. */ + if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION) + RETURN(0); + + stripes = le32_to_cpu(lmv1->lmv_stripe_count); + if (stripes < 1) + RETURN(0); + + size = lmv_mds_md_size(stripes, magic); + if (buf->lb_len < size) { + struct lu_buf tbuf; + + if (!resize) + RETURN(-ERANGE); + + tbuf = *buf; + buf->lb_buf = NULL; + buf->lb_len = 0; + lu_buf_alloc(buf, size); + lmv1 = buf->lb_buf; + if (lmv1 == NULL) + RETURN(-ENOMEM); + + memcpy(buf->lb_buf, tbuf.lb_buf, tbuf.lb_len); + } + + if (unlikely(!dt_try_as_dir(env, obj))) + RETURN(-ENOTDIR); + + memset(&lmv1->lmv_stripe_fids[0], 0, stripes * sizeof(struct lu_fid)); + iops = &obj->do_index_ops->dio_it; + it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA); + if (IS_ERR(it)) + RETURN(PTR_ERR(it)); + + rc = iops->load(env, it, 0); + if (rc == 0) + rc = iops->next(env, it); + else if (rc > 0) + rc = 0; + + while (rc == 0) { + char name[FID_LEN + 2] = ""; + struct lu_fid fid; + __u32 index; + int len; + + rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH); + if (rc != 0) + break; + + rc = -EIO; + + fid_le_to_cpu(&fid, &ent->lde_fid); + ent->lde_namelen = le16_to_cpu(ent->lde_namelen); + if (ent->lde_name[0] == '.') { + if (ent->lde_namelen == 1) + goto next; + + if (ent->lde_namelen == 2 && ent->lde_name[1] == '.') + goto next; + } + + len = snprintf(name, FID_LEN + 1, DFID":", PFID(&ent->lde_fid)); + /* The ent->lde_name is composed of ${FID}:${index} */ + if (ent->lde_namelen < len + 1 || + memcmp(ent->lde_name, name, len) != 0) { + CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO, + "%s: invalid shard name %.*s with the FID "DFID + " for the striped directory "DFID", %s\n", + lod2obd(lod)->obd_name, ent->lde_namelen, + ent->lde_name, PFID(&fid), + PFID(lu_object_fid(&obj->do_lu)), + lod->lod_lmv_failout ? "failout" : "skip"); + + if (lod->lod_lmv_failout) + break; + + goto next; + } + + index = 0; + do { + if (ent->lde_name[len] < '0' || + ent->lde_name[len] > '9') { + CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO, + "%s: invalid shard name %.*s with the " + "FID "DFID" for the striped directory " + DFID", %s\n", + lod2obd(lod)->obd_name, ent->lde_namelen, + ent->lde_name, PFID(&fid), + PFID(lu_object_fid(&obj->do_lu)), + lod->lod_lmv_failout ? + "failout" : "skip"); + + if (lod->lod_lmv_failout) + break; + + goto next; + } + + index = index * 10 + ent->lde_name[len++] - '0'; + } while (len < ent->lde_namelen); + + if (len == ent->lde_namelen) { + /* Out of LMV EA range. */ + if (index >= stripes) { + CERROR("%s: the shard %.*s for the striped " + "directory "DFID" is out of the known " + "LMV EA range [0 - %u], failout\n", + lod2obd(lod)->obd_name, ent->lde_namelen, + ent->lde_name, + PFID(lu_object_fid(&obj->do_lu)), + stripes - 1); + + break; + } + + /* The slot has been occupied. */ + if (!fid_is_zero(&lmv1->lmv_stripe_fids[index])) { + struct lu_fid fid0; + + fid_le_to_cpu(&fid0, + &lmv1->lmv_stripe_fids[index]); + CERROR("%s: both the shard "DFID" and "DFID + " for the striped directory "DFID + " claim the same LMV EA slot at the " + "index %d, failout\n", + lod2obd(lod)->obd_name, + PFID(&fid0), PFID(&fid), + PFID(lu_object_fid(&obj->do_lu)), index); + + break; + } + + /* stored as LE mode */ + lmv1->lmv_stripe_fids[index] = ent->lde_fid; + +next: + rc = iops->next(env, it); + } + } + + iops->put(env, it); + iops->fini(env, it); + + RETURN(rc > 0 ? lmv_mds_md_size(stripes, magic) : rc); +} + +/** + * Implementation of dt_object_operations:: do_index_try + * + * This function will try to initialize the index api pointer for the + * given object, usually it the entry point of the index api. i.e. + * the index object should be initialized in index_try, then start + * using index api. For striped directory, it will try to initialize + * all of its sub_stripes. + * + * \param[in] env execution environment. + * \param[in] dt the index object to be initialized. + * \param[in] feat the features of this object, for example fixed or + * variable key size etc. + * + * \retval >0 if the initialization is successful. + * \retval <0 if the initialization is failed. + */ +static int lod_index_try(const struct lu_env *env, struct dt_object *dt, + const struct dt_index_features *feat) +{ + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + int rc; + ENTRY; + + LASSERT(next->do_ops); + LASSERT(next->do_ops->do_index_try); + + rc = lod_load_striping_locked(env, lo); + if (rc != 0) + RETURN(rc); + + rc = next->do_ops->do_index_try(env, next, feat); + if (rc != 0) + RETURN(rc); + + if (lo->ldo_stripenr > 0) { + int i; + + for (i = 0; i < lo->ldo_stripenr; i++) { + if (dt_object_exists(lo->ldo_stripe[i]) == 0) + continue; + rc = lo->ldo_stripe[i]->do_ops->do_index_try(env, + lo->ldo_stripe[i], feat); + if (rc != 0) + RETURN(rc); + } + dt->do_index_ops = &lod_striped_index_ops; + } else { + dt->do_index_ops = &lod_index_ops; + } + + RETURN(rc); +} + static void lod_object_read_lock(const struct lu_env *env, struct dt_object *dt, unsigned role) { @@ -273,9 +952,69 @@ static int lod_attr_get(const struct lu_env *env, struct lu_attr *attr, struct lustre_capa *capa) { + /* Note: for striped directory, client will merge attributes + * from all of the sub-stripes see lmv_merge_attr(), and there + * no MDD logic depend on directory nlink/size/time, so we can + * always use master inode nlink and size for now. */ return dt_attr_get(env, dt_object_child(dt), attr, capa); } +/** + * Mark all of sub-stripes dead of the striped directory. + **/ +static int lod_mark_dead_object(const struct lu_env *env, + struct dt_object *dt, + struct thandle *handle, + bool declare) +{ + struct lod_object *lo = lod_dt_obj(dt); + struct lmv_mds_md_v1 *lmv; + __u32 dead_hash_type; + int rc; + int i; + + ENTRY; + + if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) + RETURN(0); + + rc = lod_load_striping_locked(env, lo); + if (rc != 0) + RETURN(rc); + + if (lo->ldo_stripenr == 0) + RETURN(0); + + rc = lod_get_lmv_ea(env, lo); + if (rc <= 0) + RETURN(rc); + + lmv = lod_env_info(env)->lti_ea_store; + lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE); + dead_hash_type = le32_to_cpu(lmv->lmv_hash_type) | LMV_HASH_FLAG_DEAD; + lmv->lmv_hash_type = cpu_to_le32(dead_hash_type); + for (i = 0; i < lo->ldo_stripenr; i++) { + struct lu_buf buf; + + lmv->lmv_master_mdt_index = i; + buf.lb_buf = lmv; + buf.lb_len = sizeof(*lmv); + if (declare) { + rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], &buf, + XATTR_NAME_LMV, + LU_XATTR_REPLACE, handle); + } else { + rc = dt_xattr_set(env, lo->ldo_stripe[i], &buf, + XATTR_NAME_LMV, LU_XATTR_REPLACE, + handle, BYPASS_CAPA); + } + if (rc != 0) + break; + } + + RETURN(rc); +} + static int lod_declare_attr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, @@ -286,6 +1025,13 @@ static int lod_declare_attr_set(const struct lu_env *env, int rc, i; ENTRY; + /* Set dead object on all other stripes */ + if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) && + attr->la_flags & LUSTRE_SLAVE_DEAD_FL) { + rc = lod_mark_dead_object(env, dt, handle, true); + RETURN(rc); + } + /* * declare setattr on the local object */ @@ -323,39 +1069,18 @@ static int lod_declare_attr_set(const struct lu_env *env, if (lo->ldo_stripenr == 0) RETURN(0); - if (!(attr->la_valid & ~(LA_ATIME | LA_MTIME | LA_CTIME))) { - struct lu_attr *la = &lod_env_info(env)->lti_attr; - bool setattr_time = false; - - rc = dt_attr_get(env, dt_object_child(dt), la, - BYPASS_CAPA); - if (rc != 0) - RETURN(rc); - - /* If it will only setattr time, it will only set - * time < current_time */ - if ((attr->la_valid & LA_ATIME && - attr->la_atime < la->la_atime) || - (attr->la_valid & LA_CTIME && - attr->la_ctime < la->la_ctime) || - (attr->la_valid & LA_MTIME && - attr->la_mtime < la->la_mtime)) - setattr_time = true; - - if (!setattr_time) - RETURN(0); - } /* * if object is striped declare changes on the stripes */ LASSERT(lo->ldo_stripe); for (i = 0; i < lo->ldo_stripenr; i++) { - LASSERT(lo->ldo_stripe[i]); - - rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr, handle); - if (rc) { - CERROR("failed declaration: %d\n", rc); - break; + if (likely(lo->ldo_stripe[i] != NULL)) { + rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr, + handle); + if (rc != 0) { + CERROR("failed declaration: %d\n", rc); + break; + } } } @@ -385,11 +1110,18 @@ static int lod_attr_set(const struct lu_env *env, struct thandle *handle, struct lustre_capa *capa) { - struct dt_object *next = dt_object_child(dt); - struct lod_object *lo = lod_dt_obj(dt); - int rc, i; + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + int rc, i; ENTRY; + /* Set dead object on all other stripes */ + if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) && + attr->la_flags & LUSTRE_SLAVE_DEAD_FL) { + rc = lod_mark_dead_object(env, dt, handle, false); + RETURN(rc); + } + /* * apply changes to the local object */ @@ -412,37 +1144,19 @@ static int lod_attr_set(const struct lu_env *env, if (lo->ldo_stripenr == 0) RETURN(0); - if (!(attr->la_valid & ~(LA_ATIME | LA_MTIME | LA_CTIME))) { - struct lu_attr *la = &lod_env_info(env)->lti_attr; - bool setattr_time = false; - - rc = dt_attr_get(env, dt_object_child(dt), la, - BYPASS_CAPA); - if (rc != 0) - RETURN(rc); - - /* If it will only setattr time, it will only set - * time < current_time */ - if ((attr->la_valid & LA_ATIME && - attr->la_atime < la->la_atime) || - (attr->la_valid & LA_CTIME && - attr->la_ctime < la->la_ctime) || - (attr->la_valid & LA_MTIME && - attr->la_mtime < la->la_mtime)) - setattr_time = true; - - if (!setattr_time) - RETURN(0); - } - /* * if object is striped, apply changes to all the stripes */ LASSERT(lo->ldo_stripe); for (i = 0; i < lo->ldo_stripenr; i++) { - LASSERT(lo->ldo_stripe[i]); + if (unlikely(lo->ldo_stripe[i] == NULL)) + continue; + if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && + (dt_object_exists(lo->ldo_stripe[i]) == 0)) + continue; + rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle, capa); - if (rc) { + if (rc != 0) { CERROR("failed declaration: %d\n", rc); break; } @@ -499,6 +1213,42 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, ENTRY; rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa); + if (strcmp(name, XATTR_NAME_LMV) == 0) { + struct lmv_mds_md_v1 *lmv1; + int rc1 = 0; + + if (rc > sizeof(*lmv1)) + RETURN(rc); + + if (rc < sizeof(*lmv1)) + RETURN(rc = rc > 0 ? -EINVAL : rc); + + if (buf->lb_buf == NULL || buf->lb_len == 0) { + CLASSERT(sizeof(*lmv1) <= sizeof(info->lti_key)); + + info->lti_buf.lb_buf = info->lti_key; + info->lti_buf.lb_len = sizeof(*lmv1); + rc = dt_xattr_get(env, dt_object_child(dt), + &info->lti_buf, name, capa); + if (unlikely(rc != sizeof(*lmv1))) + RETURN(rc = rc > 0 ? -EINVAL : rc); + + lmv1 = info->lti_buf.lb_buf; + /* The on-disk LMV EA only contains header, but the + * returned LMV EA size should contain the space for + * the FIDs of all shards of the striped directory. */ + if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_V1) + rc = lmv_mds_md_size( + le32_to_cpu(lmv1->lmv_stripe_count), + LMV_MAGIC_V1); + } else { + rc1 = lod_load_lmv_shards(env, lod_dt_obj(dt), + buf, false); + } + + RETURN(rc = rc1 != 0 ? rc1 : rc); + } + if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT)) RETURN(rc); @@ -549,10 +1299,6 @@ static int lod_verify_md_striping(struct lod_device *lod, if (unlikely(le32_to_cpu(lum->lum_stripe_count) == 0)) GOTO(out, rc = -EINVAL); - - if (unlikely(le32_to_cpu(lum->lum_stripe_count) > - lod->lod_remote_mdt_count + 1)) - GOTO(out, rc = -EINVAL); out: if (rc != 0) CERROR("%s: invalid lmv_user_md: magic = %x, " @@ -563,6 +1309,18 @@ out: return rc; } +/** + * Master LMVEA will be same as slave LMVEA, except + * 1. different magic + * 2. lmv_master_mdt_index on slave LMV EA will be stripe_index. + */ +static void lod_prep_slave_lmv_md(struct lmv_mds_md_v1 *slave_lmv, + const struct lmv_mds_md_v1 *master_lmv) +{ + *slave_lmv = *master_lmv; + slave_lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE); +} + int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt, struct lu_buf *lmv_buf) { @@ -571,21 +1329,21 @@ int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt, struct lod_object *lo = lod_dt_obj(dt); struct lmv_mds_md_v1 *lmm1; int stripe_count; - int lmm_size; int type = LU_SEQ_RANGE_ANY; - int i; int rc; __u32 mdtidx; ENTRY; LASSERT(lo->ldo_dir_striped != 0); LASSERT(lo->ldo_stripenr > 0); - stripe_count = lo->ldo_stripenr + 1; - lmm_size = lmv_mds_md_size(stripe_count, LMV_MAGIC); - if (info->lti_ea_store_size < lmm_size) { - rc = lod_ea_store_resize(info, lmm_size); + stripe_count = lo->ldo_stripenr; + /* Only store the LMV EA heahder on the disk. */ + if (info->lti_ea_store_size < sizeof(*lmm1)) { + rc = lod_ea_store_resize(info, sizeof(*lmm1)); if (rc != 0) RETURN(rc); + } else { + memset(info->lti_ea_store, 0, sizeof(*lmm1)); } lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store; @@ -598,18 +1356,8 @@ int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt, RETURN(rc); lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx); - fid_cpu_to_le(&lmm1->lmv_stripe_fids[0], lu_object_fid(&dt->do_lu)); - for (i = 0; i < lo->ldo_stripenr; i++) { - struct dt_object *dto; - - dto = lo->ldo_stripe[i]; - LASSERT(dto != NULL); - fid_cpu_to_le(&lmm1->lmv_stripe_fids[i + 1], - lu_object_fid(&dto->do_lu)); - } - lmv_buf->lb_buf = info->lti_ea_store; - lmv_buf->lb_len = lmm_size; + lmv_buf->lb_len = sizeof(*lmm1); lo->ldo_dir_striping_cached = 1; RETURN(rc); @@ -629,53 +1377,63 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo, int rc = 0; ENTRY; - if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1) - RETURN(-EINVAL); - - if (le32_to_cpu(lmv1->lmv_stripe_count) <= 1) + if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION) RETURN(0); - fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[0]); - /* Do not load striping information for slave inode */ - if (!lu_fid_eq(fid, lu_object_fid(&lo->ldo_obj.do_lu))) { + if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE) { lo->ldo_dir_slave_stripe = 1; RETURN(0); } + if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1) + RETURN(-EINVAL); + + if (le32_to_cpu(lmv1->lmv_stripe_count) < 1) + RETURN(0); + LASSERT(lo->ldo_stripe == NULL); OBD_ALLOC(stripe, sizeof(stripe[0]) * - (le32_to_cpu(lmv1->lmv_stripe_count) - 1)); + (le32_to_cpu(lmv1->lmv_stripe_count))); if (stripe == NULL) RETURN(-ENOMEM); - /* skip master stripe */ - for (i = 1; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) { - struct lod_tgt_desc *tgt; - int idx; - int type = LU_SEQ_RANGE_ANY; + for (i = 0; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) { + struct dt_device *tgt_dt; struct dt_object *dto; + int type = LU_SEQ_RANGE_ANY; + __u32 idx; fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]); + if (!fid_is_sane(fid)) + GOTO(out, rc = -ESTALE); + rc = lod_fld_lookup(env, lod, fid, &idx, &type); if (rc != 0) GOTO(out, rc); - tgt = LTD_TGT(ltd, idx); - if (tgt == NULL) - GOTO(out, rc = -ESTALE); + if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) { + tgt_dt = lod->lod_child; + } else { + struct lod_tgt_desc *tgt; + + tgt = LTD_TGT(ltd, idx); + if (tgt == NULL) + GOTO(out, rc = -ESTALE); + tgt_dt = tgt->ltd_tgt; + } - dto = dt_locate_at(env, tgt->ltd_tgt, fid, + dto = dt_locate_at(env, tgt_dt, fid, lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev, NULL); if (IS_ERR(dto)) GOTO(out, rc = PTR_ERR(dto)); - stripe[i - 1] = dto; + stripe[i] = dto; } out: lo->ldo_stripe = stripe; - lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count) - 1; - lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count) - 1; + lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count); + lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count); if (rc != 0) lod_object_free_striping(env, lo); @@ -686,13 +1444,19 @@ static int lod_prep_md_striped_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, const struct lmv_user_md_v1 *lum, + struct dt_object_format *dof, struct thandle *th) { struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; struct lod_object *lo = lod_dt_obj(dt); + struct lod_thread_info *info = lod_env_info(env); struct dt_object **stripe; struct lu_buf lmv_buf; + struct lu_buf slave_lmv_buf; + struct lmv_mds_md_v1 *lmm; + struct lmv_mds_md_v1 *slave_lmm = NULL; + struct dt_insert_rec *rec = &info->lti_dt_rec; int stripe_count; int *idx_array; int rc = 0; @@ -704,9 +1468,13 @@ static int lod_prep_md_striped_create(const struct lu_env *env, LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC); LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0); - /* Do not need allocated master stripe */ stripe_count = le32_to_cpu(lum->lum_stripe_count); - OBD_ALLOC(stripe, sizeof(stripe[0]) * (stripe_count - 1)); + + /* shrink the stripe_count to the avaible MDT count */ + if (stripe_count > lod->lod_remote_mdt_count + 1) + stripe_count = lod->lod_remote_mdt_count + 1; + + OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count); if (stripe == NULL) RETURN(-ENOMEM); @@ -714,13 +1482,25 @@ static int lod_prep_md_striped_create(const struct lu_env *env, if (idx_array == NULL) GOTO(out_free, rc = -ENOMEM); - idx_array[0] = le32_to_cpu(lum->lum_stripe_offset); - for (i = 1; i < stripe_count; i++) { - struct lod_tgt_desc *tgt; + for (i = 0; i < stripe_count; i++) { + struct lod_tgt_desc *tgt = NULL; struct dt_object *dto; - struct lu_fid fid; + struct lu_fid fid = { 0 }; int idx; struct lu_object_conf conf = { 0 }; + struct dt_device *tgt_dt = NULL; + + if (i == 0) { + /* Right now, master stripe and master object are + * on the same MDT */ + idx = le32_to_cpu(lum->lum_stripe_offset); + rc = obd_fid_alloc(env, lod->lod_child_exp, &fid, + NULL); + if (rc < 0) + GOTO(out_put, rc); + tgt_dt = lod->lod_child; + goto next; + } idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1); @@ -733,7 +1513,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env, " allocated %d, last allocated %d\n", idx, lod->lod_remote_mdt_count, i, idx_array[i - 1]); - /* Find next avaible target */ + /* Find next available target */ if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) continue; @@ -749,6 +1529,25 @@ static int lod_prep_md_striped_create(const struct lu_env *env, if (already_allocated) continue; + /* check the status of the OSP */ + tgt = LTD_TGT(ltd, idx); + if (tgt == NULL) + continue; + + tgt_dt = tgt->ltd_tgt; + rc = dt_statfs(env, tgt_dt, NULL); + if (rc) { + /* this OSP doesn't feel well */ + rc = 0; + continue; + } + + rc = obd_fid_alloc(env, tgt->ltd_exp, &fid, NULL); + if (rc < 0) { + rc = 0; + continue; + } + break; } @@ -763,27 +1562,25 @@ static int lod_prep_md_striped_create(const struct lu_env *env, " allocated %d, last allocated %d\n", idx, lod->lod_remote_mdt_count, i, idx_array[i - 1]); - tgt = LTD_TGT(ltd, idx); - LASSERT(tgt != NULL); - - rc = obd_fid_alloc(tgt->ltd_exp, &fid, NULL); - if (rc < 0) - GOTO(out_put, rc); - rc = 0; - +next: + /* tgt_dt and fid must be ready after search avaible OSP + * in the above loop */ + LASSERT(tgt_dt != NULL); + LASSERT(fid_is_sane(&fid)); conf.loc_flags = LOC_F_NEW; - dto = dt_locate_at(env, tgt->ltd_tgt, &fid, - dt->do_lu.lo_dev->ld_site->ls_top_dev, &conf); + dto = dt_locate_at(env, tgt_dt, &fid, + dt->do_lu.lo_dev->ld_site->ls_top_dev, + &conf); if (IS_ERR(dto)) GOTO(out_put, rc = PTR_ERR(dto)); - stripe[i - 1] = dto; + stripe[i] = dto; idx_array[i] = idx; } lo->ldo_dir_striped = 1; lo->ldo_stripe = stripe; - lo->ldo_stripenr = i - 1; - lo->ldo_stripes_allocated = stripe_count - 1; + lo->ldo_stripenr = i; + lo->ldo_stripes_allocated = stripe_count; if (lo->ldo_stripenr == 0) GOTO(out_put, rc = -ENOSPC); @@ -791,29 +1588,44 @@ static int lod_prep_md_striped_create(const struct lu_env *env, rc = lod_prep_lmv_md(env, dt, &lmv_buf); if (rc != 0) GOTO(out_put, rc); + lmm = lmv_buf.lb_buf; + + OBD_ALLOC_PTR(slave_lmm); + if (slave_lmm == NULL) + GOTO(out_put, rc = -ENOMEM); + + lod_prep_slave_lmv_md(slave_lmm, lmm); + slave_lmv_buf.lb_buf = slave_lmm; + slave_lmv_buf.lb_len = sizeof(*slave_lmm); + if (!dt_try_as_dir(env, dt_object_child(dt))) + GOTO(out_put, rc = -EINVAL); + + rec->rec_type = S_IFDIR; for (i = 0; i < lo->ldo_stripenr; i++) { - struct dt_object *dto; + struct dt_object *dto = stripe[i]; + char *stripe_name = info->lti_key; + struct lu_name *sname; + struct linkea_data ldata = { 0 }; + struct lu_buf linkea_buf; - dto = stripe[i]; - /* only create slave striped object */ - rc = dt_declare_create(env, dto, attr, NULL, NULL, th); + rc = dt_declare_create(env, dto, attr, NULL, dof, th); if (rc != 0) GOTO(out_put, rc); if (!dt_try_as_dir(env, dto)) GOTO(out_put, rc = -EINVAL); - rc = dt_declare_insert(env, dto, - (const struct dt_rec *)lu_object_fid(&dto->do_lu), - (const struct dt_key *)dot, th); + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec, + (const struct dt_key *)dot, th); if (rc != 0) GOTO(out_put, rc); /* master stripe FID will be put to .. */ - rc = dt_declare_insert(env, dto, - (const struct dt_rec *)lu_object_fid(&dt->do_lu), - (const struct dt_key *)dotdot, th); + rec->rec_fid = lu_object_fid(&dt->do_lu); + rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec, + (const struct dt_key *)dotdot, th); if (rc != 0) GOTO(out_put, rc); @@ -822,7 +1634,6 @@ static int lod_prep_md_striped_create(const struct lu_env *env, !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size, lo->ldo_def_stripenr, lo->ldo_def_stripe_offset)) { - struct lod_thread_info *info; struct lov_user_md_v3 *v3; /* sigh, lti_ea_store has been used for lmv_buf, @@ -835,16 +1646,15 @@ static int lod_prep_md_striped_create(const struct lu_env *env, memset(v3, 0, sizeof(*v3)); v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); v3->lmm_stripe_count = - cpu_to_le32(lo->ldo_def_stripenr); + cpu_to_le16(lo->ldo_def_stripenr); v3->lmm_stripe_offset = - cpu_to_le32(lo->ldo_def_stripe_offset); + cpu_to_le16(lo->ldo_def_stripe_offset); v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size); - if (lo->ldo_pool) - strncpy(v3->lmm_pool_name, lo->ldo_pool, - LOV_MAXPOOLNAME); + if (lo->ldo_pool != NULL) + strlcpy(v3->lmm_pool_name, lo->ldo_pool, + sizeof(v3->lmm_pool_name)); - info = lod_env_info(env); info->lti_buf.lb_buf = v3; info->lti_buf.lb_len = sizeof(*v3); rc = dt_declare_xattr_set(env, dto, @@ -855,27 +1665,65 @@ static int lod_prep_md_striped_create(const struct lu_env *env, if (rc != 0) GOTO(out_put, rc); } - rc = dt_declare_xattr_set(env, dto, &lmv_buf, XATTR_NAME_LMV, 0, - th); + + slave_lmm->lmv_master_mdt_index = cpu_to_le32(i); + rc = dt_declare_xattr_set(env, dto, &slave_lmv_buf, + XATTR_NAME_LMV, 0, th); + if (rc != 0) + GOTO(out_put, rc); + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&dto->do_lu)), i); + + sname = lod_name_get(env, stripe_name, strlen(stripe_name)); + rc = linkea_data_new(&ldata, &info->lti_linkea_buf); + if (rc != 0) + GOTO(out_put, rc); + + rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu)); + if (rc != 0) + GOTO(out_put, rc); + + linkea_buf.lb_buf = ldata.ld_buf->lb_buf; + linkea_buf.lb_len = ldata.ld_leh->leh_len; + rc = dt_declare_xattr_set(env, dto, &linkea_buf, + XATTR_NAME_LINK, 0, th); + if (rc != 0) + GOTO(out_put, rc); + + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = dt_declare_insert(env, dt_object_child(dt), + (const struct dt_rec *)rec, + (const struct dt_key *)stripe_name, th); + if (rc != 0) + GOTO(out_put, rc); + + rc = dt_declare_ref_add(env, dt_object_child(dt), th); if (rc != 0) GOTO(out_put, rc); } - rc = dt_declare_xattr_set(env, dt, &lmv_buf, XATTR_NAME_LMV, 0, th); + rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf, + XATTR_NAME_LMV, 0, th); if (rc != 0) GOTO(out_put, rc); out_put: if (rc < 0) { - for (i = 0; i < stripe_count - 1; i++) + for (i = 0; i < stripe_count; i++) if (stripe[i] != NULL) lu_object_put(env, &stripe[i]->do_lu); - OBD_FREE(stripe, sizeof(stripe[0]) * (stripe_count - 1)); + OBD_FREE(stripe, sizeof(stripe[0]) * stripe_count); + lo->ldo_stripenr = 0; + lo->ldo_stripes_allocated = 0; + lo->ldo_stripe = NULL; } out_free: if (idx_array != NULL) OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count); + if (slave_lmm != NULL) + OBD_FREE_PTR(slave_lmm); RETURN(rc); } @@ -887,6 +1735,7 @@ static int lod_declare_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, const struct lu_buf *lum_buf, + struct dt_object_format *dof, struct thandle *th) { struct lod_object *lo = lod_dt_obj(dt); @@ -902,7 +1751,7 @@ static int lod_declare_xattr_set_lmv(const struct lu_env *env, le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count), (int)le32_to_cpu(lum->lum_stripe_offset)); - if (le32_to_cpu(lum->lum_stripe_count) <= 1) + if (le32_to_cpu(lum->lum_stripe_count) == 0) GOTO(out, rc = 0); rc = lod_verify_md_striping(lod, lum); @@ -910,7 +1759,7 @@ static int lod_declare_xattr_set_lmv(const struct lu_env *env, GOTO(out, rc); /* prepare dir striped objects */ - rc = lod_prep_md_striped_create(env, dt, attr, lum, th); + rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th); if (rc != 0) { /* failed to create striping, let's reset * config so that others don't get confused */ @@ -921,6 +1770,56 @@ out: RETURN(rc); } +static int lod_dir_declare_xattr_set(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + const char *name, int fl, + struct thandle *th) +{ + struct dt_object *next = dt_object_child(dt); + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_object *lo = lod_dt_obj(dt); + int i; + int rc; + ENTRY; + + if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { + struct lmv_user_md_v1 *lum; + + LASSERT(buf != NULL && buf->lb_buf != NULL); + lum = buf->lb_buf; + rc = lod_verify_md_striping(d, lum); + if (rc != 0) + RETURN(rc); + } + + rc = dt_declare_xattr_set(env, next, buf, name, fl, th); + if (rc != 0) + RETURN(rc); + + /* set xattr to each stripes, if needed */ + rc = lod_load_striping(env, lo); + if (rc != 0) + RETURN(rc); + + /* Note: Do not set LinkEA on sub-stripes, otherwise + * it will confuse the fid2path process(see mdt_path_current()). + * The linkEA between master and sub-stripes is set in + * lod_xattr_set_lmv(). */ + if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0) + RETURN(0); + + for (i = 0; i < lo->ldo_stripenr; i++) { + LASSERT(lo->ldo_stripe[i]); + rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf, + name, fl, th); + if (rc != 0) + break; + } + + RETURN(rc); +} + /* * LOV xattr is a storage for striping, and LOD owns this xattr. * but LOD allows others to control striping to some extent @@ -966,39 +1865,7 @@ static int lod_declare_xattr_set(const struct lu_env *env, } rc = lod_declare_striped_object(env, dt, attr, buf, th); } else if (S_ISDIR(mode)) { - struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); - struct lod_object *lo = lod_dt_obj(dt); - int i; - - if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { - struct lmv_user_md_v1 *lum; - - LASSERT(buf != NULL && buf->lb_buf != NULL); - lum = buf->lb_buf; - rc = lod_verify_md_striping(d, lum); - if (rc != 0) - RETURN(rc); - } - - rc = dt_declare_xattr_set(env, next, buf, name, fl, th); - if (rc != 0) - RETURN(rc); - - /* set xattr to each stripes, if needed */ - rc = lod_load_striping(env, lo); - if (rc != 0) - RETURN(rc); - - if (lo->ldo_stripenr == 0) - RETURN(rc); - - for (i = 0; i < lo->ldo_stripenr; i++) { - LASSERT(lo->ldo_stripe[i]); - rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf, - name, fl, th); - if (rc != 0) - break; - } + rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th); } else { rc = dt_declare_xattr_set(env, next, buf, name, fl, th); } @@ -1013,6 +1880,71 @@ static void lod_lov_stripe_cache_clear(struct lod_object *lo) lod_object_set_pool(lo, NULL); lo->ldo_def_stripe_size = 0; lo->ldo_def_stripenr = 0; + if (lo->ldo_dir_stripe != NULL) + lo->ldo_dir_striping_cached = 0; +} + +static int lod_xattr_set_internal(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + const char *name, int fl, struct thandle *th, + struct lustre_capa *capa) +{ + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + int rc; + int i; + ENTRY; + + rc = dt_xattr_set(env, next, buf, name, fl, th, capa); + if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) + RETURN(rc); + + /* Note: Do not set LinkEA on sub-stripes, otherwise + * it will confuse the fid2path process(see mdt_path_current()). + * The linkEA between master and sub-stripes is set in + * lod_xattr_set_lmv(). */ + if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0) + RETURN(0); + + for (i = 0; i < lo->ldo_stripenr; i++) { + LASSERT(lo->ldo_stripe[i]); + rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th, + capa); + if (rc != 0) + break; + } + + RETURN(rc); +} + +static int lod_xattr_del_internal(const struct lu_env *env, + struct dt_object *dt, + const char *name, struct thandle *th, + struct lustre_capa *capa) +{ + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + int rc; + int i; + ENTRY; + + rc = dt_xattr_del(env, next, name, th, capa); + if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) + RETURN(rc); + + if (lo->ldo_stripenr == 0) + RETURN(rc); + + for (i = 0; i < lo->ldo_stripenr; i++) { + LASSERT(lo->ldo_stripe[i]); + rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th, + capa); + if (rc != 0) + break; + } + + RETURN(rc); } static int lod_xattr_set_lov_on_dir(const struct lu_env *env, @@ -1023,7 +1955,6 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, struct lustre_capa *capa) { struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); - struct dt_object *next = dt_object_child(dt); struct lod_object *l = lod_dt_obj(dt); struct lov_user_md_v1 *lum; struct lov_user_md_v3 *v3 = NULL; @@ -1038,7 +1969,7 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, LASSERT(buf != NULL && buf->lb_buf != NULL); lum = buf->lb_buf; - rc = lod_verify_striping(d, buf, 0); + rc = lod_verify_striping(d, buf, false); if (rc) RETURN(rc); @@ -1059,11 +1990,11 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, (lum->lmm_stripe_count), (lum->lmm_stripe_offset)) && lum->lmm_magic == LOV_USER_MAGIC_V1) { - rc = dt_xattr_del(env, next, name, th, capa); + rc = lod_xattr_del_internal(env, dt, name, th, capa); if (rc == -ENODATA) rc = 0; } else { - rc = dt_xattr_set(env, next, buf, name, fl, th, capa); + rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa); } RETURN(rc); @@ -1076,7 +2007,6 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, struct thandle *th, struct lustre_capa *capa) { - struct dt_object *next = dt_object_child(dt); struct lod_object *l = lod_dt_obj(dt); struct lmv_user_md_v1 *lum; int rc; @@ -1092,27 +2022,26 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)), le32_to_cpu(lum->lum_stripe_offset)) && le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) { - rc = dt_xattr_del(env, next, name, th, capa); + rc = lod_xattr_del_internal(env, dt, name, th, capa); if (rc == -ENODATA) rc = 0; } else { - rc = dt_xattr_set(env, next, buf, name, fl, th, capa); + rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa); if (rc != 0) RETURN(rc); + } - /* Update default stripe cache */ - if (l->ldo_dir_stripe == NULL) { - OBD_ALLOC_PTR(l->ldo_dir_stripe); - if (l->ldo_dir_stripe == NULL) - RETURN(-ENOMEM); - } - - l->ldo_dir_striping_cached = 0; - l->ldo_dir_def_striping_set = 1; - l->ldo_dir_def_stripenr = - le32_to_cpu(lum->lum_stripe_count) - 1; + /* Update default stripe cache */ + if (l->ldo_dir_stripe == NULL) { + OBD_ALLOC_PTR(l->ldo_dir_stripe); + if (l->ldo_dir_stripe == NULL) + RETURN(-ENOMEM); } + l->ldo_dir_striping_cached = 0; + l->ldo_dir_def_striping_set = 1; + l->ldo_dir_def_stripenr = le32_to_cpu(lum->lum_stripe_count); + RETURN(rc); } @@ -1122,7 +2051,14 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, struct lustre_capa *capa) { struct lod_object *lo = lod_dt_obj(dt); + struct lod_thread_info *info = lod_env_info(env); + struct lu_attr *attr = &info->lti_attr; + struct dt_object_format *dof = &info->lti_format; struct lu_buf lmv_buf; + struct lu_buf slave_lmv_buf; + struct lmv_mds_md_v1 *lmm; + struct lmv_mds_md_v1 *slave_lmm = NULL; + struct dt_insert_rec *rec = &info->lti_dt_rec; int i; int rc; ENTRY; @@ -1135,31 +2071,50 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, if (lo->ldo_stripenr == 0) RETURN(0); + rc = dt_attr_get(env, dt_object_child(dt), attr, BYPASS_CAPA); + if (rc != 0) + RETURN(rc); + + attr->la_valid = LA_TYPE | LA_MODE; + dof->dof_type = DFT_DIR; + rc = lod_prep_lmv_md(env, dt, &lmv_buf); if (rc != 0) RETURN(rc); + lmm = lmv_buf.lb_buf; + + OBD_ALLOC_PTR(slave_lmm); + if (slave_lmm == NULL) + RETURN(-ENOMEM); + + lod_prep_slave_lmv_md(slave_lmm, lmm); + slave_lmv_buf.lb_buf = slave_lmm; + slave_lmv_buf.lb_len = sizeof(*slave_lmm); + rec->rec_type = S_IFDIR; for (i = 0; i < lo->ldo_stripenr; i++) { - struct dt_object *dto; - struct lu_attr *attr = &lod_env_info(env)->lti_attr; + struct dt_object *dto; + char *stripe_name = info->lti_key; + struct lu_name *sname; + struct linkea_data ldata = { 0 }; + struct lu_buf linkea_buf; dto = lo->ldo_stripe[i]; - memset(attr, 0, sizeof(*attr)); - attr->la_valid = LA_TYPE | LA_MODE; - attr->la_mode = S_IFDIR; - rc = dt_create(env, dto, attr, NULL, NULL, th); + dt_write_lock(env, dto, MOR_TGT_CHILD); + rc = dt_create(env, dto, attr, NULL, dof, th); + dt_write_unlock(env, dto); if (rc != 0) RETURN(rc); - rc = dt_insert(env, dto, - (const struct dt_rec *)lu_object_fid(&dto->do_lu), - (const struct dt_key *)dot, th, capa, 0); + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = dt_insert(env, dto, (const struct dt_rec *)rec, + (const struct dt_key *)dot, th, capa, 0); if (rc != 0) RETURN(rc); - rc = dt_insert(env, dto, - (struct dt_rec *)lu_object_fid(&dt->do_lu), - (const struct dt_key *)dotdot, th, capa, 0); + rec->rec_fid = lu_object_fid(&dt->do_lu); + rc = dt_insert(env, dto, (struct dt_rec *)rec, + (const struct dt_key *)dotdot, th, capa, 0); if (rc != 0) RETURN(rc); @@ -1167,7 +2122,6 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size, lo->ldo_def_stripenr, lo->ldo_def_stripe_offset)) { - struct lod_thread_info *info; struct lov_user_md_v3 *v3; /* sigh, lti_ea_store has been used for lmv_buf, @@ -1175,55 +2129,250 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, * stripe EA */ OBD_ALLOC_PTR(v3); if (v3 == NULL) - RETURN(-ENOMEM); + GOTO(out, rc); memset(v3, 0, sizeof(*v3)); v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); v3->lmm_stripe_count = - cpu_to_le32(lo->ldo_def_stripenr); + cpu_to_le16(lo->ldo_def_stripenr); v3->lmm_stripe_offset = - cpu_to_le32(lo->ldo_def_stripe_offset); + cpu_to_le16(lo->ldo_def_stripe_offset); v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size); - if (lo->ldo_pool) - strncpy(v3->lmm_pool_name, lo->ldo_pool, - LOV_MAXPOOLNAME); + if (lo->ldo_pool != NULL) + strlcpy(v3->lmm_pool_name, lo->ldo_pool, + sizeof(v3->lmm_pool_name)); - info = lod_env_info(env); info->lti_buf.lb_buf = v3; info->lti_buf.lb_len = sizeof(*v3); rc = dt_xattr_set(env, dto, &info->lti_buf, XATTR_NAME_LOV, 0, th, capa); OBD_FREE_PTR(v3); if (rc != 0) - RETURN(rc); + GOTO(out, rc); } - rc = dt_xattr_set(env, dto, &lmv_buf, XATTR_NAME_LMV, fl, th, - capa); + slave_lmm->lmv_master_mdt_index = cpu_to_le32(i); + rc = dt_xattr_set(env, dto, &slave_lmv_buf, XATTR_NAME_LMV, + fl, th, capa); + if (rc != 0) + GOTO(out, rc); + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&dto->do_lu)), i); + + sname = lod_name_get(env, stripe_name, strlen(stripe_name)); + rc = linkea_data_new(&ldata, &info->lti_linkea_buf); + if (rc != 0) + GOTO(out, rc); + + rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu)); + if (rc != 0) + GOTO(out, rc); + + linkea_buf.lb_buf = ldata.ld_buf->lb_buf; + linkea_buf.lb_len = ldata.ld_leh->leh_len; + rc = dt_xattr_set(env, dto, &linkea_buf, XATTR_NAME_LINK, + 0, th, BYPASS_CAPA); + if (rc != 0) + GOTO(out, rc); + + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = dt_insert(env, dt_object_child(dt), + (const struct dt_rec *)rec, + (const struct dt_key *)stripe_name, th, capa, 0); + if (rc != 0) + GOTO(out, rc); + + rc = dt_ref_add(env, dt_object_child(dt), th); + if (rc != 0) + GOTO(out, rc); } - rc = dt_xattr_set(env, dt, &lmv_buf, XATTR_NAME_LMV, fl, th, capa); + rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV, + fl, th, capa); + +out: + if (slave_lmm != NULL) + OBD_FREE_PTR(slave_lmm); RETURN(rc); } +int lod_dir_striping_create_internal(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_object_format *dof, + struct thandle *th, + bool declare) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + int rc; + ENTRY; + + if (!LMVEA_DELETE_VALUES(lo->ldo_stripenr, + lo->ldo_dir_stripe_offset)) { + struct lmv_user_md_v1 *v1 = info->lti_ea_store; + int stripe_count = lo->ldo_stripenr; + + if (info->lti_ea_store_size < sizeof(*v1)) { + rc = lod_ea_store_resize(info, sizeof(*v1)); + if (rc != 0) + RETURN(rc); + v1 = info->lti_ea_store; + } + + memset(v1, 0, sizeof(*v1)); + v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC); + v1->lum_stripe_count = cpu_to_le32(stripe_count); + v1->lum_stripe_offset = + cpu_to_le32(lo->ldo_dir_stripe_offset); + + info->lti_buf.lb_buf = v1; + info->lti_buf.lb_len = sizeof(*v1); + + if (declare) + rc = lod_declare_xattr_set_lmv(env, dt, attr, + &info->lti_buf, dof, th); + else + rc = lod_xattr_set_lmv(env, dt, &info->lti_buf, + XATTR_NAME_LMV, 0, th, + BYPASS_CAPA); + if (rc != 0) + RETURN(rc); + } + + /* Transfer default LMV striping from the parent */ + if (lo->ldo_dir_striping_cached && + !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr, + lo->ldo_dir_def_stripe_offset)) { + struct lmv_user_md_v1 *v1 = info->lti_ea_store; + int def_stripe_count = lo->ldo_dir_def_stripenr; + + if (info->lti_ea_store_size < sizeof(*v1)) { + rc = lod_ea_store_resize(info, sizeof(*v1)); + if (rc != 0) + RETURN(rc); + v1 = info->lti_ea_store; + } + + memset(v1, 0, sizeof(*v1)); + v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC); + v1->lum_stripe_count = cpu_to_le32(def_stripe_count); + v1->lum_stripe_offset = + cpu_to_le32(lo->ldo_dir_def_stripe_offset); + v1->lum_hash_type = + cpu_to_le32(lo->ldo_dir_def_hash_type); + + info->lti_buf.lb_buf = v1; + info->lti_buf.lb_len = sizeof(*v1); + if (declare) + rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf, + XATTR_NAME_DEFAULT_LMV, + 0, th); + else + rc = lod_xattr_set_default_lmv_on_dir(env, dt, + &info->lti_buf, + XATTR_NAME_DEFAULT_LMV, 0, + th, BYPASS_CAPA); + if (rc != 0) + RETURN(rc); + } + + /* Transfer default LOV striping from the parent */ + if (lo->ldo_striping_cached && + !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size, + lo->ldo_def_stripenr, + lo->ldo_def_stripe_offset)) { + struct lov_user_md_v3 *v3 = info->lti_ea_store; + + if (info->lti_ea_store_size < sizeof(*v3)) { + rc = lod_ea_store_resize(info, sizeof(*v3)); + if (rc != 0) + RETURN(rc); + v3 = info->lti_ea_store; + } + + memset(v3, 0, sizeof(*v3)); + v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); + v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr); + v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset); + v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size); + if (lo->ldo_pool != NULL) + strlcpy(v3->lmm_pool_name, lo->ldo_pool, + sizeof(v3->lmm_pool_name)); + + info->lti_buf.lb_buf = v3; + info->lti_buf.lb_len = sizeof(*v3); + + if (declare) + rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf, + XATTR_NAME_LOV, 0, th); + else + rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf, + XATTR_NAME_LOV, 0, th, + BYPASS_CAPA); + if (rc != 0) + RETURN(rc); + } + + RETURN(0); +} + +static int lod_declare_dir_striping_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_object_format *dof, + struct thandle *th) +{ + return lod_dir_striping_create_internal(env, dt, attr, dof, th, true); +} + +static int lod_dir_striping_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_object_format *dof, + struct thandle *th) +{ + return lod_dir_striping_create_internal(env, dt, attr, dof, th, false); +} + static int lod_xattr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, struct thandle *th, struct lustre_capa *capa) { - struct lod_object *lo = lod_dt_obj(dt); struct dt_object *next = dt_object_child(dt); - __u32 attr; int rc; - int i; ENTRY; - attr = dt->do_lu.lo_header->loh_attr & S_IFMT; - if (S_ISDIR(attr) && strcmp(name, XATTR_NAME_LOV) == 0) { + if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && + strcmp(name, XATTR_NAME_LMV) == 0) { + struct lmv_mds_md_v1 *lmm = buf->lb_buf; + + if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) & + LMV_HASH_FLAG_MIGRATION) + rc = dt_xattr_set(env, next, buf, name, fl, th, capa); + else + rc = lod_dir_striping_create(env, dt, NULL, NULL, th); + + RETURN(rc); + } + + if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && + strcmp(name, XATTR_NAME_LOV) == 0) { + /* default LOVEA */ rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th, capa); - } else if (S_ISREG(attr) && !strcmp(name, XATTR_NAME_LOV)) { + RETURN(rc); + } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && + strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { + /* default LMVEA */ + rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl, + th, capa); + RETURN(rc); + } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) && + !strcmp(name, XATTR_NAME_LOV)) { /* in case of lov EA swap, just set it * if not, it is a replay so check striping match what we * already have during req replay, declare_xattr_set() @@ -1236,32 +2385,12 @@ static int lod_xattr_set(const struct lu_env *env, } else { rc = lod_striping_create(env, dt, NULL, NULL, th); } - } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { - if (!S_ISDIR(attr)) - RETURN(-ENOTDIR); - rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl, - th, capa); - } else { - /* - * behave transparantly for all other EAs - */ - rc = dt_xattr_set(env, next, buf, name, fl, th, capa); - } - - if (rc != 0 || !S_ISDIR(attr)) - RETURN(rc); - - if (lo->ldo_stripenr == 0) RETURN(rc); - - for (i = 0; i < lo->ldo_stripenr; i++) { - LASSERT(lo->ldo_stripe[i]); - rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th, - capa); - if (rc != 0) - break; } + /* then all other xattr */ + rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa); + RETURN(rc); } @@ -1341,10 +2470,12 @@ static int lod_cache_parent_lov_striping(const struct lu_env *env, rc = 0; v1 = info->lti_ea_store; - if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) + if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) { lustre_swab_lov_user_md_v1(v1); - else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) + } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) { + v3 = (struct lov_user_md_v3 *)v1; lustre_swab_lov_user_md_v3(v3); + } if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1) GOTO(unlock, rc = 0); @@ -1352,6 +2483,11 @@ static int lod_cache_parent_lov_striping(const struct lu_env *env, if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0) GOTO(unlock, rc = 0); + CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n", + PFID(lu_object_fid(&lp->ldo_obj.do_lu)), + (int)v1->lmm_stripe_count, + (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset); + lp->ldo_def_stripenr = v1->lmm_stripe_count; lp->ldo_def_stripe_size = v1->lmm_stripe_size; lp->ldo_def_stripe_offset = v1->lmm_stripe_offset; @@ -1399,7 +2535,7 @@ static int lod_cache_parent_lmv_striping(const struct lu_env *env, rc = 0; v1 = info->lti_ea_store; - lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count) - 1; + lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count); lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset); lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type); lp->ldo_dir_def_striping_set = 1; @@ -1451,6 +2587,7 @@ static void lod_ah_init(const struct lu_env *env, struct lod_object *lp = NULL; struct lod_object *lc; struct lov_desc *desc; + int rc; ENTRY; LASSERT(child); @@ -1458,6 +2595,9 @@ static void lod_ah_init(const struct lu_env *env, if (likely(parent)) { nextp = dt_object_child(parent); lp = lod_dt_obj(parent); + rc = lod_load_striping(env, lp); + if (rc != 0) + return; } nextc = dt_object_child(child); @@ -1476,8 +2616,6 @@ static void lod_ah_init(const struct lu_env *env, NULL : nextp, nextc, child_mode); if (S_ISDIR(child_mode)) { - int rc; - if (lc->ldo_dir_stripe == NULL) { OBD_ALLOC_PTR(lc->ldo_dir_stripe); if (lc->ldo_dir_stripe == NULL) @@ -1524,10 +2662,9 @@ static void lod_ah_init(const struct lu_env *env, lc->ldo_dir_def_hash_type); } - /* If the directory is specified with certain stripes */ + /* It should always honour the specified stripes */ if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) { const struct lmv_user_md_v1 *lum1 = ah->dah_eadata; - int rc; rc = lod_verify_md_striping(d, lum1); if (rc == 0 && @@ -1535,7 +2672,7 @@ static void lod_ah_init(const struct lu_env *env, /* Directory will be striped only if * stripe_count > 1 */ lc->ldo_stripenr = - le32_to_cpu(lum1->lum_stripe_count) - 1; + le32_to_cpu(lum1->lum_stripe_count); lc->ldo_dir_stripe_offset = le32_to_cpu(lum1->lum_stripe_offset); lc->ldo_dir_hash_type = @@ -1544,6 +2681,7 @@ static void lod_ah_init(const struct lu_env *env, lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset); } + /* then check whether there is default stripes from parent */ } else if (lp->ldo_dir_def_striping_set) { /* If there are default dir stripe from parent */ lc->ldo_stripenr = lp->ldo_dir_def_stripenr; @@ -1681,22 +2819,33 @@ int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt, GOTO(out, rc = -ENOMEM); } - /* choose OST and generate appropriate objects */ - rc = lod_qos_prep_create(env, lo, attr, lovea, th); - if (rc) { - /* failed to create striping, let's reset - * config so that others don't get confused */ - lod_object_free_striping(env, lo); - GOTO(out, rc); - } + if (!dt_object_remote(next)) { + /* choose OST and generate appropriate objects */ + rc = lod_qos_prep_create(env, lo, attr, lovea, th); + if (rc) { + /* failed to create striping, let's reset + * config so that others don't get confused */ + lod_object_free_striping(env, lo); + GOTO(out, rc); + } - /* - * declare storage for striping data - */ - info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr, + /* + * declare storage for striping data + */ + info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr, lo->ldo_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1); - rc = dt_declare_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV, - 0, th); + } else { + /* LOD can not choose OST objects for remote objects, i.e. + * stripes must be ready before that. Right now, it can only + * happen during migrate, i.e. migrate process needs to create + * remote regular file (mdd_migrate_create), then the migrate + * process will provide stripeEA. */ + LASSERT(lovea != NULL); + info->lti_buf = *lovea; + } + + rc = dt_declare_xattr_set(env, next, &info->lti_buf, + XATTR_NAME_LOV, 0, th); if (rc) GOTO(out, rc); @@ -1712,146 +2861,6 @@ out: RETURN(rc); } -int lod_dir_striping_create_internal(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - const struct dt_object_format *dof, - struct thandle *th, - bool declare) -{ - struct lod_thread_info *info = lod_env_info(env); - struct dt_object *next = dt_object_child(dt); - struct lod_object *lo = lod_dt_obj(dt); - int rc; - ENTRY; - - if (lo->ldo_dir_def_striping_set && - !LMVEA_DELETE_VALUES(lo->ldo_stripenr, - lo->ldo_dir_stripe_offset)) { - struct lmv_user_md_v1 *v1 = info->lti_ea_store; - int stripe_count = lo->ldo_stripenr + 1; - - if (info->lti_ea_store_size < sizeof(*v1)) { - rc = lod_ea_store_resize(info, sizeof(*v1)); - if (rc != 0) - RETURN(rc); - v1 = info->lti_ea_store; - } - - memset(v1, 0, sizeof(*v1)); - v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC); - v1->lum_stripe_count = cpu_to_le32(stripe_count); - v1->lum_stripe_offset = - cpu_to_le32(lo->ldo_dir_stripe_offset); - - info->lti_buf.lb_buf = v1; - info->lti_buf.lb_len = sizeof(*v1); - - if (declare) - rc = lod_declare_xattr_set_lmv(env, dt, attr, - &info->lti_buf, th); - else - rc = lod_xattr_set_lmv(env, dt, &info->lti_buf, - XATTR_NAME_LMV, 0, th, - BYPASS_CAPA); - if (rc != 0) - RETURN(rc); - } - - /* Transfer default LMV striping from the parent */ - if (lo->ldo_dir_striping_cached && - !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr, - lo->ldo_dir_def_stripe_offset)) { - struct lmv_user_md_v1 *v1 = info->lti_ea_store; - int def_stripe_count = lo->ldo_dir_def_stripenr + 1; - - if (info->lti_ea_store_size < sizeof(*v1)) { - rc = lod_ea_store_resize(info, sizeof(*v1)); - if (rc != 0) - RETURN(rc); - v1 = info->lti_ea_store; - } - - memset(v1, 0, sizeof(*v1)); - v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC); - v1->lum_stripe_count = cpu_to_le32(def_stripe_count); - v1->lum_stripe_offset = - cpu_to_le32(lo->ldo_dir_def_stripe_offset); - v1->lum_hash_type = - cpu_to_le32(lo->ldo_dir_def_hash_type); - - info->lti_buf.lb_buf = v1; - info->lti_buf.lb_len = sizeof(*v1); - if (declare) - rc = dt_declare_xattr_set(env, next, &info->lti_buf, - XATTR_NAME_DEFAULT_LMV, 0, - th); - else - rc = dt_xattr_set(env, next, &info->lti_buf, - XATTR_NAME_DEFAULT_LMV, 0, th, - BYPASS_CAPA); - if (rc != 0) - RETURN(rc); - } - - /* Transfer default LOV striping from the parent */ - if (lo->ldo_striping_cached && - !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size, - lo->ldo_def_stripenr, - lo->ldo_def_stripe_offset)) { - struct lov_user_md_v3 *v3 = info->lti_ea_store; - - if (info->lti_ea_store_size < sizeof(*v3)) { - rc = lod_ea_store_resize(info, sizeof(*v3)); - if (rc != 0) - RETURN(rc); - v3 = info->lti_ea_store; - } - - memset(v3, 0, sizeof(*v3)); - v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); - v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr); - v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset); - v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size); - if (lo->ldo_pool) - strncpy(v3->lmm_pool_name, lo->ldo_pool, - LOV_MAXPOOLNAME); - - info->lti_buf.lb_buf = v3; - info->lti_buf.lb_len = sizeof(*v3); - - if (declare) - rc = dt_declare_xattr_set(env, next, &info->lti_buf, - XATTR_NAME_LOV, 0, th); - else - rc = dt_xattr_set(env, next, &info->lti_buf, - XATTR_NAME_LOV, 0, th, - BYPASS_CAPA); - if (rc != 0) - RETURN(rc); - } - - RETURN(0); -} - -static int lod_declare_dir_striping_create(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - struct dt_object_format *dof, - struct thandle *th) -{ - return lod_dir_striping_create_internal(env, dt, attr, dof, th, true); -} - -static int lod_dir_striping_create(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - struct dt_object_format *dof, - struct thandle *th) -{ - return lod_dir_striping_create_internal(env, dt, attr, dof, th, false); -} - static int lod_declare_object_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, @@ -1892,7 +2901,11 @@ static int lod_declare_object_create(const struct lu_env *env, rc = lod_declare_striped_object(env, dt, attr, NULL, th); } else if (dof->dof_type == DFT_DIR) { - rc = lod_declare_dir_striping_create(env, dt, attr, dof, th); + /* Orphan object (like migrating object) does not have + * lod_dir_stripe, see lod_ah_init */ + if (lo->ldo_dir_stripe != NULL) + rc = lod_declare_dir_striping_create(env, dt, attr, + dof, th); } out: RETURN(rc); @@ -1934,13 +2947,12 @@ static int lod_object_create(const struct lu_env *env, struct dt_object *dt, /* create local object */ rc = dt_create(env, next, attr, hint, dof, th); + if (rc != 0) + RETURN(rc); - if (rc == 0) { - if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) - rc = lod_dir_striping_create(env, dt, attr, dof, th); - else if (lo->ldo_stripe && dof->u.dof_reg.striped != 0) - rc = lod_striping_create(env, dt, attr, dof, th); - } + if (S_ISREG(dt->do_lu.lo_header->loh_attr) && + lo->ldo_stripe && dof->u.dof_reg.striped != 0) + rc = lod_striping_create(env, dt, attr, dof, th); RETURN(rc); } @@ -1951,35 +2963,57 @@ static int lod_declare_object_destroy(const struct lu_env *env, { struct dt_object *next = dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); + struct lod_thread_info *info = lod_env_info(env); + char *stripe_name = info->lti_key; int rc, i; ENTRY; /* - * we declare destroy for the local object + * load striping information, notice we don't do this when object + * is being initialized as we don't need this information till + * few specific cases like destroy, chown */ - rc = dt_declare_destroy(env, next, th); + rc = lod_load_striping(env, lo); if (rc) RETURN(rc); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ)) - RETURN(0); + /* declare destroy for all underlying objects */ + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { + rc = next->do_ops->do_index_try(env, next, + &dt_directory_features); + if (rc != 0) + RETURN(rc); + for (i = 0; i < lo->ldo_stripenr; i++) { + rc = dt_declare_ref_del(env, next, th); + if (rc != 0) + RETURN(rc); + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)), + i); + rc = dt_declare_delete(env, next, + (const struct dt_key *)stripe_name, th); + if (rc != 0) + RETURN(rc); + } + } /* - * load striping information, notice we don't do this when object - * is being initialized as we don't need this information till - * few specific cases like destroy, chown + * we declare destroy for the local object */ - rc = lod_load_striping(env, lo); + rc = dt_declare_destroy(env, next, th); if (rc) RETURN(rc); - /* declare destroy for all underlying objects */ - for (i = 0; i < lo->ldo_stripenr; i++) { - LASSERT(lo->ldo_stripe[i]); - rc = dt_declare_destroy(env, lo->ldo_stripe[i], th); + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ)) + RETURN(0); - if (rc) - break; + /* declare destroy all striped objects */ + for (i = 0; i < lo->ldo_stripenr; i++) { + if (likely(lo->ldo_stripe[i] != NULL)) { + rc = dt_declare_destroy(env, lo->ldo_stripe[i], th); + if (rc != 0) + break; + } } RETURN(rc); @@ -1990,24 +3024,52 @@ static int lod_object_destroy(const struct lu_env *env, { struct dt_object *next = dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); + struct lod_thread_info *info = lod_env_info(env); + char *stripe_name = info->lti_key; int rc, i; ENTRY; - /* destroy local object */ + /* destroy sub-stripe of master object */ + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { + rc = next->do_ops->do_index_try(env, next, + &dt_directory_features); + if (rc != 0) + RETURN(rc); + + for (i = 0; i < lo->ldo_stripenr; i++) { + rc = dt_ref_del(env, next, th); + if (rc != 0) + RETURN(rc); + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)), + i); + + CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n", + PFID(lu_object_fid(&dt->do_lu)), stripe_name, + PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu))); + + rc = dt_delete(env, next, + (const struct dt_key *)stripe_name, + th, BYPASS_CAPA); + if (rc != 0) + RETURN(rc); + } + } rc = dt_destroy(env, next, th); - if (rc) + if (rc != 0) RETURN(rc); if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ)) RETURN(0); - /* destroy all underlying objects */ + /* destroy all striped objects */ for (i = 0; i < lo->ldo_stripenr; i++) { - LASSERT(lo->ldo_stripe[i]); - /* for striped directory, next == ldo_stripe[0] */ - if (next != lo->ldo_stripe[i]) { + if (likely(lo->ldo_stripe[i] != NULL) && + (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || + i == cfs_fail_val)) { rc = dt_destroy(env, lo->ldo_stripe[i], th); - if (rc) + if (rc != 0) break; } } @@ -2015,23 +3077,6 @@ static int lod_object_destroy(const struct lu_env *env, RETURN(rc); } -static int lod_index_try(const struct lu_env *env, struct dt_object *dt, - const struct dt_index_features *feat) -{ - struct dt_object *next = dt_object_child(dt); - int rc; - ENTRY; - - LASSERT(next->do_ops); - LASSERT(next->do_ops->do_index_try); - - rc = next->do_ops->do_index_try(env, next, feat); - if (next->do_index_ops && dt->do_index_ops == NULL) - dt->do_index_ops = &lod_index_ops; - - RETURN(rc); -} - static int lod_declare_ref_add(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { @@ -2063,9 +3108,10 @@ static struct obd_capa *lod_capa_get(const struct lu_env *env, return dt_capa_get(env, dt_object_child(dt), old, opc); } -static int lod_object_sync(const struct lu_env *env, struct dt_object *dt) +static int lod_object_sync(const struct lu_env *env, struct dt_object *dt, + __u64 start, __u64 end) { - return dt_object_sync(env, dt_object_child(dt)); + return dt_object_sync(env, dt_object_child(dt), start, end); } struct lod_slave_locks { @@ -2087,7 +3133,7 @@ static int lod_object_unlock_internal(const struct lu_env *env, if (slave_locks == NULL) RETURN(0); - for (i = 0; i < slave_locks->lsl_lock_count; i++) { + for (i = 1; i < slave_locks->lsl_lock_count; i++) { if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) { int rc1; @@ -2115,18 +3161,18 @@ static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt, if (slave_locks == NULL) RETURN(0); + if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) + RETURN(-ENOTDIR); + rc = lod_load_striping(env, lo); if (rc != 0) RETURN(rc); /* Note: for remote lock for single stripe dir, MDT will cancel * the lock by lockh directly */ - if (lo->ldo_stripenr == 0 && dt_object_remote(dt_object_child(dt))) + if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt))) RETURN(0); - if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) - RETURN(-ENOTDIR); - /* Only cancel slave lock for striped dir */ rc = lod_object_unlock_internal(env, dt, einfo, policy); @@ -2166,7 +3212,7 @@ static int lod_object_lock(const struct lu_env *env, RETURN(rc); /* No stripes */ - if (lo->ldo_stripenr == 0) + if (lo->ldo_stripenr <= 1) RETURN(0); slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr * @@ -2178,15 +3224,20 @@ static int lod_object_lock(const struct lu_env *env, slave_locks->lsl_lock_count = lo->ldo_stripenr; /* striped directory lock */ - for (i = 0; i < lo->ldo_stripenr; i++) { + for (i = 1; i < lo->ldo_stripenr; i++) { struct lustre_handle lockh; + struct ldlm_res_id *res_id; + + res_id = &lod_env_info(env)->lti_res_id; + fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu), + res_id); + einfo->ei_res_id = res_id; LASSERT(lo->ldo_stripe[i]); rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo, policy); if (rc != 0) GOTO(out, rc); - slave_locks->lsl_handle[i] = lockh; } @@ -2244,11 +3295,11 @@ static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt, static ssize_t lod_declare_write(const struct lu_env *env, struct dt_object *dt, - const loff_t size, loff_t pos, + const struct lu_buf *buf, loff_t pos, struct thandle *th) { return dt_declare_record_write(env, dt_object_child(dt), - size, pos, th); + buf, pos, th); } static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,