+/**
+ * Implementation of dt_it_ops::key_size.
+ *
+ * Used with striped objects.
+ *
+ * \see dt_it_ops::size() in the API description for details.
+ */
+static int lod_striped_it_key_size(const struct lu_env *env,
+ const struct dt_it *di)
+{
+ struct lod_it *it = (struct lod_it *)di;
+ struct lod_object *lo = lod_dt_obj(it->lit_obj);
+ struct dt_object *next;
+
+ LOD_CHECK_STRIPED_IT(env, it, lo);
+
+ next = lo->ldo_stripe[it->lit_stripe_index];
+ LASSERT(next != NULL);
+ LASSERT(next->do_index_ops != NULL);
+
+ return next->do_index_ops->dio_it.key_size(env, it->lit_it);
+}
+
+/**
+ * Implementation of dt_it_ops::rec.
+ *
+ * Used with striped objects.
+ *
+ * \see dt_it_ops::rec() in the API description for details.
+ */
+static int lod_striped_it_rec(const struct lu_env *env, const struct dt_it *di,
+ struct dt_rec *rec, __u32 attr)
+{
+ const struct lod_it *it = (const struct lod_it *)di;
+ struct lod_object *lo = lod_dt_obj(it->lit_obj);
+ struct dt_object *next;
+
+ LOD_CHECK_STRIPED_IT(env, it, lo);
+
+ next = lo->ldo_stripe[it->lit_stripe_index];
+ LASSERT(next != NULL);
+ LASSERT(next->do_index_ops != NULL);
+
+ return next->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr);
+}
+
+/**
+ * Implementation of dt_it_ops::rec_size.
+ *
+ * Used with striped objects.
+ *
+ * \see dt_it_ops::rec_size() in the API description for details.
+ */
+static int lod_striped_it_rec_size(const struct lu_env *env,
+ const struct dt_it *di, __u32 attr)
+{
+ struct lod_it *it = (struct lod_it *)di;
+ struct lod_object *lo = lod_dt_obj(it->lit_obj);
+ struct dt_object *next;
+
+ LOD_CHECK_STRIPED_IT(env, it, lo);
+
+ next = lo->ldo_stripe[it->lit_stripe_index];
+ LASSERT(next != NULL);
+ LASSERT(next->do_index_ops != NULL);
+
+ return next->do_index_ops->dio_it.rec_size(env, it->lit_it, attr);
+}
+
+/**
+ * Implementation of dt_it_ops::store.
+ *
+ * Used with striped objects.
+ *
+ * \see dt_it_ops::store() in the API description for details.
+ */
+static __u64 lod_striped_it_store(const struct lu_env *env,
+ const struct dt_it *di)
+{
+ const struct lod_it *it = (const struct lod_it *)di;
+ struct lod_object *lo = lod_dt_obj(it->lit_obj);
+ struct dt_object *next;
+
+ LOD_CHECK_STRIPED_IT(env, it, lo);
+
+ next = lo->ldo_stripe[it->lit_stripe_index];
+ LASSERT(next != NULL);
+ LASSERT(next->do_index_ops != NULL);
+
+ return next->do_index_ops->dio_it.store(env, it->lit_it);
+}
+
+/**
+ * Implementation of dt_it_ops::load.
+ *
+ * Used with striped objects.
+ *
+ * \see dt_it_ops::load() in the API description for details.
+ */
+static int lod_striped_it_load(const struct lu_env *env,
+ const struct dt_it *di, __u64 hash)
+{
+ const struct lod_it *it = (const struct lod_it *)di;
+ struct lod_object *lo = lod_dt_obj(it->lit_obj);
+ struct dt_object *next;
+
+ LOD_CHECK_STRIPED_IT(env, it, lo);
+
+ next = lo->ldo_stripe[it->lit_stripe_index];
+ LASSERT(next != NULL);
+ LASSERT(next->do_index_ops != NULL);
+
+ return next->do_index_ops->dio_it.load(env, it->lit_it, hash);
+}
+
+static struct dt_index_operations lod_striped_index_ops = {
+ .dio_lookup = lod_index_lookup,
+ .dio_declare_insert = lod_declare_index_insert,
+ .dio_insert = lod_index_insert,
+ .dio_declare_delete = lod_declare_index_delete,
+ .dio_delete = lod_index_delete,
+ .dio_it = {
+ .init = lod_striped_it_init,
+ .fini = lod_striped_it_fini,
+ .get = lod_striped_it_get,
+ .put = lod_striped_it_put,
+ .next = lod_striped_it_next,
+ .key = lod_striped_it_key,
+ .key_size = lod_striped_it_key_size,
+ .rec = lod_striped_it_rec,
+ .rec_size = lod_striped_it_rec_size,
+ .store = lod_striped_it_store,
+ .load = lod_striped_it_load,
+ }
+};
+
+/**
+ * Append the FID for each shard of the striped directory after the
+ * given LMV EA header.
+ *
+ * To simplify striped directory and the consistency verification,
+ * we only store the LMV EA header on disk, for both master object
+ * and slave objects. When someone wants to know the whole LMV EA,
+ * such as client readdir(), we can build the entrie LMV EA on the
+ * MDT side (in RAM) via iterating the sub-directory entries that
+ * are contained in the master object of the stripe directory.
+ *
+ * For the master object of the striped directroy, the valid name
+ * for each shard is composed of the ${shard_FID}:${shard_idx}.
+ *
+ * There may be holes in the LMV EA if some shards' name entries
+ * are corrupted or lost.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] lo pointer to the master object of the striped directory
+ * \param[in] buf pointer to the lu_buf which will hold the LMV EA
+ * \param[in] resize whether re-allocate the buffer if it is not big enough
+ *
+ * \retval positive size of the LMV EA
+ * \retval 0 for nothing to be loaded
+ * \retval negative error number on failure
+ */
+int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo,
+ struct lu_buf *buf, bool resize)
+{
+ struct lu_dirent *ent =
+ (struct lu_dirent *)lod_env_info(env)->lti_key;
+ struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
+ struct dt_object *obj = dt_object_child(&lo->ldo_obj);
+ struct lmv_mds_md_v1 *lmv1 = buf->lb_buf;
+ struct dt_it *it;
+ const struct dt_it_ops *iops;
+ __u32 stripes;
+ __u32 magic = le32_to_cpu(lmv1->lmv_magic);
+ size_t lmv1_size;
+ int rc;
+ ENTRY;
+
+ /* If it is not a striped directory, then load nothing. */
+ if (magic != LMV_MAGIC_V1)
+ RETURN(0);
+
+ /* If it is in migration (or failure), then load nothing. */
+ if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
+ RETURN(0);
+
+ stripes = le32_to_cpu(lmv1->lmv_stripe_count);
+ if (stripes < 1)
+ RETURN(0);
+
+ rc = lmv_mds_md_size(stripes, magic);
+ if (rc < 0)
+ RETURN(rc);
+ lmv1_size = rc;
+ if (buf->lb_len < lmv1_size) {
+ struct lu_buf tbuf;
+
+ if (!resize)
+ RETURN(-ERANGE);
+
+ tbuf = *buf;
+ buf->lb_buf = NULL;
+ buf->lb_len = 0;
+ lu_buf_alloc(buf, lmv1_size);
+ lmv1 = buf->lb_buf;
+ if (lmv1 == NULL)
+ RETURN(-ENOMEM);
+
+ memcpy(buf->lb_buf, tbuf.lb_buf, tbuf.lb_len);
+ }
+
+ if (unlikely(!dt_try_as_dir(env, obj)))
+ RETURN(-ENOTDIR);
+
+ memset(&lmv1->lmv_stripe_fids[0], 0, stripes * sizeof(struct lu_fid));
+ iops = &obj->do_index_ops->dio_it;
+ it = iops->init(env, obj, LUDA_64BITHASH);
+ if (IS_ERR(it))
+ RETURN(PTR_ERR(it));
+
+ rc = iops->load(env, it, 0);
+ if (rc == 0)
+ rc = iops->next(env, it);
+ else if (rc > 0)
+ rc = 0;
+
+ while (rc == 0) {
+ char name[FID_LEN + 2] = "";
+ struct lu_fid fid;
+ __u32 index;
+ int len;
+
+ rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
+ if (rc != 0)
+ break;
+
+ rc = -EIO;
+
+ fid_le_to_cpu(&fid, &ent->lde_fid);
+ ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
+ if (ent->lde_name[0] == '.') {
+ if (ent->lde_namelen == 1)
+ goto next;
+
+ if (ent->lde_namelen == 2 && ent->lde_name[1] == '.')
+ goto next;
+ }
+
+ len = snprintf(name, FID_LEN + 1, DFID":", PFID(&ent->lde_fid));
+ /* The ent->lde_name is composed of ${FID}:${index} */
+ if (ent->lde_namelen < len + 1 ||
+ memcmp(ent->lde_name, name, len) != 0) {
+ CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO,
+ "%s: invalid shard name %.*s with the FID "DFID
+ " for the striped directory "DFID", %s\n",
+ lod2obd(lod)->obd_name, ent->lde_namelen,
+ ent->lde_name, PFID(&fid),
+ PFID(lu_object_fid(&obj->do_lu)),
+ lod->lod_lmv_failout ? "failout" : "skip");
+
+ if (lod->lod_lmv_failout)
+ break;
+
+ goto next;
+ }
+
+ index = 0;
+ do {
+ if (ent->lde_name[len] < '0' ||
+ ent->lde_name[len] > '9') {
+ CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO,
+ "%s: invalid shard name %.*s with the "
+ "FID "DFID" for the striped directory "
+ DFID", %s\n",
+ lod2obd(lod)->obd_name, ent->lde_namelen,
+ ent->lde_name, PFID(&fid),
+ PFID(lu_object_fid(&obj->do_lu)),
+ lod->lod_lmv_failout ?
+ "failout" : "skip");
+
+ if (lod->lod_lmv_failout)
+ break;
+
+ goto next;
+ }
+
+ index = index * 10 + ent->lde_name[len++] - '0';
+ } while (len < ent->lde_namelen);
+
+ if (len == ent->lde_namelen) {
+ /* Out of LMV EA range. */
+ if (index >= stripes) {
+ CERROR("%s: the shard %.*s for the striped "
+ "directory "DFID" is out of the known "
+ "LMV EA range [0 - %u], failout\n",
+ lod2obd(lod)->obd_name, ent->lde_namelen,
+ ent->lde_name,
+ PFID(lu_object_fid(&obj->do_lu)),
+ stripes - 1);
+
+ break;
+ }
+
+ /* The slot has been occupied. */
+ if (!fid_is_zero(&lmv1->lmv_stripe_fids[index])) {
+ struct lu_fid fid0;
+
+ fid_le_to_cpu(&fid0,
+ &lmv1->lmv_stripe_fids[index]);
+ CERROR("%s: both the shard "DFID" and "DFID
+ " for the striped directory "DFID
+ " claim the same LMV EA slot at the "
+ "index %d, failout\n",
+ lod2obd(lod)->obd_name,
+ PFID(&fid0), PFID(&fid),
+ PFID(lu_object_fid(&obj->do_lu)), index);
+
+ break;
+ }
+
+ /* stored as LE mode */
+ lmv1->lmv_stripe_fids[index] = ent->lde_fid;
+
+next:
+ rc = iops->next(env, it);
+ }
+ }
+
+ iops->put(env, it);
+ iops->fini(env, it);
+
+ RETURN(rc > 0 ? lmv_mds_md_size(stripes, magic) : rc);
+}
+
+/**
+ * Implementation of dt_object_operations::do_index_try.
+ *
+ * \see dt_object_operations::do_index_try() in the API description for details.
+ */
+static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
+ const struct dt_index_features *feat)
+{
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct dt_object *next = dt_object_child(dt);
+ int rc;
+ ENTRY;
+
+ LASSERT(next->do_ops);
+ LASSERT(next->do_ops->do_index_try);
+
+ rc = lod_load_striping_locked(env, lo);
+ if (rc != 0)
+ RETURN(rc);
+
+ rc = next->do_ops->do_index_try(env, next, feat);
+ if (rc != 0)
+ RETURN(rc);
+
+ if (lo->ldo_stripenr > 0) {
+ int i;
+
+ for (i = 0; i < lo->ldo_stripenr; i++) {
+ if (dt_object_exists(lo->ldo_stripe[i]) == 0)
+ continue;
+ rc = lo->ldo_stripe[i]->do_ops->do_index_try(env,
+ lo->ldo_stripe[i], feat);
+ if (rc != 0)
+ RETURN(rc);
+ }
+ dt->do_index_ops = &lod_striped_index_ops;
+ } else {
+ dt->do_index_ops = &lod_index_ops;
+ }
+
+ RETURN(rc);
+}
+
+/**
+ * Implementation of dt_object_operations::do_read_lock.
+ *
+ * \see dt_object_operations::do_read_lock() in the API description for details.
+ */
+static void lod_object_read_lock(const struct lu_env *env,
+ struct dt_object *dt, unsigned role)
+{
+ dt_read_lock(env, dt_object_child(dt), role);
+}
+
+/**
+ * Implementation of dt_object_operations::do_write_lock.
+ *
+ * \see dt_object_operations::do_write_lock() in the API description for
+ * details.
+ */
+static void lod_object_write_lock(const struct lu_env *env,
+ struct dt_object *dt, unsigned role)
+{
+ dt_write_lock(env, dt_object_child(dt), role);
+}
+
+/**
+ * Implementation of dt_object_operations::do_read_unlock.
+ *
+ * \see dt_object_operations::do_read_unlock() in the API description for
+ * details.
+ */
+static void lod_object_read_unlock(const struct lu_env *env,
+ struct dt_object *dt)
+{
+ dt_read_unlock(env, dt_object_child(dt));
+}
+
+/**
+ * Implementation of dt_object_operations::do_write_unlock.
+ *
+ * \see dt_object_operations::do_write_unlock() in the API description for
+ * details.
+ */
+static void lod_object_write_unlock(const struct lu_env *env,
+ struct dt_object *dt)
+{
+ dt_write_unlock(env, dt_object_child(dt));
+}
+
+/**
+ * Implementation of dt_object_operations::do_write_locked.
+ *
+ * \see dt_object_operations::do_write_locked() in the API description for
+ * details.
+ */
+static int lod_object_write_locked(const struct lu_env *env,
+ struct dt_object *dt)
+{
+ return dt_write_locked(env, dt_object_child(dt));
+}
+
+/**
+ * Implementation of dt_object_operations::do_attr_get.
+ *
+ * \see dt_object_operations::do_attr_get() in the API description for details.
+ */
+static int lod_attr_get(const struct lu_env *env,
+ struct dt_object *dt,
+ struct lu_attr *attr)
+{
+ /* Note: for striped directory, client will merge attributes
+ * from all of the sub-stripes see lmv_merge_attr(), and there
+ * no MDD logic depend on directory nlink/size/time, so we can
+ * always use master inode nlink and size for now. */
+ return dt_attr_get(env, dt_object_child(dt), attr);
+}
+
+/**
+ * Mark all of the striped directory sub-stripes dead.
+ *
+ * When a striped object is a subject to removal, we have
+ * to mark all the stripes to prevent further access to
+ * them (e.g. create a new file in those). So we mark
+ * all the stripes with LMV_HASH_FLAG_DEAD. The function
+ * can be used to declare the changes and to apply them.
+ * If the object isn't striped, then just return success.
+ *
+ * \param[in] env execution environment
+ * \param[in] dt the striped object
+ * \param[in] handle transaction handle
+ * \param[in] declare whether to declare the change or apply
+ *
+ * \retval 0 on success
+ * \retval negative if failed
+ **/
+static int lod_mark_dead_object(const struct lu_env *env,
+ struct dt_object *dt,
+ struct thandle *th,
+ bool declare)
+{
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct lmv_mds_md_v1 *lmv;
+ __u32 dead_hash_type;
+ int rc;
+ int i;
+
+ ENTRY;
+
+ if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
+ RETURN(0);
+
+ rc = lod_load_striping_locked(env, lo);
+ if (rc != 0)
+ RETURN(rc);
+
+ if (lo->ldo_stripenr == 0)
+ RETURN(0);
+
+ rc = lod_get_lmv_ea(env, lo);
+ if (rc <= 0)
+ RETURN(rc);
+
+ lmv = lod_env_info(env)->lti_ea_store;
+ lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
+ dead_hash_type = le32_to_cpu(lmv->lmv_hash_type) | LMV_HASH_FLAG_DEAD;
+ lmv->lmv_hash_type = cpu_to_le32(dead_hash_type);
+ for (i = 0; i < lo->ldo_stripenr; i++) {
+ struct lu_buf buf;
+
+ lmv->lmv_master_mdt_index = i;
+ buf.lb_buf = lmv;
+ buf.lb_len = sizeof(*lmv);
+ if (declare) {
+ rc = lod_sub_object_declare_xattr_set(env,
+ lo->ldo_stripe[i], &buf,
+ XATTR_NAME_LMV,
+ LU_XATTR_REPLACE, th);
+ } else {
+ rc = lod_sub_object_xattr_set(env, lo->ldo_stripe[i],
+ &buf, XATTR_NAME_LMV,
+ LU_XATTR_REPLACE, th);
+ }
+ if (rc != 0)
+ break;
+ }
+
+ RETURN(rc);
+}
+
+/**
+ * Implementation of dt_object_operations::do_declare_attr_set.
+ *
+ * If the object is striped, then apply the changes to all the stripes.
+ *
+ * \see dt_object_operations::do_declare_attr_set() in the API description
+ * for details.
+ */
+static int lod_declare_attr_set(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct lu_attr *attr,
+ struct thandle *th)
+{
+ struct dt_object *next = dt_object_child(dt);
+ struct lod_object *lo = lod_dt_obj(dt);
+ int rc, i;
+ ENTRY;
+
+ /* Set dead object on all other stripes */
+ if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
+ attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
+ rc = lod_mark_dead_object(env, dt, th, true);
+ RETURN(rc);
+ }
+
+ /*
+ * declare setattr on the local object
+ */
+ rc = lod_sub_object_declare_attr_set(env, next, attr, th);
+ if (rc)
+ RETURN(rc);
+
+ /* osp_declare_attr_set() ignores all attributes other than
+ * UID, GID, and size, and osp_attr_set() ignores all but UID
+ * and GID. Declaration of size attr setting happens through
+ * lod_declare_init_size(), and not through this function.
+ * Therefore we need not load striping unless ownership is
+ * changing. This should save memory and (we hope) speed up
+ * rename(). */
+ if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
+ if (!(attr->la_valid & (LA_UID | LA_GID)))
+ RETURN(rc);
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
+ RETURN(0);
+ } else {
+ if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
+ LA_ATIME | LA_MTIME | LA_CTIME)))
+ RETURN(rc);
+ }
+ /*
+ * load striping information, notice we don't do this when object
+ * is being initialized as we don't need this information till
+ * few specific cases like destroy, chown
+ */
+ rc = lod_load_striping(env, lo);
+ if (rc)
+ RETURN(rc);
+
+ if (lo->ldo_stripenr == 0)
+ RETURN(0);
+
+ /*
+ * if object is striped declare changes on the stripes
+ */
+ LASSERT(lo->ldo_stripe);
+ for (i = 0; i < lo->ldo_stripenr; i++) {
+ if (lo->ldo_stripe[i] == NULL)
+ continue;
+ rc = lod_sub_object_declare_attr_set(env,
+ lo->ldo_stripe[i], attr,
+ th);
+ if (rc != 0)
+ RETURN(rc);
+ }
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
+ dt_object_exists(next) != 0 &&
+ dt_object_remote(next) == 0)
+ lod_sub_object_declare_xattr_del(env, next,
+ XATTR_NAME_LOV, th);
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
+ dt_object_exists(next) &&
+ dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lu_buf *buf = &info->lti_buf;
+
+ buf->lb_buf = info->lti_ea_store;
+ buf->lb_len = info->lti_ea_store_size;
+ lod_sub_object_declare_xattr_set(env, next, buf,
+ XATTR_NAME_LOV,
+ LU_XATTR_REPLACE, th);
+ }
+
+ RETURN(rc);
+}
+
+/**
+ * Implementation of dt_object_operations::do_attr_set.
+ *
+ * If the object is striped, then apply the changes to all or subset of
+ * the stripes depending on the object type and specific attributes.
+ *
+ * \see dt_object_operations::do_attr_set() in the API description for details.
+ */
+static int lod_attr_set(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct lu_attr *attr,
+ struct thandle *th)
+{
+ struct dt_object *next = dt_object_child(dt);
+ struct lod_object *lo = lod_dt_obj(dt);
+ int rc, i;
+ ENTRY;
+
+ /* Set dead object on all other stripes */
+ if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
+ attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
+ rc = lod_mark_dead_object(env, dt, th, false);
+ RETURN(rc);
+ }
+
+ /*
+ * apply changes to the local object
+ */
+ rc = lod_sub_object_attr_set(env, next, attr, th);
+ if (rc)
+ RETURN(rc);