X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flod%2Flod_object.c;h=91b901b65199bb9fbf534a8b7db8d6365b81729f;hp=04cbd1a00009abf5f8de1fcde5cf6c4cd24978e9;hb=083d62ee6de5ac6cee95c1d2f86b62b75034093b;hpb=098fb363c3902f67b29ddfa864b452d0a8460ad2 diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index 04cbd1a..91b901b 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -23,7 +23,7 @@ * Copyright 2009 Sun Microsystems, Inc. All rights reserved * Use is subject to license terms. * - * Copyright (c) 2012, 2015, Intel Corporation. + * Copyright (c) 2012, 2017, Intel Corporation. */ /* * lustre/lod/lod_object.c @@ -40,6 +40,8 @@ #define DEBUG_SUBSYSTEM S_MDS +#include + #include #include #include @@ -47,9 +49,9 @@ #include #include #include -#include +#include #include -#include +#include #include #include @@ -58,9 +60,6 @@ static const char dot[] = "."; static const char dotdot[] = ".."; -static const struct dt_body_operations lod_body_lnk_ops; -static const struct dt_body_operations lod_body_ops; - /** * Implementation of dt_index_operations::dio_lookup * @@ -68,8 +67,8 @@ static const struct dt_body_operations lod_body_ops; * * \see dt_index_operations::dio_lookup() in the API description for details. */ -static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key) +static int lod_lookup(const struct lu_env *env, struct dt_object *dt, + struct dt_rec *rec, const struct dt_key *key) { struct dt_object *next = dt_object_child(dt); return next->do_index_ops->dio_lookup(env, next, rec, key); @@ -83,14 +82,11 @@ static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt, * \see dt_index_operations::dio_declare_insert() in the API description * for details. */ -static int lod_declare_index_insert(const struct lu_env *env, - struct dt_object *dt, - const struct dt_rec *rec, - const struct dt_key *key, - struct thandle *th) +static int lod_declare_insert(const struct lu_env *env, struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, struct thandle *th) { - return lod_sub_object_declare_insert(env, dt_object_child(dt), - rec, key, th); + return lod_sub_declare_insert(env, dt_object_child(dt), rec, key, th); } /** @@ -100,15 +96,11 @@ static int lod_declare_index_insert(const struct lu_env *env, * * \see dt_index_operations::dio_insert() in the API description for details. */ -static int lod_index_insert(const struct lu_env *env, - struct dt_object *dt, - const struct dt_rec *rec, - const struct dt_key *key, - struct thandle *th, - int ign) +static int lod_insert(const struct lu_env *env, struct dt_object *dt, + const struct dt_rec *rec, const struct dt_key *key, + struct thandle *th, int ign) { - return lod_sub_object_index_insert(env, dt_object_child(dt), rec, key, - th, ign); + return lod_sub_insert(env, dt_object_child(dt), rec, key, th, ign); } /** @@ -119,13 +111,10 @@ static int lod_index_insert(const struct lu_env *env, * \see dt_index_operations::dio_declare_delete() in the API description * for details. */ -static int lod_declare_index_delete(const struct lu_env *env, - struct dt_object *dt, - const struct dt_key *key, - struct thandle *th) +static int lod_declare_delete(const struct lu_env *env, struct dt_object *dt, + const struct dt_key *key, struct thandle *th) { - return lod_sub_object_declare_delete(env, dt_object_child(dt), key, - th); + return lod_sub_declare_delete(env, dt_object_child(dt), key, th); } /** @@ -135,12 +124,10 @@ static int lod_declare_index_delete(const struct lu_env *env, * * \see dt_index_operations::dio_delete() in the API description for details. */ -static int lod_index_delete(const struct lu_env *env, - struct dt_object *dt, - const struct dt_key *key, - struct thandle *th) +static int lod_delete(const struct lu_env *env, struct dt_object *dt, + const struct dt_key *key, struct thandle *th) { - return lod_sub_object_delete(env, dt_object_child(dt), key, th); + return lod_sub_delete(env, dt_object_child(dt), key, th); } /** @@ -358,11 +345,11 @@ static int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di, } static struct dt_index_operations lod_index_ops = { - .dio_lookup = lod_index_lookup, - .dio_declare_insert = lod_declare_index_insert, - .dio_insert = lod_index_insert, - .dio_declare_delete = lod_declare_index_delete, - .dio_delete = lod_index_delete, + .dio_lookup = lod_lookup, + .dio_declare_insert = lod_declare_insert, + .dio_insert = lod_insert, + .dio_declare_delete = lod_declare_delete, + .dio_delete = lod_delete, .dio_it = { .init = lod_it_init, .fini = lod_it_fini, @@ -396,7 +383,7 @@ static struct dt_it *lod_striped_it_init(const struct lu_env *env, struct dt_it *it_next; ENTRY; - LASSERT(lo->ldo_stripenr > 0); + LASSERT(lo->ldo_dir_stripe_count > 0); next = lo->ldo_stripe[0]; LASSERT(next != NULL); LASSERT(next->do_index_ops != NULL); @@ -419,12 +406,12 @@ static struct dt_it *lod_striped_it_init(const struct lu_env *env, return (struct dt_it *)it; } -#define LOD_CHECK_STRIPED_IT(env, it, lo) \ -do { \ - LASSERT((it)->lit_obj != NULL); \ - LASSERT((it)->lit_it != NULL); \ - LASSERT((lo)->ldo_stripenr > 0); \ - LASSERT((it)->lit_stripe_index < (lo)->ldo_stripenr); \ +#define LOD_CHECK_STRIPED_IT(env, it, lo) \ +do { \ + LASSERT((it)->lit_obj != NULL); \ + LASSERT((it)->lit_it != NULL); \ + LASSERT((lo)->ldo_dir_stripe_count > 0); \ + LASSERT((it)->lit_stripe_index < (lo)->ldo_dir_stripe_count); \ } while (0) /** @@ -560,7 +547,7 @@ again: } /* go to next stripe */ - if (it->lit_stripe_index + 1 >= lo->ldo_stripenr) + if (it->lit_stripe_index + 1 >= lo->ldo_dir_stripe_count) RETURN(1); it->lit_stripe_index++; @@ -727,11 +714,11 @@ static int lod_striped_it_load(const struct lu_env *env, } static struct dt_index_operations lod_striped_index_ops = { - .dio_lookup = lod_index_lookup, - .dio_declare_insert = lod_declare_index_insert, - .dio_insert = lod_index_insert, - .dio_declare_delete = lod_declare_index_delete, - .dio_delete = lod_index_delete, + .dio_lookup = lod_lookup, + .dio_declare_insert = lod_declare_insert, + .dio_insert = lod_insert, + .dio_declare_delete = lod_declare_delete, + .dio_delete = lod_delete, .dio_it = { .init = lod_striped_it_init, .fini = lod_striped_it_fini, @@ -962,7 +949,7 @@ static int lod_index_try(const struct lu_env *env, struct dt_object *dt, LASSERT(next->do_ops); LASSERT(next->do_ops->do_index_try); - rc = lod_load_striping_locked(env, lo); + rc = lod_striping_load(env, lo); if (rc != 0) RETURN(rc); @@ -970,10 +957,10 @@ static int lod_index_try(const struct lu_env *env, struct dt_object *dt, if (rc != 0) RETURN(rc); - if (lo->ldo_stripenr > 0) { + if (lo->ldo_dir_stripe_count > 0) { int i; - for (i = 0; i < lo->ldo_stripenr; i++) { + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { if (dt_object_exists(lo->ldo_stripe[i]) == 0) continue; rc = lo->ldo_stripe[i]->do_ops->do_index_try(env, @@ -994,8 +981,8 @@ static int lod_index_try(const struct lu_env *env, struct dt_object *dt, * * \see dt_object_operations::do_read_lock() in the API description for details. */ -static void lod_object_read_lock(const struct lu_env *env, - struct dt_object *dt, unsigned role) +static void lod_read_lock(const struct lu_env *env, struct dt_object *dt, + unsigned role) { dt_read_lock(env, dt_object_child(dt), role); } @@ -1006,8 +993,8 @@ static void lod_object_read_lock(const struct lu_env *env, * \see dt_object_operations::do_write_lock() in the API description for * details. */ -static void lod_object_write_lock(const struct lu_env *env, - struct dt_object *dt, unsigned role) +static void lod_write_lock(const struct lu_env *env, struct dt_object *dt, + unsigned role) { dt_write_lock(env, dt_object_child(dt), role); } @@ -1018,8 +1005,7 @@ static void lod_object_write_lock(const struct lu_env *env, * \see dt_object_operations::do_read_unlock() in the API description for * details. */ -static void lod_object_read_unlock(const struct lu_env *env, - struct dt_object *dt) +static void lod_read_unlock(const struct lu_env *env, struct dt_object *dt) { dt_read_unlock(env, dt_object_child(dt)); } @@ -1030,8 +1016,7 @@ static void lod_object_read_unlock(const struct lu_env *env, * \see dt_object_operations::do_write_unlock() in the API description for * details. */ -static void lod_object_write_unlock(const struct lu_env *env, - struct dt_object *dt) +static void lod_write_unlock(const struct lu_env *env, struct dt_object *dt) { dt_write_unlock(env, dt_object_child(dt)); } @@ -1042,8 +1027,7 @@ static void lod_object_write_unlock(const struct lu_env *env, * \see dt_object_operations::do_write_locked() in the API description for * details. */ -static int lod_object_write_locked(const struct lu_env *env, - struct dt_object *dt) +static int lod_write_locked(const struct lu_env *env, struct dt_object *dt) { return dt_write_locked(env, dt_object_child(dt)); } @@ -1064,6 +1048,139 @@ static int lod_attr_get(const struct lu_env *env, return dt_attr_get(env, dt_object_child(dt), attr); } +static inline void lod_adjust_stripe_info(struct lod_layout_component *comp, + struct lov_desc *desc) +{ + if (comp->llc_pattern != LOV_PATTERN_MDT) { + if (!comp->llc_stripe_count) + comp->llc_stripe_count = + desc->ld_default_stripe_count; + } + if (comp->llc_stripe_size <= 0) + comp->llc_stripe_size = desc->ld_default_stripe_size; +} + +int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, + struct thandle *th, + struct lod_obj_stripe_cb_data *data) +{ + struct lod_layout_component *lod_comp; + int i, j, rc; + ENTRY; + + LASSERT(lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL); + for (i = 0; i < lo->ldo_comp_cnt; i++) { + lod_comp = &lo->ldo_comp_entries[i]; + + if (lod_comp->llc_stripe == NULL) + continue; + + /* has stripe but not inited yet, this component has been + * declared to be created, but hasn't created yet. + */ + if (!lod_comp_inited(lod_comp)) + continue; + + if (data->locd_comp_skip_cb && + data->locd_comp_skip_cb(env, lo, i, data)) + continue; + + if (data->locd_comp_cb) { + rc = data->locd_comp_cb(env, lo, i, data); + if (rc) + RETURN(rc); + } + + /* could used just to do sth about component, not each + * stripes + */ + if (!data->locd_stripe_cb) + continue; + + LASSERT(lod_comp->llc_stripe_count > 0); + for (j = 0; j < lod_comp->llc_stripe_count; j++) { + struct dt_object *dt = lod_comp->llc_stripe[j]; + + if (dt == NULL) + continue; + rc = data->locd_stripe_cb(env, lo, dt, th, i, j, data); + if (rc != 0) + RETURN(rc); + } + } + RETURN(0); +} + +static bool lod_obj_attr_set_comp_skip_cb(const struct lu_env *env, + struct lod_object *lo, int comp_idx, + struct lod_obj_stripe_cb_data *data) +{ + struct lod_layout_component *lod_comp = &lo->ldo_comp_entries[comp_idx]; + bool skipped = false; + + if (!(data->locd_attr->la_valid & LA_LAYOUT_VERSION)) + return skipped; + + switch (lo->ldo_flr_state) { + case LCM_FL_WRITE_PENDING: { + int i; + + /* skip stale components */ + if (lod_comp->llc_flags & LCME_FL_STALE) { + skipped = true; + break; + } + + /* skip valid and overlapping components, therefore any + * attempts to write overlapped components will never succeed + * because client will get EINPROGRESS. */ + for (i = 0; i < lo->ldo_comp_cnt; i++) { + if (i == comp_idx) + continue; + + if (lo->ldo_comp_entries[i].llc_flags & LCME_FL_STALE) + continue; + + if (lu_extent_is_overlapped(&lod_comp->llc_extent, + &lo->ldo_comp_entries[i].llc_extent)) { + skipped = true; + break; + } + } + break; + } + default: + LASSERTF(0, "impossible: %d\n", lo->ldo_flr_state); + case LCM_FL_SYNC_PENDING: + break; + } + + CDEBUG(D_LAYOUT, DFID": %s to set component %x to version: %u\n", + PFID(lu_object_fid(&lo->ldo_obj.do_lu)), + skipped ? "skipped" : "chose", lod_comp->llc_id, + data->locd_attr->la_layout_version); + + return skipped; +} + +static inline int +lod_obj_stripe_attr_set_cb(const struct lu_env *env, struct lod_object *lo, + struct dt_object *dt, struct thandle *th, + int comp_idx, int stripe_idx, + struct lod_obj_stripe_cb_data *data) +{ + if (data->locd_declare) + return lod_sub_declare_attr_set(env, dt, data->locd_attr, th); + + if (data->locd_attr->la_valid & LA_LAYOUT_VERSION) { + CDEBUG(D_LAYOUT, DFID": set layout version: %u, comp_idx: %d\n", + PFID(lu_object_fid(&dt->do_lu)), + data->locd_attr->la_layout_version, comp_idx); + } + + return lod_sub_attr_set(env, dt, data->locd_attr, th); +} + /** * Implementation of dt_object_operations::do_declare_attr_set. * @@ -1085,25 +1202,26 @@ static int lod_declare_attr_set(const struct lu_env *env, /* * declare setattr on the local object */ - rc = lod_sub_object_declare_attr_set(env, next, attr, th); + rc = lod_sub_declare_attr_set(env, next, attr, th); if (rc) RETURN(rc); /* osp_declare_attr_set() ignores all attributes other than - * UID, GID, and size, and osp_attr_set() ignores all but UID - * and GID. Declaration of size attr setting happens through - * lod_declare_init_size(), and not through this function. - * Therefore we need not load striping unless ownership is - * changing. This should save memory and (we hope) speed up - * rename(). */ + * UID, GID, PROJID, and size, and osp_attr_set() ignores all + * but UID, GID and PROJID. Declaration of size attr setting + * happens through lod_declare_init_size(), and not through + * this function. Therefore we need not load striping unless + * ownership is changing. This should save memory and (we hope) + * speed up rename(). + */ if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) { - if (!(attr->la_valid & (LA_UID | LA_GID))) + if (!(attr->la_valid & LA_REMOTE_ATTR_SET)) RETURN(rc); if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER)) RETURN(0); } else { - if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE | + if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID | LA_MODE | LA_ATIME | LA_MTIME | LA_CTIME | LA_FLAGS))) RETURN(rc); @@ -1113,44 +1231,56 @@ static int lod_declare_attr_set(const struct lu_env *env, * is being initialized as we don't need this information till * few specific cases like destroy, chown */ - rc = lod_load_striping(env, lo); + rc = lod_striping_load(env, lo); if (rc) RETURN(rc); - if (lo->ldo_stripenr == 0) + if (!lod_obj_is_striped(dt)) RETURN(0); /* * if object is striped declare changes on the stripes */ - LASSERT(lo->ldo_stripe); - for (i = 0; i < lo->ldo_stripenr; i++) { - if (lo->ldo_stripe[i] == NULL) - continue; - rc = lod_sub_object_declare_attr_set(env, - lo->ldo_stripe[i], attr, - th); - if (rc != 0) - RETURN(rc); + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { + LASSERT(lo->ldo_stripe); + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + if (lo->ldo_stripe[i] == NULL) + continue; + rc = lod_sub_declare_attr_set(env, lo->ldo_stripe[i], + attr, th); + if (rc != 0) + RETURN(rc); + } + } else { + struct lod_obj_stripe_cb_data data = { { 0 } }; + + data.locd_attr = attr; + data.locd_declare = true; + data.locd_stripe_cb = lod_obj_stripe_attr_set_cb; + rc = lod_obj_for_each_stripe(env, lo, th, &data); } - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) && - dt_object_exists(next) != 0 && - dt_object_remote(next) == 0) - lod_sub_object_declare_xattr_del(env, next, - XATTR_NAME_LOV, th); + if (rc) + RETURN(rc); + + if (!dt_object_exists(next) || dt_object_remote(next) || + !S_ISREG(attr->la_mode)) + RETURN(0); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) && - dt_object_exists(next) && - dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) { + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE)) { + rc = lod_sub_declare_xattr_del(env, next, XATTR_NAME_LOV, th); + RETURN(rc); + } + + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) || + OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PFL_RANGE)) { struct lod_thread_info *info = lod_env_info(env); struct lu_buf *buf = &info->lti_buf; buf->lb_buf = info->lti_ea_store; buf->lb_len = info->lti_ea_store_size; - lod_sub_object_declare_xattr_set(env, next, buf, - XATTR_NAME_LOV, - LU_XATTR_REPLACE, th); + rc = lod_sub_declare_xattr_set(env, next, buf, XATTR_NAME_LOV, + LU_XATTR_REPLACE, th); } RETURN(rc); @@ -1177,51 +1307,73 @@ static int lod_attr_set(const struct lu_env *env, /* * apply changes to the local object */ - rc = lod_sub_object_attr_set(env, next, attr, th); + rc = lod_sub_attr_set(env, next, attr, th); if (rc) RETURN(rc); if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) { - if (!(attr->la_valid & (LA_UID | LA_GID))) + if (!(attr->la_valid & LA_REMOTE_ATTR_SET)) RETURN(rc); if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER)) RETURN(0); } else { - if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE | + if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE | LA_PROJID | LA_ATIME | LA_MTIME | LA_CTIME | LA_FLAGS))) RETURN(rc); } - if (lo->ldo_stripenr == 0) + /* FIXME: a tricky case in the code path of mdd_layout_change(): + * the in-memory striping information has been freed in lod_xattr_set() + * due to layout change. It has to load stripe here again. It only + * changes flags of layout so declare_attr_set() is still accurate */ + rc = lod_striping_load(env, lo); + if (rc) + RETURN(rc); + + if (!lod_obj_is_striped(dt)) RETURN(0); /* * if object is striped, apply changes to all the stripes */ - LASSERT(lo->ldo_stripe); - for (i = 0; i < lo->ldo_stripenr; i++) { - if (unlikely(lo->ldo_stripe[i] == NULL)) - continue; + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { + LASSERT(lo->ldo_stripe); + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + if (unlikely(lo->ldo_stripe[i] == NULL)) + continue; - if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && - (dt_object_exists(lo->ldo_stripe[i]) == 0)) - continue; + if ((dt_object_exists(lo->ldo_stripe[i]) == 0)) + continue; - rc = lod_sub_object_attr_set(env, lo->ldo_stripe[i], attr, th); - if (rc != 0) - break; + rc = lod_sub_attr_set(env, lo->ldo_stripe[i], attr, th); + if (rc != 0) + break; + } + } else { + struct lod_obj_stripe_cb_data data = { { 0 } }; + + data.locd_attr = attr; + data.locd_declare = false; + data.locd_comp_skip_cb = lod_obj_attr_set_comp_skip_cb; + data.locd_stripe_cb = lod_obj_stripe_attr_set_cb; + rc = lod_obj_for_each_stripe(env, lo, th, &data); } - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) && - dt_object_exists(next) != 0 && - dt_object_remote(next) == 0) - rc = lod_sub_object_xattr_del(env, next, XATTR_NAME_LOV, th); + if (rc) + RETURN(rc); + + if (!dt_object_exists(next) || dt_object_remote(next) || + !S_ISREG(attr->la_mode)) + RETURN(0); + + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE)) { + rc = lod_sub_xattr_del(env, next, XATTR_NAME_LOV, th); + RETURN(rc); + } - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) && - dt_object_exists(next) && - dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) { + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE)) { struct lod_thread_info *info = lod_env_info(env); struct lu_buf *buf = &info->lti_buf; struct ost_id *oi = &info->lti_ostid; @@ -1229,16 +1381,24 @@ static int lod_attr_set(const struct lu_env *env, struct lov_mds_md_v1 *lmm; struct lov_ost_data_v1 *objs; __u32 magic; - int rc1; - rc1 = lod_get_lov_ea(env, lo); - if (rc1 <= 0) + rc = lod_get_lov_ea(env, lo); + if (rc <= 0) RETURN(rc); buf->lb_buf = info->lti_ea_store; buf->lb_len = info->lti_ea_store_size; lmm = info->lti_ea_store; magic = le32_to_cpu(lmm->lmm_magic); + if (magic == LOV_MAGIC_COMP_V1) { + struct lov_comp_md_v1 *lcm = buf->lb_buf; + struct lov_comp_md_entry_v1 *lcme = + &lcm->lcm_entries[0]; + + lmm = buf->lb_buf + le32_to_cpu(lcme->lcme_offset); + magic = le32_to_cpu(lmm->lmm_magic); + } + if (magic == LOV_MAGIC_V1) objs = &(lmm->lmm_objects[0]); else @@ -1249,8 +1409,31 @@ static int lod_attr_set(const struct lu_env *env, fid_to_ostid(fid, oi); ostid_cpu_to_le(oi, &objs->l_ost_oi); - rc = lod_sub_object_xattr_set(env, next, buf, XATTR_NAME_LOV, - LU_XATTR_REPLACE, th); + rc = lod_sub_xattr_set(env, next, buf, XATTR_NAME_LOV, + LU_XATTR_REPLACE, th); + } else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PFL_RANGE)) { + struct lod_thread_info *info = lod_env_info(env); + struct lu_buf *buf = &info->lti_buf; + struct lov_comp_md_v1 *lcm; + struct lov_comp_md_entry_v1 *lcme; + + rc = lod_get_lov_ea(env, lo); + if (rc <= 0) + RETURN(rc); + + buf->lb_buf = info->lti_ea_store; + buf->lb_len = info->lti_ea_store_size; + lcm = buf->lb_buf; + if (le32_to_cpu(lcm->lcm_magic) != LOV_MAGIC_COMP_V1) + RETURN(-EINVAL); + + le32_add_cpu(&lcm->lcm_layout_gen, 1); + lcme = &lcm->lcm_entries[0]; + le64_add_cpu(&lcme->lcme_extent.e_start, 1); + le64_add_cpu(&lcme->lcme_extent.e_end, -1); + + rc = lod_sub_xattr_set(env, next, buf, XATTR_NAME_LOV, + LU_XATTR_REPLACE, th); } RETURN(rc); @@ -1267,9 +1450,10 @@ static int lod_attr_set(const struct lu_env *env, static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, struct lu_buf *buf, const char *name) { - struct lod_thread_info *info = lod_env_info(env); - struct lod_device *dev = lu2lod_dev(dt->do_lu.lo_dev); - int rc, is_root; + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *dev = lu2lod_dev(dt->do_lu.lo_dev); + int is_root; + int rc; ENTRY; rc = dt_xattr_get(env, dt_object_child(dt), buf, name); @@ -1313,6 +1497,8 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, RETURN(rc); /* + * XXX: Only used by lfsck + * * lod returns default striping on the real root of the device * this is like the root stores default striping for the whole * filesystem. historically we've been using a different approach @@ -1417,8 +1603,8 @@ static int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt, ENTRY; LASSERT(lo->ldo_dir_striped != 0); - LASSERT(lo->ldo_stripenr > 0); - stripe_count = lo->ldo_stripenr; + LASSERT(lo->ldo_dir_stripe_count > 0); + stripe_count = lo->ldo_dir_stripe_count; /* Only store the LMV EA heahder on the disk. */ if (info->lti_ea_store_size < sizeof(*lmm1)) { rc = lod_ea_store_resize(info, sizeof(*lmm1)); @@ -1473,6 +1659,8 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo, int rc = 0; ENTRY; + LASSERT(mutex_is_locked(&lo->ldo_layout_mutex)); + if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION) RETURN(0); @@ -1528,10 +1716,10 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo, } out: lo->ldo_stripe = stripe; - lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count); - lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count); + lo->ldo_dir_stripe_count = le32_to_cpu(lmv1->lmv_stripe_count); + lo->ldo_dir_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count); if (rc != 0) - lod_object_free_striping(env, lo); + lod_striping_free_nolock(env, lo); RETURN(rc); } @@ -1589,38 +1777,36 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, GOTO(out, rc = -EINVAL); rec->rec_type = S_IFDIR; - for (i = 0; i < lo->ldo_stripenr; i++) { + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { struct dt_object *dto = lo->ldo_stripe[i]; char *stripe_name = info->lti_key; struct lu_name *sname; struct linkea_data ldata = { NULL }; struct lu_buf linkea_buf; - rc = lod_sub_object_declare_create(env, dto, attr, NULL, - dof, th); + rc = lod_sub_declare_create(env, dto, attr, NULL, dof, th); if (rc != 0) GOTO(out, rc); if (!dt_try_as_dir(env, dto)) GOTO(out, rc = -EINVAL); - rc = lod_sub_object_declare_ref_add(env, dto, th); + rc = lod_sub_declare_ref_add(env, dto, th); if (rc != 0) GOTO(out, rc); rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = lod_sub_object_declare_insert(env, dto, - (const struct dt_rec *)rec, - (const struct dt_key *)dot, th); + rc = lod_sub_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dot, th); if (rc != 0) GOTO(out, rc); /* master stripe FID will be put to .. */ rec->rec_fid = lu_object_fid(&dt->do_lu); - rc = lod_sub_object_declare_insert(env, dto, - (const struct dt_rec *)rec, - (const struct dt_key *)dotdot, - th); + rc = lod_sub_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dotdot, th); if (rc != 0) GOTO(out, rc); @@ -1633,8 +1819,8 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, else slave_lmm->lmv_master_mdt_index = cpu_to_le32(i); - rc = lod_sub_object_declare_xattr_set(env, dto, - &slave_lmv_buf, XATTR_NAME_LMV, 0, th); + rc = lod_sub_declare_xattr_set(env, dto, &slave_lmv_buf, + XATTR_NAME_LMV, 0, th); if (rc != 0) GOTO(out, rc); } @@ -1648,37 +1834,33 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, PFID(lu_object_fid(&dto->do_lu)), i); sname = lod_name_get(env, stripe_name, strlen(stripe_name)); - rc = linkea_data_new(&ldata, &info->lti_linkea_buf); - if (rc != 0) - GOTO(out, rc); - - rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu)); + rc = linkea_links_new(&ldata, &info->lti_linkea_buf, + sname, lu_object_fid(&dt->do_lu)); if (rc != 0) GOTO(out, rc); linkea_buf.lb_buf = ldata.ld_buf->lb_buf; linkea_buf.lb_len = ldata.ld_leh->leh_len; - rc = lod_sub_object_declare_xattr_set(env, dto, &linkea_buf, - XATTR_NAME_LINK, 0, th); + rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf, + XATTR_NAME_LINK, 0, th); if (rc != 0) GOTO(out, rc); rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = lod_sub_object_declare_insert(env, dt_object_child(dt), - (const struct dt_rec *)rec, - (const struct dt_key *)stripe_name, - th); + rc = lod_sub_declare_insert(env, dt_object_child(dt), + (const struct dt_rec *)rec, + (const struct dt_key *)stripe_name, + th); if (rc != 0) GOTO(out, rc); - rc = lod_sub_object_declare_ref_add(env, dt_object_child(dt), - th); + rc = lod_sub_declare_ref_add(env, dt_object_child(dt), th); if (rc != 0) GOTO(out, rc); } - rc = lod_sub_object_declare_xattr_set(env, dt_object_child(dt), - &lmv_buf, XATTR_NAME_LMV, 0, th); + rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), + &lmv_buf, XATTR_NAME_LMV, 0, th); if (rc != 0) GOTO(out, rc); out: @@ -1705,25 +1887,33 @@ static int lod_prep_md_striped_create(const struct lu_env *env, int rc = 0; __u32 i; __u32 j; + bool is_specific = false; ENTRY; /* The lum has been verifed in lod_verify_md_striping */ - LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC); + LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC || + le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC); LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0); stripe_count = le32_to_cpu(lum->lum_stripe_count); - OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count); - if (stripe == NULL) - RETURN(-ENOMEM); - OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count); if (idx_array == NULL) + RETURN(-ENOMEM); + + OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count); + if (stripe == NULL) GOTO(out_free, rc = -ENOMEM); - /* Start index will be the master MDT */ + /* Start index must be the master MDT */ master_index = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id; idx_array[0] = master_index; + if (le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC) { + is_specific = true; + for (i = 1; i < stripe_count; i++) + idx_array[i] = le32_to_cpu(lum->lum_objects[i].lum_mds); + } + for (i = 0; i < stripe_count; i++) { struct lod_tgt_desc *tgt = NULL; struct dt_object *dto; @@ -1741,21 +1931,9 @@ static int lod_prep_md_striped_create(const struct lu_env *env, CDEBUG(D_INFO, "try idx %d, mdt cnt %u, allocated %u\n", idx, lod->lod_remote_mdt_count + 1, i); - if (idx == master_index) { - /* Allocate the FID locally */ - rc = obd_fid_alloc(env, lod->lod_child_exp, - &fid, NULL); - if (rc < 0) - GOTO(out_put, rc); - tgt_dt = lod->lod_child; - break; - } - /* Find next available target */ - if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) - continue; - - if (likely(!OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) { + if (likely(!is_specific && + !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) { /* check whether the idx already exists * in current allocated array */ for (k = 0; k < i; k++) { @@ -1769,6 +1947,22 @@ static int lod_prep_md_striped_create(const struct lu_env *env, continue; } + /* Sigh, this index is not in the bitmap, let's check + * next available target */ + if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx) && + idx != master_index) + continue; + + if (idx == master_index) { + /* Allocate the FID locally */ + rc = obd_fid_alloc(env, lod->lod_child_exp, + &fid, NULL); + if (rc < 0) + GOTO(out_put, rc); + tgt_dt = lod->lod_child; + break; + } + /* check the status of the OSP */ tgt = LTD_TGT(ltd, idx); if (tgt == NULL) @@ -1794,7 +1988,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env, /* Can not allocate more stripes */ if (j == lod->lod_remote_mdt_count) { CDEBUG(D_INFO, "%s: require stripes %u only get %d\n", - lod2obd(lod)->obd_name, stripe_count, i - 1); + lod2obd(lod)->obd_name, stripe_count, i); break; } @@ -1802,9 +1996,17 @@ static int lod_prep_md_striped_create(const struct lu_env *env, idx, i, PFID(&fid)); idx_array[i] = idx; /* Set the start index for next stripe allocation */ - if (i < stripe_count - 1) + if (!is_specific && i < stripe_count - 1) { + /* + * for large dir test, put all other slaves on one + * remote MDT, otherwise we may save too many local + * slave locks which will exceed RS_MAX_LOCKS. + */ + if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) + idx = master_index; idx_array[i + 1] = (idx + 1) % (lod->lod_remote_mdt_count + 1); + } /* tgt_dt and fid must be ready after search avaible OSP * in the above loop */ LASSERT(tgt_dt != NULL); @@ -1820,10 +2022,12 @@ static int lod_prep_md_striped_create(const struct lu_env *env, lo->ldo_dir_striped = 1; lo->ldo_stripe = stripe; - lo->ldo_stripenr = i; - lo->ldo_stripes_allocated = stripe_count; + lo->ldo_dir_stripe_count = i; + lo->ldo_dir_stripes_allocated = stripe_count; + smp_mb(); + lo->ldo_dir_stripe_loaded = 1; - if (lo->ldo_stripenr == 0) + if (lo->ldo_dir_stripe_count == 0) GOTO(out_put, rc = -ENOSPC); rc = lod_dir_declare_create_stripes(env, dt, attr, dof, th); @@ -1834,16 +2038,15 @@ out_put: if (rc < 0) { for (i = 0; i < stripe_count; i++) if (stripe[i] != NULL) - lu_object_put(env, &stripe[i]->do_lu); + dt_object_put(env, stripe[i]); OBD_FREE(stripe, sizeof(stripe[0]) * stripe_count); - lo->ldo_stripenr = 0; - lo->ldo_stripes_allocated = 0; + lo->ldo_dir_stripe_count = 0; + lo->ldo_dir_stripes_allocated = 0; lo->ldo_stripe = NULL; } out_free: - if (idx_array != NULL) - OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count); + OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count); RETURN(rc); } @@ -1875,31 +2078,25 @@ static int lod_declare_xattr_set_lmv(const struct lu_env *env, struct thandle *th) { struct lod_object *lo = lod_dt_obj(dt); - struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); - struct lmv_user_md_v1 *lum; + struct lmv_user_md_v1 *lum = lum_buf->lb_buf; int rc; ENTRY; - lum = lum_buf->lb_buf; LASSERT(lum != NULL); CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n", le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count), (int)le32_to_cpu(lum->lum_stripe_offset)); - if (le32_to_cpu(lum->lum_stripe_count) == 0) + if (lo->ldo_dir_stripe_count == 0) GOTO(out, rc = 0); - rc = lod_verify_md_striping(lod, lum); - if (rc != 0) - GOTO(out, rc); - /* prepare dir striped objects */ rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th); if (rc != 0) { /* failed to create striping, let's reset * config so that others don't get confused */ - lod_object_free_striping(env, lo); + lod_striping_free(env, lo); GOTO(out, rc); } out: @@ -1937,9 +2134,13 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env, rc = lod_verify_md_striping(d, lum); if (rc != 0) RETURN(rc); + } else if (strcmp(name, XATTR_NAME_LOV) == 0) { + rc = lod_verify_striping(d, lo, buf, false); + if (rc != 0) + RETURN(rc); } - rc = lod_sub_object_declare_xattr_set(env, next, buf, name, fl, th); + rc = lod_sub_declare_xattr_set(env, next, buf, name, fl, th); if (rc != 0) RETURN(rc); @@ -1951,18 +2152,18 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env, RETURN(0); /* set xattr to each stripes, if needed */ - rc = lod_load_striping(env, lo); + rc = lod_striping_load(env, lo); if (rc != 0) RETURN(rc); - if (lo->ldo_stripenr == 0) + if (lo->ldo_dir_stripe_count == 0) RETURN(0); - for (i = 0; i < lo->ldo_stripenr; i++) { + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { LASSERT(lo->ldo_stripe[i]); - rc = lod_sub_object_declare_xattr_set(env, lo->ldo_stripe[i], - buf, name, fl, th); + rc = lod_sub_declare_xattr_set(env, lo->ldo_stripe[i], + buf, name, fl, th); if (rc != 0) break; } @@ -1970,6 +2171,54 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env, RETURN(rc); } +static int +lod_obj_stripe_replace_parent_fid_cb(const struct lu_env *env, + struct lod_object *lo, + struct dt_object *dt, struct thandle *th, + int comp_idx, int stripe_idx, + struct lod_obj_stripe_cb_data *data) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_layout_component *comp = &lo->ldo_comp_entries[comp_idx]; + struct filter_fid *ff = &info->lti_ff; + struct lu_buf *buf = &info->lti_buf; + int rc; + + buf->lb_buf = ff; + buf->lb_len = sizeof(*ff); + rc = dt_xattr_get(env, dt, buf, XATTR_NAME_FID); + if (rc < 0) { + if (rc == -ENODATA) + return 0; + return rc; + } + + filter_fid_le_to_cpu(ff, ff, sizeof(*ff)); + if (lu_fid_eq(lu_object_fid(&lo->ldo_obj.do_lu), &ff->ff_parent) && + ff->ff_layout.ol_comp_id == comp->llc_id) + return 0; + + /* rewrite filter_fid */ + memset(ff, 0, sizeof(*ff)); + ff->ff_parent = *lu_object_fid(&lo->ldo_obj.do_lu); + ff->ff_parent.f_ver = stripe_idx; + ff->ff_layout.ol_stripe_size = comp->llc_stripe_size; + ff->ff_layout.ol_stripe_count = comp->llc_stripe_count; + ff->ff_layout.ol_comp_id = comp->llc_id; + ff->ff_layout.ol_comp_start = comp->llc_extent.e_start; + ff->ff_layout.ol_comp_end = comp->llc_extent.e_end; + filter_fid_cpu_to_le(ff, ff, sizeof(*ff)); + + if (data->locd_declare) + rc = lod_sub_declare_xattr_set(env, dt, buf, XATTR_NAME_FID, + LU_XATTR_REPLACE, th); + else + rc = lod_sub_xattr_set(env, dt, buf, XATTR_NAME_FID, + LU_XATTR_REPLACE, th); + + return rc; +} + /** * Reset parent FID on OST object * @@ -1983,27 +2232,28 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env, * \param[in] declare if it is declare * * \retval 0 if reset succeeds - * \retval negative errno if reset fais + * \retval negative errno if reset fails */ -static int lod_object_replace_parent_fid(const struct lu_env *env, - struct dt_object *dt, - struct thandle *th, bool declare) +static int lod_replace_parent_fid(const struct lu_env *env, + struct dt_object *dt, + struct thandle *th, bool declare) { struct lod_object *lo = lod_dt_obj(dt); struct lod_thread_info *info = lod_env_info(env); struct lu_buf *buf = &info->lti_buf; struct filter_fid *ff; - int i, rc; + struct lod_obj_stripe_cb_data data = { { 0 } }; + int rc; ENTRY; LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr)); /* set xattr to each stripes, if needed */ - rc = lod_load_striping(env, lo); + rc = lod_striping_load(env, lo); if (rc != 0) RETURN(rc); - if (lo->ldo_stripenr == 0) + if (!lod_obj_is_striped(dt)) RETURN(0); if (info->lti_ea_store_size < sizeof(*ff)) { @@ -2015,1977 +2265,3949 @@ static int lod_object_replace_parent_fid(const struct lu_env *env, buf->lb_buf = info->lti_ea_store; buf->lb_len = info->lti_ea_store_size; - for (i = 0; i < lo->ldo_stripenr; i++) { - if (lo->ldo_stripe[i] == NULL) - continue; - - rc = dt_xattr_get(env, lo->ldo_stripe[i], buf, - XATTR_NAME_FID); - if (rc < 0) { - rc = 0; - continue; - } - - ff = buf->lb_buf; - fid_le_to_cpu(&ff->ff_parent, &ff->ff_parent); - ff->ff_parent.f_seq = lu_object_fid(&dt->do_lu)->f_seq; - ff->ff_parent.f_oid = lu_object_fid(&dt->do_lu)->f_oid; - fid_cpu_to_le(&ff->ff_parent, &ff->ff_parent); - - if (declare) { - rc = lod_sub_object_declare_xattr_set(env, - lo->ldo_stripe[i], buf, - XATTR_NAME_FID, - LU_XATTR_REPLACE, th); - } else { - rc = lod_sub_object_xattr_set(env, lo->ldo_stripe[i], - buf, XATTR_NAME_FID, - LU_XATTR_REPLACE, th); - } - if (rc < 0) - break; - } + data.locd_declare = declare; + data.locd_stripe_cb = lod_obj_stripe_replace_parent_fid_cb; + rc = lod_obj_for_each_stripe(env, lo, th, &data); RETURN(rc); } -/** - * Implementation of dt_object_operations::do_declare_xattr_set. - * - * \see dt_object_operations::do_declare_xattr_set() in the API description - * for details. - * - * the extension to the API: - * - declaring LOVEA requests striping creation - * - LU_XATTR_REPLACE means layout swap - */ -static int lod_declare_xattr_set(const struct lu_env *env, - struct dt_object *dt, - const struct lu_buf *buf, - const char *name, int fl, - struct thandle *th) +inline __u16 lod_comp_entry_stripe_count(struct lod_object *lo, + struct lod_layout_component *entry, + bool is_dir) { - struct dt_object *next = dt_object_child(dt); - struct lu_attr *attr = &lod_env_info(env)->lti_attr; - __u32 mode; - int rc; - ENTRY; + struct lod_device *lod = lu2lod_dev(lod2lu_obj(lo)->lo_dev); + + if (is_dir) + return 0; + else if (lod_comp_inited(entry)) + return entry->llc_stripe_count; + else if ((__u16)-1 == entry->llc_stripe_count) + return lod->lod_desc.ld_tgt_count; + else + return lod_get_stripe_count(lod, lo, entry->llc_stripe_count); +} - /* - * allow to declare predefined striping on a new (!mode) object - * which is supposed to be replay of regular file creation - * (when LOV setting is declared) - * LU_XATTR_REPLACE is set to indicate a layout swap - */ - mode = dt->do_lu.lo_header->loh_attr & S_IFMT; - if ((S_ISREG(mode) || mode == 0) && strcmp(name, XATTR_NAME_LOV) == 0 && - !(fl & LU_XATTR_REPLACE)) { - /* - * this is a request to manipulate object's striping - */ - if (dt_object_exists(dt)) { - rc = dt_attr_get(env, next, attr); - if (rc) - RETURN(rc); - } else { - memset(attr, 0, sizeof(*attr)); - attr->la_valid = LA_TYPE | LA_MODE; - attr->la_mode = S_IFREG; - } - rc = lod_declare_striped_object(env, dt, attr, buf, th); - } else if (S_ISDIR(mode)) { - rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th); - } else if (strcmp(name, XATTR_NAME_FID) == 0) { - rc = lod_object_replace_parent_fid(env, dt, th, true); +static int lod_comp_md_size(struct lod_object *lo, bool is_dir) +{ + int magic, size = 0, i; + struct lod_layout_component *comp_entries; + __u16 comp_cnt; + bool is_composite; + + if (is_dir) { + comp_cnt = lo->ldo_def_striping->lds_def_comp_cnt; + comp_entries = lo->ldo_def_striping->lds_def_comp_entries; + is_composite = + lo->ldo_def_striping->lds_def_striping_is_composite; } else { - rc = lod_sub_object_declare_xattr_set(env, next, buf, name, - fl, th); + comp_cnt = lo->ldo_comp_cnt; + comp_entries = lo->ldo_comp_entries; + is_composite = lo->ldo_is_composite; } - RETURN(rc); + + LASSERT(comp_cnt != 0 && comp_entries != NULL); + if (is_composite) { + size = sizeof(struct lov_comp_md_v1) + + sizeof(struct lov_comp_md_entry_v1) * comp_cnt; + LASSERT(size % sizeof(__u64) == 0); + } + + for (i = 0; i < comp_cnt; i++) { + __u16 stripe_count; + + magic = comp_entries[i].llc_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1; + stripe_count = lod_comp_entry_stripe_count(lo, &comp_entries[i], + is_dir); + if (!is_dir && is_composite) + lod_comp_shrink_stripe_count(&comp_entries[i], + &stripe_count); + + size += lov_user_md_size(stripe_count, magic); + LASSERT(size % sizeof(__u64) == 0); + } + return size; } /** - * Resets cached default striping in the object. + * Declare component add. The xattr name is XATTR_LUSTRE_LOV.add, and + * the xattr value is binary lov_comp_md_v1 which contains component(s) + * to be added. + * + * \param[in] env execution environment + * \param[in] dt dt_object to add components on + * \param[in] buf buffer contains components to be added + * \parem[in] th thandle * - * \param[in] lo object + * \retval 0 on success + * \retval negative errno on failure */ -static void lod_lov_stripe_cache_clear(struct lod_object *lo) +static int lod_declare_layout_add(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + struct thandle *th) { - lo->ldo_def_striping_set = 0; - lo->ldo_def_striping_cached = 0; - lod_object_set_pool(lo, NULL); - lo->ldo_def_stripe_size = 0; - lo->ldo_def_stripenr = 0; - if (lo->ldo_dir_stripe != NULL) - lo->ldo_dir_def_striping_cached = 0; -} + struct lod_thread_info *info = lod_env_info(env); + struct lod_layout_component *comp_array, *lod_comp, *old_array; + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); + struct dt_object *next = dt_object_child(dt); + struct lov_desc *desc = &d->lod_desc; + struct lod_object *lo = lod_dt_obj(dt); + struct lov_user_md_v3 *v3; + struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; + __u32 magic; + int i, rc, array_cnt, old_array_cnt; + ENTRY; -/** - * Apply xattr changes to the object. - * - * Applies xattr changes to the object and the stripes if the latter exist. - * - * \param[in] env execution environment - * \param[in] dt object - * \param[in] buf buffer pointing to the new value of xattr - * \param[in] name name of xattr - * \param[in] fl flags - * \param[in] th transaction handle - * - * \retval 0 on success - * \retval negative if failed - */ -static int lod_xattr_set_internal(const struct lu_env *env, - struct dt_object *dt, - const struct lu_buf *buf, - const char *name, int fl, - struct thandle *th) -{ - struct dt_object *next = dt_object_child(dt); - struct lod_object *lo = lod_dt_obj(dt); - int rc; - int i; - ENTRY; + LASSERT(lo->ldo_is_composite); - rc = lod_sub_object_xattr_set(env, next, buf, name, fl, th); - if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) + if (lo->ldo_flr_state != LCM_FL_NONE) + RETURN(-EBUSY); + + rc = lod_verify_striping(d, lo, buf, false); + if (rc != 0) RETURN(rc); - /* Note: Do not set LinkEA on sub-stripes, otherwise - * it will confuse the fid2path process(see mdt_path_current()). - * The linkEA between master and sub-stripes is set in - * lod_xattr_set_lmv(). */ - if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0) - RETURN(0); + magic = comp_v1->lcm_magic; + if (magic == __swab32(LOV_USER_MAGIC_COMP_V1)) { + lustre_swab_lov_comp_md_v1(comp_v1); + magic = comp_v1->lcm_magic; + } - for (i = 0; i < lo->ldo_stripenr; i++) { - LASSERT(lo->ldo_stripe[i]); + if (magic != LOV_USER_MAGIC_COMP_V1) + RETURN(-EINVAL); - rc = lod_sub_object_xattr_set(env, lo->ldo_stripe[i], buf, name, - fl, th); - if (rc != 0) - break; + array_cnt = lo->ldo_comp_cnt + comp_v1->lcm_entry_count; + OBD_ALLOC(comp_array, sizeof(*comp_array) * array_cnt); + if (comp_array == NULL) + RETURN(-ENOMEM); + + memcpy(comp_array, lo->ldo_comp_entries, + sizeof(*comp_array) * lo->ldo_comp_cnt); + + for (i = 0; i < comp_v1->lcm_entry_count; i++) { + struct lov_user_md_v1 *v1; + struct lu_extent *ext; + + v1 = (struct lov_user_md *)((char *)comp_v1 + + comp_v1->lcm_entries[i].lcme_offset); + ext = &comp_v1->lcm_entries[i].lcme_extent; + + lod_comp = &comp_array[lo->ldo_comp_cnt + i]; + lod_comp->llc_extent.e_start = ext->e_start; + lod_comp->llc_extent.e_end = ext->e_end; + lod_comp->llc_stripe_offset = v1->lmm_stripe_offset; + lod_comp->llc_flags = comp_v1->lcm_entries[i].lcme_flags; + + lod_comp->llc_stripe_count = v1->lmm_stripe_count; + lod_comp->llc_stripe_size = v1->lmm_stripe_size; + lod_adjust_stripe_info(lod_comp, desc); + + if (v1->lmm_magic == LOV_USER_MAGIC_V3) { + v3 = (struct lov_user_md_v3 *) v1; + if (v3->lmm_pool_name[0] != '\0') { + rc = lod_set_pool(&lod_comp->llc_pool, + v3->lmm_pool_name); + if (rc) + GOTO(error, rc); + } + } + } + + old_array = lo->ldo_comp_entries; + old_array_cnt = lo->ldo_comp_cnt; + + lo->ldo_comp_entries = comp_array; + lo->ldo_comp_cnt = array_cnt; + + /* No need to increase layout generation here, it will be increased + * later when generating component ID for the new components */ + + info->lti_buf.lb_len = lod_comp_md_size(lo, false); + rc = lod_sub_declare_xattr_set(env, next, &info->lti_buf, + XATTR_NAME_LOV, 0, th); + if (rc) { + lo->ldo_comp_entries = old_array; + lo->ldo_comp_cnt = old_array_cnt; + GOTO(error, rc); } + OBD_FREE(old_array, sizeof(*lod_comp) * old_array_cnt); + + LASSERT(lo->ldo_mirror_count == 1); + lo->ldo_mirrors[0].lme_end = array_cnt - 1; + + RETURN(0); + +error: + for (i = lo->ldo_comp_cnt; i < array_cnt; i++) { + lod_comp = &comp_array[i]; + if (lod_comp->llc_pool != NULL) { + OBD_FREE(lod_comp->llc_pool, + strlen(lod_comp->llc_pool) + 1); + lod_comp->llc_pool = NULL; + } + } + OBD_FREE(comp_array, sizeof(*comp_array) * array_cnt); RETURN(rc); } /** - * Delete an extended attribute. - * - * Deletes specified xattr from the object and the stripes if the latter exist. + * Declare component set. The xattr is name XATTR_LUSTRE_LOV.set.$field, + * the '$field' can only be 'flags' now. The xattr value is binary + * lov_comp_md_v1 which contains the component ID(s) and the value of + * the field to be modified. * * \param[in] env execution environment - * \param[in] dt object - * \param[in] name name of xattr - * \param[in] th transaction handle + * \param[in] dt dt_object to be modified + * \param[in] op operation string, like "set.flags" + * \param[in] buf buffer contains components to be set + * \parem[in] th thandle * - * \retval 0 on success - * \retval negative if failed + * \retval 0 on success + * \retval negative errno on failure */ -static int lod_xattr_del_internal(const struct lu_env *env, +static int lod_declare_layout_set(const struct lu_env *env, struct dt_object *dt, - const char *name, struct thandle *th) + char *op, const struct lu_buf *buf, + struct thandle *th) { - struct dt_object *next = dt_object_child(dt); + struct lod_layout_component *lod_comp; + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); struct lod_object *lo = lod_dt_obj(dt); - int rc; - int i; + struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; + __u32 magic; + int i, j, rc; + bool changed = false; ENTRY; - rc = lod_sub_object_xattr_del(env, next, name, th); - if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) - RETURN(rc); + if (strcmp(op, "set.flags") != 0) { + CDEBUG(D_LAYOUT, "%s: operation (%s) not supported.\n", + lod2obd(d)->obd_name, op); + RETURN(-ENOTSUPP); + } - if (lo->ldo_stripenr == 0) - RETURN(rc); + magic = comp_v1->lcm_magic; + if (magic == __swab32(LOV_USER_MAGIC_COMP_V1)) { + lustre_swab_lov_comp_md_v1(comp_v1); + magic = comp_v1->lcm_magic; + } - for (i = 0; i < lo->ldo_stripenr; i++) { - LASSERT(lo->ldo_stripe[i]); + if (magic != LOV_USER_MAGIC_COMP_V1) + RETURN(-EINVAL); - rc = lod_sub_object_xattr_del(env, lo->ldo_stripe[i], name, - th); - if (rc != 0) - break; + if (comp_v1->lcm_entry_count == 0) { + CDEBUG(D_LAYOUT, "%s: entry count is zero.\n", + lod2obd(d)->obd_name); + RETURN(-EINVAL); + } + + for (i = 0; i < comp_v1->lcm_entry_count; i++) { + __u32 id = comp_v1->lcm_entries[i].lcme_id; + __u32 flags = comp_v1->lcm_entries[i].lcme_flags; + + if (flags & LCME_FL_INIT) { + if (changed) + lod_striping_free(env, lo); + RETURN(-EINVAL); + } + + for (j = 0; j < lo->ldo_comp_cnt; j++) { + lod_comp = &lo->ldo_comp_entries[j]; + if (id != lod_comp->llc_id) + continue; + + if (flags & LCME_FL_NEG) { + flags &= ~LCME_FL_NEG; + lod_comp->llc_flags &= ~flags; + } else { + lod_comp->llc_flags |= flags; + } + changed = true; + } } + if (!changed) { + CDEBUG(D_LAYOUT, "%s: requested component(s) not found.\n", + lod2obd(d)->obd_name); + RETURN(-EINVAL); + } + + lod_obj_inc_layout_gen(lo); + + info->lti_buf.lb_len = lod_comp_md_size(lo, false); + rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), &info->lti_buf, + XATTR_NAME_LOV, LU_XATTR_REPLACE, th); RETURN(rc); } /** - * Set default striping on a directory. - * - * Sets specified striping on a directory object unless it matches the default - * striping (LOVEA_DELETE_VALUES() macro). In the latter case remove existing - * EA. This striping will be used when regular file is being created in this - * directory. + * Declare component deletion. The xattr name is XATTR_LUSTRE_LOV.del, + * and the xattr value is a unique component ID or a special lcme_id. * * \param[in] env execution environment - * \param[in] dt the striped object - * \param[in] buf buffer with the striping - * \param[in] name name of EA - * \param[in] fl xattr flag (see OSD API description) - * \param[in] th transaction handle + * \param[in] dt dt_object to be operated on + * \param[in] buf buffer contains component ID or lcme_id + * \parem[in] th thandle * - * \retval 0 on success - * \retval negative if failed + * \retval 0 on success + * \retval negative errno on failure */ -static int lod_xattr_set_lov_on_dir(const struct lu_env *env, - struct dt_object *dt, - const struct lu_buf *buf, - const char *name, int fl, - struct thandle *th) +static int lod_declare_layout_del(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + struct thandle *th) { - struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); - struct lod_object *l = lod_dt_obj(dt); - struct lov_user_md_v1 *lum; - struct lov_user_md_v3 *v3 = NULL; - const char *pool_name = NULL; - int rc; + struct lod_thread_info *info = lod_env_info(env); + struct dt_object *next = dt_object_child(dt); + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_object *lo = lod_dt_obj(dt); + struct lu_attr *attr = &lod_env_info(env)->lti_attr; + struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; + __u32 magic, id, flags, neg_flags = 0; + int rc, i, j, left; ENTRY; - /* If it is striped dir, we should clear the stripe cache for - * slave stripe as well, but there are no effective way to - * notify the LOD on the slave MDT, so we do not cache stripe - * information for slave stripe for now. XXX*/ - lod_lov_stripe_cache_clear(l); - LASSERT(buf != NULL && buf->lb_buf != NULL); - lum = buf->lb_buf; + LASSERT(lo->ldo_is_composite); - rc = lod_verify_striping(d, buf, false); - if (rc) - RETURN(rc); + if (lo->ldo_flr_state != LCM_FL_NONE) + RETURN(-EBUSY); - if (lum->lmm_magic == LOV_USER_MAGIC_V3) { - v3 = buf->lb_buf; - if (v3->lmm_pool_name[0] != '\0') - pool_name = v3->lmm_pool_name; + magic = comp_v1->lcm_magic; + if (magic == __swab32(LOV_USER_MAGIC_COMP_V1)) { + lustre_swab_lov_comp_md_v1(comp_v1); + magic = comp_v1->lcm_magic; } - /* if { size, offset, count } = { 0, -1, 0 } and no pool - * (i.e. all default values specified) then delete default - * striping from dir. */ - CDEBUG(D_OTHER, - "set default striping: sz %u # %u offset %d %s %s\n", - (unsigned)lum->lmm_stripe_size, - (unsigned)lum->lmm_stripe_count, - (int)lum->lmm_stripe_offset, - v3 ? "from" : "", v3 ? v3->lmm_pool_name : ""); + if (magic != LOV_USER_MAGIC_COMP_V1) + RETURN(-EINVAL); - if (LOVEA_DELETE_VALUES(lum->lmm_stripe_size, lum->lmm_stripe_count, - lum->lmm_stripe_offset, pool_name)) { - rc = lod_xattr_del_internal(env, dt, name, th); - if (rc == -ENODATA) - rc = 0; + id = comp_v1->lcm_entries[0].lcme_id; + flags = comp_v1->lcm_entries[0].lcme_flags; + + if (id > LCME_ID_MAX || (flags & ~LCME_KNOWN_FLAGS)) { + CDEBUG(D_LAYOUT, "%s: invalid component id %#x, flags %#x\n", + lod2obd(d)->obd_name, id, flags); + RETURN(-EINVAL); + } + + if (id != LCME_ID_INVAL && flags != 0) { + CDEBUG(D_LAYOUT, "%s: specified both id and flags.\n", + lod2obd(d)->obd_name); + RETURN(-EINVAL); + } + + if (id == LCME_ID_INVAL && !flags) { + CDEBUG(D_LAYOUT, "%s: no id or flags specified.\n", + lod2obd(d)->obd_name); + RETURN(-EINVAL); + } + + if (flags & LCME_FL_NEG) { + neg_flags = flags & ~LCME_FL_NEG; + flags = 0; + } + + left = lo->ldo_comp_cnt; + if (left <= 0) + RETURN(-EINVAL); + + for (i = (lo->ldo_comp_cnt - 1); i >= 0; i--) { + struct lod_layout_component *lod_comp; + + lod_comp = &lo->ldo_comp_entries[i]; + + if (id != LCME_ID_INVAL && id != lod_comp->llc_id) + continue; + else if (flags && !(flags & lod_comp->llc_flags)) + continue; + else if (neg_flags && (neg_flags & lod_comp->llc_flags)) + continue; + + if (left != (i + 1)) { + CDEBUG(D_LAYOUT, "%s: this deletion will create " + "a hole.\n", lod2obd(d)->obd_name); + RETURN(-EINVAL); + } + left--; + + /* Mark the component as deleted */ + lod_comp->llc_id = LCME_ID_INVAL; + + /* Not instantiated component */ + if (lod_comp->llc_stripe == NULL) + continue; + + LASSERT(lod_comp->llc_stripe_count > 0); + for (j = 0; j < lod_comp->llc_stripe_count; j++) { + struct dt_object *obj = lod_comp->llc_stripe[j]; + + if (obj == NULL) + continue; + rc = lod_sub_declare_destroy(env, obj, th); + if (rc) + RETURN(rc); + } + } + + LASSERTF(left >= 0, "left = %d\n", left); + if (left == lo->ldo_comp_cnt) { + CDEBUG(D_LAYOUT, "%s: requested component id:%#x not found\n", + lod2obd(d)->obd_name, id); + RETURN(-EINVAL); + } + + memset(attr, 0, sizeof(*attr)); + attr->la_valid = LA_SIZE; + rc = lod_sub_declare_attr_set(env, next, attr, th); + if (rc) + RETURN(rc); + + if (left > 0) { + info->lti_buf.lb_len = lod_comp_md_size(lo, false); + rc = lod_sub_declare_xattr_set(env, next, &info->lti_buf, + XATTR_NAME_LOV, 0, th); } else { - rc = lod_xattr_set_internal(env, dt, buf, name, fl, th); + rc = lod_sub_declare_xattr_del(env, next, XATTR_NAME_LOV, th); } RETURN(rc); } /** - * Set default striping on a directory object. + * Declare layout add/set/del operations issued by special xattr names: * - * Sets specified striping on a directory object unless it matches the default - * striping (LOVEA_DELETE_VALUES() macro). In the latter case remove existing - * EA. This striping will be used when a new directory is being created in the - * directory. + * XATTR_LUSTRE_LOV.add add component(s) to existing file + * XATTR_LUSTRE_LOV.del delete component(s) from existing file + * XATTR_LUSTRE_LOV.set.$field set specified field of certain component(s) * * \param[in] env execution environment - * \param[in] dt the striped object - * \param[in] buf buffer with the striping - * \param[in] name name of EA - * \param[in] fl xattr flag (see OSD API description) + * \param[in] dt object + * \param[in] name name of xattr + * \param[in] buf lu_buf contains xattr value * \param[in] th transaction handle * * \retval 0 on success * \retval negative if failed */ -static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, - struct dt_object *dt, - const struct lu_buf *buf, - const char *name, int fl, - struct thandle *th) +static int lod_declare_modify_layout(const struct lu_env *env, + struct dt_object *dt, + const char *name, + const struct lu_buf *buf, + struct thandle *th) { - struct lod_object *l = lod_dt_obj(dt); - struct lmv_user_md_v1 *lum; - int rc; + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_object *lo = lod_dt_obj(dt); + char *op; + int rc, len = strlen(XATTR_LUSTRE_LOV); ENTRY; - LASSERT(buf != NULL && buf->lb_buf != NULL); - lum = buf->lb_buf; + LASSERT(dt_object_exists(dt)); - CDEBUG(D_OTHER, "set default stripe_count # %u stripe_offset %d\n", - le32_to_cpu(lum->lum_stripe_count), - (int)le32_to_cpu(lum->lum_stripe_offset)); + if (strlen(name) <= len || name[len] != '.') { + CDEBUG(D_LAYOUT, "%s: invalid xattr name: %s\n", + lod2obd(d)->obd_name, name); + RETURN(-EINVAL); + } + len++; - if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)), - le32_to_cpu(lum->lum_stripe_offset)) && - le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) { - rc = lod_xattr_del_internal(env, dt, name, th); - if (rc == -ENODATA) - rc = 0; - } else { - rc = lod_xattr_set_internal(env, dt, buf, name, fl, th); - if (rc != 0) - RETURN(rc); + rc = lod_striping_load(env, lo); + if (rc) + GOTO(unlock, rc); + + /* the layout to be modified must be a composite layout */ + if (!lo->ldo_is_composite) { + CDEBUG(D_LAYOUT, "%s: object "DFID" isn't a composite file.\n", + lod2obd(d)->obd_name, PFID(lu_object_fid(&dt->do_lu))); + GOTO(unlock, rc = -EINVAL); } - /* Update default stripe cache */ - if (l->ldo_dir_stripe == NULL) { - OBD_ALLOC_PTR(l->ldo_dir_stripe); - if (l->ldo_dir_stripe == NULL) - RETURN(-ENOMEM); + op = (char *)name + len; + if (strcmp(op, "add") == 0) { + rc = lod_declare_layout_add(env, dt, buf, th); + } else if (strcmp(op, "del") == 0) { + rc = lod_declare_layout_del(env, dt, buf, th); + } else if (strncmp(op, "set", strlen("set")) == 0) { + rc = lod_declare_layout_set(env, dt, op, buf, th); + } else { + CDEBUG(D_LAYOUT, "%s: unsupported xattr name:%s\n", + lod2obd(d)->obd_name, name); + GOTO(unlock, rc = -ENOTSUPP); } +unlock: + if (rc) + lod_striping_free(env, lo); - l->ldo_dir_def_striping_cached = 0; RETURN(rc); } /** - * Turn directory into a striped directory. - * - * During replay the client sends the striping created before MDT - * failure, then the layer above LOD sends this defined striping - * using ->do_xattr_set(), so LOD uses this method to replay creation - * of the stripes. Notice the original information for the striping - * (#stripes, FIDs, etc) was transferred in declare path. + * Convert a plain file lov_mds_md to a composite layout. * - * \param[in] env execution environment - * \param[in] dt the striped object - * \param[in] buf not used currently - * \param[in] name not used currently - * \param[in] fl xattr flag (see OSD API description) - * \param[in] th transaction handle + * \param[in,out] info the thread info::lti_ea_store buffer contains little + * endian plain file layout * - * \retval 0 on success - * \retval negative if failed + * \retval 0 on success, <0 on failure */ -static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, - const struct lu_buf *buf, const char *name, - int fl, struct thandle *th) +static int lod_layout_convert(struct lod_thread_info *info) { - struct lod_object *lo = lod_dt_obj(dt); - struct lod_thread_info *info = lod_env_info(env); - struct lu_attr *attr = &info->lti_attr; - struct dt_object_format *dof = &info->lti_format; - struct lu_buf lmv_buf; - struct lu_buf slave_lmv_buf; - struct lmv_mds_md_v1 *lmm; - struct lmv_mds_md_v1 *slave_lmm = NULL; - struct dt_insert_rec *rec = &info->lti_dt_rec; - int i; - int rc; + struct lov_mds_md *lmm = info->lti_ea_store; + struct lov_mds_md *lmm_save; + struct lov_comp_md_v1 *lcm; + struct lov_comp_md_entry_v1 *lcme; + size_t size; + __u32 blob_size; + int rc = 0; ENTRY; - if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) - RETURN(-ENOTDIR); + /* realloc buffer to a composite layout which contains one component */ + blob_size = lov_mds_md_size(le16_to_cpu(lmm->lmm_stripe_count), + le32_to_cpu(lmm->lmm_magic)); + size = sizeof(*lcm) + sizeof(*lcme) + blob_size; - /* The stripes are supposed to be allocated in declare phase, - * if there are no stripes being allocated, it will skip */ - if (lo->ldo_stripenr == 0) - RETURN(0); + OBD_ALLOC_LARGE(lmm_save, blob_size); + if (!lmm_save) + GOTO(out, rc = -ENOMEM); - rc = dt_attr_get(env, dt_object_child(dt), attr); - if (rc != 0) - RETURN(rc); + memcpy(lmm_save, lmm, blob_size); - attr->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | - LA_MODE | LA_UID | LA_GID | LA_TYPE; - dof->dof_type = DFT_DIR; + if (info->lti_ea_store_size < size) { + rc = lod_ea_store_resize(info, size); + if (rc) + GOTO(out, rc); + } - rc = lod_prep_lmv_md(env, dt, &lmv_buf); - if (rc != 0) + lcm = info->lti_ea_store; + lcm->lcm_magic = cpu_to_le32(LOV_MAGIC_COMP_V1); + lcm->lcm_size = cpu_to_le32(size); + lcm->lcm_layout_gen = cpu_to_le32(le16_to_cpu( + lmm_save->lmm_layout_gen)); + lcm->lcm_flags = cpu_to_le16(LCM_FL_NONE); + lcm->lcm_entry_count = cpu_to_le16(1); + lcm->lcm_mirror_count = 0; + + lcme = &lcm->lcm_entries[0]; + lcme->lcme_flags = cpu_to_le32(LCME_FL_INIT); + lcme->lcme_extent.e_start = 0; + lcme->lcme_extent.e_end = cpu_to_le64(OBD_OBJECT_EOF); + lcme->lcme_offset = cpu_to_le32(sizeof(*lcm) + sizeof(*lcme)); + lcme->lcme_size = cpu_to_le32(blob_size); + + memcpy((char *)lcm + lcme->lcme_offset, (char *)lmm_save, blob_size); + + EXIT; +out: + if (lmm_save) + OBD_FREE_LARGE(lmm_save, blob_size); + return rc; +} + +/** + * Merge layouts to form a mirrored file. + */ +static int lod_declare_layout_merge(const struct lu_env *env, + struct dt_object *dt, const struct lu_buf *mbuf, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lu_buf *buf = &info->lti_buf; + struct lod_object *lo = lod_dt_obj(dt); + struct lov_comp_md_v1 *lcm; + struct lov_comp_md_v1 *cur_lcm; + struct lov_comp_md_v1 *merge_lcm; + struct lov_comp_md_entry_v1 *lcme; + size_t size = 0; + size_t offset; + __u16 cur_entry_count; + __u16 merge_entry_count; + __u32 id = 0; + __u16 mirror_id = 0; + __u32 mirror_count; + int rc, i; + ENTRY; + + merge_lcm = mbuf->lb_buf; + if (mbuf->lb_len < sizeof(*merge_lcm)) + RETURN(-EINVAL); + + /* must be an existing layout from disk */ + if (le32_to_cpu(merge_lcm->lcm_magic) != LOV_MAGIC_COMP_V1) + RETURN(-EINVAL); + + merge_entry_count = le16_to_cpu(merge_lcm->lcm_entry_count); + + /* do not allow to merge two mirrored files */ + if (le16_to_cpu(merge_lcm->lcm_mirror_count)) + RETURN(-EBUSY); + + /* verify the target buffer */ + rc = lod_get_lov_ea(env, lo); + if (rc <= 0) + RETURN(rc ? : -ENODATA); + + cur_lcm = info->lti_ea_store; + switch (le32_to_cpu(cur_lcm->lcm_magic)) { + case LOV_MAGIC_V1: + case LOV_MAGIC_V3: + rc = lod_layout_convert(info); + break; + case LOV_MAGIC_COMP_V1: + rc = 0; + break; + default: + rc = -EINVAL; + } + if (rc) RETURN(rc); - lmm = lmv_buf.lb_buf; - OBD_ALLOC_PTR(slave_lmm); - if (slave_lmm == NULL) + /* info->lti_ea_store could be reallocated in lod_layout_convert() */ + cur_lcm = info->lti_ea_store; + cur_entry_count = le16_to_cpu(cur_lcm->lcm_entry_count); + + /* 'lcm_mirror_count + 1' is the current # of mirrors the file has */ + mirror_count = le16_to_cpu(cur_lcm->lcm_mirror_count) + 1; + if (mirror_count + 1 > LUSTRE_MIRROR_COUNT_MAX) + RETURN(-ERANGE); + + /* size of new layout */ + size = le32_to_cpu(cur_lcm->lcm_size) + + le32_to_cpu(merge_lcm->lcm_size) - sizeof(*cur_lcm); + + memset(buf, 0, sizeof(*buf)); + lu_buf_alloc(buf, size); + if (buf->lb_buf == NULL) RETURN(-ENOMEM); - lod_prep_slave_lmv_md(slave_lmm, lmm); - slave_lmv_buf.lb_buf = slave_lmm; - slave_lmv_buf.lb_len = sizeof(*slave_lmm); + lcm = buf->lb_buf; + memcpy(lcm, cur_lcm, sizeof(*lcm) + cur_entry_count * sizeof(*lcme)); - rec->rec_type = S_IFDIR; - for (i = 0; i < lo->ldo_stripenr; i++) { - struct dt_object *dto; - char *stripe_name = info->lti_key; - struct lu_name *sname; - struct linkea_data ldata = { NULL }; - struct lu_buf linkea_buf; + offset = sizeof(*lcm) + + sizeof(*lcme) * (cur_entry_count + merge_entry_count); + for (i = 0; i < cur_entry_count; i++) { + struct lov_comp_md_entry_v1 *cur_lcme; - dto = lo->ldo_stripe[i]; + lcme = &lcm->lcm_entries[i]; + cur_lcme = &cur_lcm->lcm_entries[i]; - dt_write_lock(env, dto, MOR_TGT_CHILD); - rc = lod_sub_object_create(env, dto, attr, NULL, dof, - th); - if (rc != 0) { - dt_write_unlock(env, dto); - GOTO(out, rc); + lcme->lcme_offset = cpu_to_le32(offset); + memcpy((char *)lcm + offset, + (char *)cur_lcm + le32_to_cpu(cur_lcme->lcme_offset), + le32_to_cpu(lcme->lcme_size)); + + offset += le32_to_cpu(lcme->lcme_size); + + if (mirror_count == 1) { + /* new mirrored file, create new mirror ID */ + id = pflr_id(1, i + 1); + lcme->lcme_id = cpu_to_le32(id); } - rc = lod_sub_object_ref_add(env, dto, th); - dt_write_unlock(env, dto); - if (rc != 0) - GOTO(out, rc); + id = MAX(le32_to_cpu(lcme->lcme_id), id); + } - rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = lod_sub_object_index_insert(env, dto, - (const struct dt_rec *)rec, - (const struct dt_key *)dot, th, 0); - if (rc != 0) - GOTO(out, rc); + mirror_id = mirror_id_of(id) + 1; + for (i = 0; i < merge_entry_count; i++) { + struct lov_comp_md_entry_v1 *merge_lcme; - rec->rec_fid = lu_object_fid(&dt->do_lu); - rc = lod_sub_object_index_insert(env, dto, (struct dt_rec *)rec, - (const struct dt_key *)dotdot, th, 0); - if (rc != 0) - GOTO(out, rc); + merge_lcme = &merge_lcm->lcm_entries[i]; + lcme = &lcm->lcm_entries[cur_entry_count + i]; - if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || - cfs_fail_val != i) { - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && - cfs_fail_val == i) - slave_lmm->lmv_master_mdt_index = - cpu_to_le32(i + 1); - else - slave_lmm->lmv_master_mdt_index = - cpu_to_le32(i); + *lcme = *merge_lcme; + lcme->lcme_offset = cpu_to_le32(offset); - rc = lod_sub_object_xattr_set(env, dto, &slave_lmv_buf, - XATTR_NAME_LMV, fl, th); - if (rc != 0) - GOTO(out, rc); + id = pflr_id(mirror_id, i + 1); + lcme->lcme_id = cpu_to_le32(id); + + memcpy((char *)lcm + offset, + (char *)merge_lcm + le32_to_cpu(merge_lcme->lcme_offset), + le32_to_cpu(lcme->lcme_size)); + + offset += le32_to_cpu(lcme->lcme_size); + } + + /* fixup layout information */ + lod_obj_inc_layout_gen(lo); + lcm->lcm_layout_gen = cpu_to_le32(lo->ldo_layout_gen); + lcm->lcm_size = cpu_to_le32(size); + lcm->lcm_entry_count = cpu_to_le16(cur_entry_count + merge_entry_count); + lcm->lcm_mirror_count = cpu_to_le16(mirror_count); + if ((le16_to_cpu(lcm->lcm_flags) & LCM_FL_FLR_MASK) == LCM_FL_NONE) + lcm->lcm_flags = cpu_to_le32(LCM_FL_RDONLY); + + rc = lod_striping_reload(env, lo, buf); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), buf, + XATTR_NAME_LOV, LU_XATTR_REPLACE, th); + +out: + lu_buf_free(buf); + RETURN(rc); +} + +/** + * Split layouts, just set the LOVEA with the layout from mbuf. + */ +static int lod_declare_layout_split(const struct lu_env *env, + struct dt_object *dt, const struct lu_buf *mbuf, + struct thandle *th) +{ + struct lod_object *lo = lod_dt_obj(dt); + struct lov_comp_md_v1 *lcm = mbuf->lb_buf; + int rc; + ENTRY; + + lod_obj_inc_layout_gen(lo); + lcm->lcm_layout_gen = cpu_to_le32(lo->ldo_layout_gen); + + rc = lod_striping_reload(env, lo, mbuf); + if (rc) + RETURN(rc); + + rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), mbuf, + XATTR_NAME_LOV, LU_XATTR_REPLACE, th); + RETURN(rc); +} + +/** + * Implementation of dt_object_operations::do_declare_xattr_set. + * + * \see dt_object_operations::do_declare_xattr_set() in the API description + * for details. + * + * the extension to the API: + * - declaring LOVEA requests striping creation + * - LU_XATTR_REPLACE means layout swap + */ +static int lod_declare_xattr_set(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + const char *name, int fl, + struct thandle *th) +{ + struct dt_object *next = dt_object_child(dt); + struct lu_attr *attr = &lod_env_info(env)->lti_attr; + __u32 mode; + int rc; + ENTRY; + + mode = dt->do_lu.lo_header->loh_attr & S_IFMT; + if ((S_ISREG(mode) || mode == 0) && + !(fl & (LU_XATTR_REPLACE | LU_XATTR_MERGE | LU_XATTR_SPLIT)) && + (strcmp(name, XATTR_NAME_LOV) == 0 || + strcmp(name, XATTR_LUSTRE_LOV) == 0)) { + /* + * this is a request to create object's striping. + * + * allow to declare predefined striping on a new (!mode) object + * which is supposed to be replay of regular file creation + * (when LOV setting is declared) + * + * LU_XATTR_REPLACE is set to indicate a layout swap + */ + if (dt_object_exists(dt)) { + rc = dt_attr_get(env, next, attr); + if (rc) + RETURN(rc); + } else { + memset(attr, 0, sizeof(*attr)); + attr->la_valid = LA_TYPE | LA_MODE; + attr->la_mode = S_IFREG; } + rc = lod_declare_striped_create(env, dt, attr, buf, th); + } else if (fl & LU_XATTR_MERGE) { + LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 || + strcmp(name, XATTR_LUSTRE_LOV) == 0); + rc = lod_declare_layout_merge(env, dt, buf, th); + } else if (fl & LU_XATTR_SPLIT) { + LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 || + strcmp(name, XATTR_LUSTRE_LOV) == 0); + rc = lod_declare_layout_split(env, dt, buf, th); + } else if (S_ISREG(mode) && + strlen(name) > strlen(XATTR_LUSTRE_LOV) + 1 && + strncmp(name, XATTR_LUSTRE_LOV, + strlen(XATTR_LUSTRE_LOV)) == 0) { + /* + * this is a request to modify object's striping. + * add/set/del component(s). + */ + if (!dt_object_exists(dt)) + RETURN(-ENOENT); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && - cfs_fail_val == i) - snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", - PFID(lu_object_fid(&dto->do_lu)), i + 1); - else - snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", - PFID(lu_object_fid(&dto->do_lu)), i); + rc = lod_declare_modify_layout(env, dt, name, buf, th); + } else if (S_ISDIR(mode)) { + rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th); + } else if (strcmp(name, XATTR_NAME_FID) == 0) { + rc = lod_replace_parent_fid(env, dt, th, true); + } else { + rc = lod_sub_declare_xattr_set(env, next, buf, name, fl, th); + } - sname = lod_name_get(env, stripe_name, strlen(stripe_name)); - rc = linkea_data_new(&ldata, &info->lti_linkea_buf); + RETURN(rc); +} + +/** + * Apply xattr changes to the object. + * + * Applies xattr changes to the object and the stripes if the latter exist. + * + * \param[in] env execution environment + * \param[in] dt object + * \param[in] buf buffer pointing to the new value of xattr + * \param[in] name name of xattr + * \param[in] fl flags + * \param[in] th transaction handle + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_xattr_set_internal(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + const char *name, int fl, + struct thandle *th) +{ + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + int rc; + int i; + ENTRY; + + rc = lod_sub_xattr_set(env, next, buf, name, fl, th); + if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) + RETURN(rc); + + /* Note: Do not set LinkEA on sub-stripes, otherwise + * it will confuse the fid2path process(see mdt_path_current()). + * The linkEA between master and sub-stripes is set in + * lod_xattr_set_lmv(). */ + if (lo->ldo_dir_stripe_count == 0 || strcmp(name, XATTR_NAME_LINK) == 0) + RETURN(0); + + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + LASSERT(lo->ldo_stripe[i]); + + rc = lod_sub_xattr_set(env, lo->ldo_stripe[i], buf, name, + fl, th); if (rc != 0) - GOTO(out, rc); + break; + } + + RETURN(rc); +} + +/** + * Delete an extended attribute. + * + * Deletes specified xattr from the object and the stripes if the latter exist. + * + * \param[in] env execution environment + * \param[in] dt object + * \param[in] name name of xattr + * \param[in] th transaction handle + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_xattr_del_internal(const struct lu_env *env, + struct dt_object *dt, + const char *name, struct thandle *th) +{ + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + int rc; + int i; + ENTRY; + + rc = lod_sub_xattr_del(env, next, name, th); + if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) + RETURN(rc); + + if (lo->ldo_dir_stripe_count == 0) + RETURN(rc); + + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + LASSERT(lo->ldo_stripe[i]); + + rc = lod_sub_xattr_del(env, lo->ldo_stripe[i], name, th); + if (rc != 0) + break; + } + + RETURN(rc); +} + +/** + * Set default striping on a directory. + * + * Sets specified striping on a directory object unless it matches the default + * striping (LOVEA_DELETE_VALUES() macro). In the latter case remove existing + * EA. This striping will be used when regular file is being created in this + * directory. + * + * \param[in] env execution environment + * \param[in] dt the striped object + * \param[in] buf buffer with the striping + * \param[in] name name of EA + * \param[in] fl xattr flag (see OSD API description) + * \param[in] th transaction handle + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_xattr_set_lov_on_dir(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + const char *name, int fl, + struct thandle *th) +{ + struct lov_user_md_v1 *lum; + struct lov_user_md_v3 *v3 = NULL; + const char *pool_name = NULL; + int rc; + bool is_del; + ENTRY; + + LASSERT(buf != NULL && buf->lb_buf != NULL); + lum = buf->lb_buf; + + switch (lum->lmm_magic) { + case LOV_USER_MAGIC_SPECIFIC: + case LOV_USER_MAGIC_V3: + v3 = buf->lb_buf; + if (v3->lmm_pool_name[0] != '\0') + pool_name = v3->lmm_pool_name; + /* fall through */ + case LOV_USER_MAGIC_V1: + /* if { size, offset, count } = { 0, -1, 0 } and no pool + * (i.e. all default values specified) then delete default + * striping from dir. */ + CDEBUG(D_LAYOUT, + "set default striping: sz %u # %u offset %d %s %s\n", + (unsigned)lum->lmm_stripe_size, + (unsigned)lum->lmm_stripe_count, + (int)lum->lmm_stripe_offset, + v3 ? "from" : "", v3 ? v3->lmm_pool_name : ""); + + is_del = LOVEA_DELETE_VALUES(lum->lmm_stripe_size, + lum->lmm_stripe_count, + lum->lmm_stripe_offset, + pool_name); + break; + case LOV_USER_MAGIC_COMP_V1: + is_del = false; + break; + default: + CERROR("Invalid magic %x\n", lum->lmm_magic); + RETURN(-EINVAL); + } + + if (is_del) { + rc = lod_xattr_del_internal(env, dt, name, th); + if (rc == -ENODATA) + rc = 0; + } else { + rc = lod_xattr_set_internal(env, dt, buf, name, fl, th); + } + + RETURN(rc); +} + +/** + * Set default striping on a directory object. + * + * Sets specified striping on a directory object unless it matches the default + * striping (LOVEA_DELETE_VALUES() macro). In the latter case remove existing + * EA. This striping will be used when a new directory is being created in the + * directory. + * + * \param[in] env execution environment + * \param[in] dt the striped object + * \param[in] buf buffer with the striping + * \param[in] name name of EA + * \param[in] fl xattr flag (see OSD API description) + * \param[in] th transaction handle + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + const char *name, int fl, + struct thandle *th) +{ + struct lmv_user_md_v1 *lum; + int rc; + ENTRY; + + LASSERT(buf != NULL && buf->lb_buf != NULL); + lum = buf->lb_buf; + + CDEBUG(D_OTHER, "set default stripe_count # %u stripe_offset %d\n", + le32_to_cpu(lum->lum_stripe_count), + (int)le32_to_cpu(lum->lum_stripe_offset)); + + if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)), + le32_to_cpu(lum->lum_stripe_offset)) && + le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) { + rc = lod_xattr_del_internal(env, dt, name, th); + if (rc == -ENODATA) + rc = 0; + } else { + rc = lod_xattr_set_internal(env, dt, buf, name, fl, th); + if (rc != 0) + RETURN(rc); + } + + RETURN(rc); +} + +/** + * Turn directory into a striped directory. + * + * During replay the client sends the striping created before MDT + * failure, then the layer above LOD sends this defined striping + * using ->do_xattr_set(), so LOD uses this method to replay creation + * of the stripes. Notice the original information for the striping + * (#stripes, FIDs, etc) was transferred in declare path. + * + * \param[in] env execution environment + * \param[in] dt the striped object + * \param[in] buf not used currently + * \param[in] name not used currently + * \param[in] fl xattr flag (see OSD API description) + * \param[in] th transaction handle + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, const char *name, + int fl, struct thandle *th) +{ + struct lod_object *lo = lod_dt_obj(dt); + struct lod_thread_info *info = lod_env_info(env); + struct lu_attr *attr = &info->lti_attr; + struct dt_object_format *dof = &info->lti_format; + struct lu_buf lmv_buf; + struct lu_buf slave_lmv_buf; + struct lmv_mds_md_v1 *lmm; + struct lmv_mds_md_v1 *slave_lmm = NULL; + struct dt_insert_rec *rec = &info->lti_dt_rec; + int i; + int rc; + ENTRY; + + if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) + RETURN(-ENOTDIR); + + /* The stripes are supposed to be allocated in declare phase, + * if there are no stripes being allocated, it will skip */ + if (lo->ldo_dir_stripe_count == 0) + RETURN(0); + + rc = dt_attr_get(env, dt_object_child(dt), attr); + if (rc != 0) + RETURN(rc); + + attr->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | + LA_MODE | LA_UID | LA_GID | LA_TYPE | LA_PROJID; + dof->dof_type = DFT_DIR; + + rc = lod_prep_lmv_md(env, dt, &lmv_buf); + if (rc != 0) + RETURN(rc); + lmm = lmv_buf.lb_buf; + + OBD_ALLOC_PTR(slave_lmm); + if (slave_lmm == NULL) + RETURN(-ENOMEM); + + lod_prep_slave_lmv_md(slave_lmm, lmm); + slave_lmv_buf.lb_buf = slave_lmm; + slave_lmv_buf.lb_len = sizeof(*slave_lmm); + + rec->rec_type = S_IFDIR; + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + struct dt_object *dto; + char *stripe_name = info->lti_key; + struct lu_name *sname; + struct linkea_data ldata = { NULL }; + struct lu_buf linkea_buf; + + dto = lo->ldo_stripe[i]; + + dt_write_lock(env, dto, MOR_TGT_CHILD); + rc = lod_sub_create(env, dto, attr, NULL, dof, th); + if (rc != 0) { + dt_write_unlock(env, dto); + GOTO(out, rc); + } + + rc = lod_sub_ref_add(env, dto, th); + dt_write_unlock(env, dto); + if (rc != 0) + GOTO(out, rc); + + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = lod_sub_insert(env, dto, (const struct dt_rec *)rec, + (const struct dt_key *)dot, th, 0); + if (rc != 0) + GOTO(out, rc); + + rec->rec_fid = lu_object_fid(&dt->do_lu); + rc = lod_sub_insert(env, dto, (struct dt_rec *)rec, + (const struct dt_key *)dotdot, th, 0); + if (rc != 0) + GOTO(out, rc); + + if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || + cfs_fail_val != i) { + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && + cfs_fail_val == i) + slave_lmm->lmv_master_mdt_index = + cpu_to_le32(i + 1); + else + slave_lmm->lmv_master_mdt_index = + cpu_to_le32(i); + + rc = lod_sub_xattr_set(env, dto, &slave_lmv_buf, + XATTR_NAME_LMV, fl, th); + if (rc != 0) + GOTO(out, rc); + } + + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && + cfs_fail_val == i) + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&dto->do_lu)), i + 1); + else + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&dto->do_lu)), i); + + sname = lod_name_get(env, stripe_name, strlen(stripe_name)); + rc = linkea_links_new(&ldata, &info->lti_linkea_buf, + sname, lu_object_fid(&dt->do_lu)); + if (rc != 0) + GOTO(out, rc); + + linkea_buf.lb_buf = ldata.ld_buf->lb_buf; + linkea_buf.lb_len = ldata.ld_leh->leh_len; + rc = lod_sub_xattr_set(env, dto, &linkea_buf, + XATTR_NAME_LINK, 0, th); + if (rc != 0) + GOTO(out, rc); + + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = lod_sub_insert(env, dt_object_child(dt), + (const struct dt_rec *)rec, + (const struct dt_key *)stripe_name, th, 0); + if (rc != 0) + GOTO(out, rc); + + rc = lod_sub_ref_add(env, dt_object_child(dt), th); + if (rc != 0) + GOTO(out, rc); + } + + if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MASTER_LMV)) + rc = lod_sub_xattr_set(env, dt_object_child(dt), + &lmv_buf, XATTR_NAME_LMV, fl, th); +out: + if (slave_lmm != NULL) + OBD_FREE_PTR(slave_lmm); + + RETURN(rc); +} + +/** + * Helper function to declare/execute creation of a striped directory + * + * Called in declare/create object path, prepare striping for a directory + * and prepare defaults data striping for the objects to be created in + * that directory. Notice the function calls "declaration" or "execution" + * methods depending on \a declare param. This is a consequence of the + * current approach while we don't have natural distributed transactions: + * we basically execute non-local updates in the declare phase. So, the + * arguments for the both phases are the same and this is the reason for + * this function to exist. + * + * \param[in] env execution environment + * \param[in] dt object + * \param[in] attr attributes the stripes will be created with + * \param[in] lmu lmv_user_md if MDT indices are specified + * \param[in] dof format of stripes (see OSD API description) + * \param[in] th transaction handle + * \param[in] declare where to call "declare" or "execute" methods + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_dir_striping_create_internal(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + const struct lu_buf *lmu, + struct dt_object_format *dof, + struct thandle *th, + bool declare) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + const struct lod_default_striping *lds = lo->ldo_def_striping; + int rc; + ENTRY; + + LASSERT(ergo(lds != NULL, + lds->lds_def_striping_set || + lds->lds_dir_def_striping_set)); + + if (!LMVEA_DELETE_VALUES(lo->ldo_dir_stripe_count, + lo->ldo_dir_stripe_offset)) { + if (!lmu) { + struct lmv_user_md_v1 *v1 = info->lti_ea_store; + int stripe_count = lo->ldo_dir_stripe_count; + + if (info->lti_ea_store_size < sizeof(*v1)) { + rc = lod_ea_store_resize(info, sizeof(*v1)); + if (rc != 0) + RETURN(rc); + v1 = info->lti_ea_store; + } + + memset(v1, 0, sizeof(*v1)); + v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC); + v1->lum_stripe_count = cpu_to_le32(stripe_count); + v1->lum_stripe_offset = + cpu_to_le32(lo->ldo_dir_stripe_offset); + + info->lti_buf.lb_buf = v1; + info->lti_buf.lb_len = sizeof(*v1); + lmu = &info->lti_buf; + } + + if (declare) + rc = lod_declare_xattr_set_lmv(env, dt, attr, lmu, dof, + th); + else + rc = lod_xattr_set_lmv(env, dt, lmu, XATTR_NAME_LMV, 0, + th); + if (rc != 0) + RETURN(rc); + } + + /* Transfer default LMV striping from the parent */ + if (lds != NULL && lds->lds_dir_def_striping_set && + !LMVEA_DELETE_VALUES(lds->lds_dir_def_stripe_count, + lds->lds_dir_def_stripe_offset)) { + struct lmv_user_md_v1 *v1 = info->lti_ea_store; + + if (info->lti_ea_store_size < sizeof(*v1)) { + rc = lod_ea_store_resize(info, sizeof(*v1)); + if (rc != 0) + RETURN(rc); + v1 = info->lti_ea_store; + } + + memset(v1, 0, sizeof(*v1)); + v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC); + v1->lum_stripe_count = + cpu_to_le32(lds->lds_dir_def_stripe_count); + v1->lum_stripe_offset = + cpu_to_le32(lds->lds_dir_def_stripe_offset); + v1->lum_hash_type = + cpu_to_le32(lds->lds_dir_def_hash_type); + + info->lti_buf.lb_buf = v1; + info->lti_buf.lb_len = sizeof(*v1); + if (declare) + rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf, + XATTR_NAME_DEFAULT_LMV, + 0, th); + else + rc = lod_xattr_set_default_lmv_on_dir(env, dt, + &info->lti_buf, + XATTR_NAME_DEFAULT_LMV, 0, + th); + if (rc != 0) + RETURN(rc); + } + + /* Transfer default LOV striping from the parent */ + if (lds != NULL && lds->lds_def_striping_set && + lds->lds_def_comp_cnt != 0) { + struct lov_mds_md *lmm; + int lmm_size = lod_comp_md_size(lo, true); + + if (info->lti_ea_store_size < lmm_size) { + rc = lod_ea_store_resize(info, lmm_size); + if (rc != 0) + RETURN(rc); + } + lmm = info->lti_ea_store; + + rc = lod_generate_lovea(env, lo, lmm, &lmm_size, true); + if (rc != 0) + RETURN(rc); + + info->lti_buf.lb_buf = lmm; + info->lti_buf.lb_len = lmm_size; + + if (declare) + rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf, + XATTR_NAME_LOV, 0, th); + else + rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf, + XATTR_NAME_LOV, 0, th); + if (rc != 0) + RETURN(rc); + } + + RETURN(0); +} + +static int lod_declare_dir_striping_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct lu_buf *lmu, + struct dt_object_format *dof, + struct thandle *th) +{ + return lod_dir_striping_create_internal(env, dt, attr, lmu, dof, th, + true); +} + +static int lod_dir_striping_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_object_format *dof, + struct thandle *th) +{ + return lod_dir_striping_create_internal(env, dt, attr, NULL, dof, th, + false); +} + +/** + * Make LOV EA for striped object. + * + * Generate striping information and store it in the LOV EA of the given + * object. The caller must ensure nobody else is calling the function + * against the object concurrently. The transaction must be started. + * FLDB service must be running as well; it's used to map FID to the target, + * which is stored in LOV EA. + * + * \param[in] env execution environment for this thread + * \param[in] lo LOD object + * \param[in] th transaction handle + * + * \retval 0 if LOV EA is stored successfully + * \retval negative error number on failure + */ +static int lod_generate_and_set_lovea(const struct lu_env *env, + struct lod_object *lo, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct dt_object *next = dt_object_child(&lo->ldo_obj); + struct lov_mds_md_v1 *lmm; + int rc, lmm_size; + ENTRY; + + LASSERT(lo); + + if (lo->ldo_comp_cnt == 0) { + lod_striping_free(env, lo); + rc = lod_sub_xattr_del(env, next, XATTR_NAME_LOV, th); + RETURN(rc); + } + + lmm_size = lod_comp_md_size(lo, false); + if (info->lti_ea_store_size < lmm_size) { + rc = lod_ea_store_resize(info, lmm_size); + if (rc) + RETURN(rc); + } + lmm = info->lti_ea_store; + + rc = lod_generate_lovea(env, lo, lmm, &lmm_size, false); + if (rc) + RETURN(rc); + + info->lti_buf.lb_buf = lmm; + info->lti_buf.lb_len = lmm_size; + rc = lod_sub_xattr_set(env, next, &info->lti_buf, + XATTR_NAME_LOV, 0, th); + RETURN(rc); +} + +/** + * Delete layout component(s) + * + * \param[in] env execution environment for this thread + * \param[in] dt object + * \param[in] th transaction handle + * + * \retval 0 on success + * \retval negative error number on failure + */ +static int lod_layout_del(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) +{ + struct lod_layout_component *lod_comp; + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + struct lu_attr *attr = &lod_env_info(env)->lti_attr; + int rc, i, j, left; + + LASSERT(lo->ldo_is_composite); + LASSERT(lo->ldo_comp_cnt > 0 && lo->ldo_comp_entries != NULL); + + left = lo->ldo_comp_cnt; + for (i = (lo->ldo_comp_cnt - 1); i >= 0; i--) { + lod_comp = &lo->ldo_comp_entries[i]; + + if (lod_comp->llc_id != LCME_ID_INVAL) + break; + left--; + + /* Not instantiated component */ + if (lod_comp->llc_stripe == NULL) + continue; + + LASSERT(lod_comp->llc_stripe_count > 0); + for (j = 0; j < lod_comp->llc_stripe_count; j++) { + struct dt_object *obj = lod_comp->llc_stripe[j]; + + if (obj == NULL) + continue; + rc = lod_sub_destroy(env, obj, th); + if (rc) + GOTO(out, rc); + + lu_object_put(env, &obj->do_lu); + lod_comp->llc_stripe[j] = NULL; + } + OBD_FREE(lod_comp->llc_stripe, sizeof(struct dt_object *) * + lod_comp->llc_stripes_allocated); + lod_comp->llc_stripe = NULL; + OBD_FREE(lod_comp->llc_ost_indices, + sizeof(__u32) * lod_comp->llc_stripes_allocated); + lod_comp->llc_ost_indices = NULL; + lod_comp->llc_stripes_allocated = 0; + lod_obj_set_pool(lo, i, NULL); + if (lod_comp->llc_ostlist.op_array) { + OBD_FREE(lod_comp->llc_ostlist.op_array, + lod_comp->llc_ostlist.op_size); + lod_comp->llc_ostlist.op_array = NULL; + lod_comp->llc_ostlist.op_size = 0; + } + } + + LASSERTF(left >= 0 && left < lo->ldo_comp_cnt, "left = %d\n", left); + if (left > 0) { + struct lod_layout_component *comp_array; + + OBD_ALLOC(comp_array, sizeof(*comp_array) * left); + if (comp_array == NULL) + GOTO(out, rc = -ENOMEM); + + memcpy(&comp_array[0], &lo->ldo_comp_entries[0], + sizeof(*comp_array) * left); + + OBD_FREE(lo->ldo_comp_entries, + sizeof(*comp_array) * lo->ldo_comp_cnt); + lo->ldo_comp_entries = comp_array; + lo->ldo_comp_cnt = left; + + LASSERT(lo->ldo_mirror_count == 1); + lo->ldo_mirrors[0].lme_end = left - 1; + lod_obj_inc_layout_gen(lo); + } else { + lod_free_comp_entries(lo); + } + + LASSERT(dt_object_exists(dt)); + rc = dt_attr_get(env, next, attr); + if (rc) + GOTO(out, rc); + + if (attr->la_size > 0) { + attr->la_size = 0; + attr->la_valid = LA_SIZE; + rc = lod_sub_attr_set(env, next, attr, th); + if (rc) + GOTO(out, rc); + } + + rc = lod_generate_and_set_lovea(env, lo, th); + EXIT; +out: + if (rc) + lod_striping_free(env, lo); + return rc; +} + + +static int lod_get_default_lov_striping(const struct lu_env *env, + struct lod_object *lo, + struct lod_default_striping *lds); +/** + * Implementation of dt_object_operations::do_xattr_set. + * + * Sets specified extended attribute on the object. Three types of EAs are + * special: + * LOV EA - stores striping for a regular file or default striping (when set + * on a directory) + * LMV EA - stores a marker for the striped directories + * DMV EA - stores default directory striping + * + * When striping is applied to a non-striped existing object (this is called + * late striping), then LOD notices the caller wants to turn the object into a + * striped one. The stripe objects are created and appropriate EA is set: + * LOV EA storing all the stripes directly or LMV EA storing just a small header + * with striping configuration. + * + * \see dt_object_operations::do_xattr_set() in the API description for details. + */ +static int lod_xattr_set(const struct lu_env *env, + struct dt_object *dt, const struct lu_buf *buf, + const char *name, int fl, struct thandle *th) +{ + struct dt_object *next = dt_object_child(dt); + int rc; + ENTRY; + + if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && + strcmp(name, XATTR_NAME_LMV) == 0) { + struct lmv_mds_md_v1 *lmm = buf->lb_buf; + + if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) & + LMV_HASH_FLAG_MIGRATION) + rc = lod_sub_xattr_set(env, next, buf, name, fl, th); + else + rc = lod_dir_striping_create(env, dt, NULL, NULL, th); + + RETURN(rc); + } + + if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && + strcmp(name, XATTR_NAME_LOV) == 0) { + struct lod_thread_info *info = lod_env_info(env); + struct lod_default_striping *lds = &info->lti_def_striping; + struct lov_user_md_v1 *v1 = buf->lb_buf; + char pool[LOV_MAXPOOLNAME + 1]; + bool is_del; + + /* get existing striping config */ + rc = lod_get_default_lov_striping(env, lod_dt_obj(dt), lds); + if (rc) + RETURN(rc); + + memset(pool, 0, sizeof(pool)); + if (lds->lds_def_striping_set == 1) + lod_layout_get_pool(lds->lds_def_comp_entries, + lds->lds_def_comp_cnt, pool, + sizeof(pool)); + + is_del = LOVEA_DELETE_VALUES(v1->lmm_stripe_size, + v1->lmm_stripe_count, + v1->lmm_stripe_offset, + NULL); + + /* Retain the pool name if it is not given */ + if (v1->lmm_magic == LOV_USER_MAGIC_V1 && pool[0] != '\0' && + !is_del) { + struct lod_thread_info *info = lod_env_info(env); + struct lov_user_md_v3 *v3 = info->lti_ea_store; + + memset(v3, 0, sizeof(*v3)); + v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); + v3->lmm_pattern = cpu_to_le32(v1->lmm_pattern); + v3->lmm_stripe_count = + cpu_to_le32(v1->lmm_stripe_count); + v3->lmm_stripe_offset = + cpu_to_le32(v1->lmm_stripe_offset); + v3->lmm_stripe_size = cpu_to_le32(v1->lmm_stripe_size); + + strlcpy(v3->lmm_pool_name, pool, + sizeof(v3->lmm_pool_name)); + + info->lti_buf.lb_buf = v3; + info->lti_buf.lb_len = sizeof(*v3); + rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf, + name, fl, th); + } else { + rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, + fl, th); + } + + if (lds->lds_def_striping_set == 1 && + lds->lds_def_comp_entries != NULL) + lod_free_def_comp_entries(lds); + + RETURN(rc); + } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && + strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { + /* default LMVEA */ + rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl, + th); + RETURN(rc); + } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) && + (!strcmp(name, XATTR_NAME_LOV) || + !strncmp(name, XATTR_LUSTRE_LOV, + strlen(XATTR_LUSTRE_LOV)))) { + /* in case of lov EA swap, just set it + * if not, it is a replay so check striping match what we + * already have during req replay, declare_xattr_set() + * defines striping, then create() does the work */ + if (fl & LU_XATTR_REPLACE) { + /* free stripes, then update disk */ + lod_striping_free(env, lod_dt_obj(dt)); + + rc = lod_sub_xattr_set(env, next, buf, name, fl, th); + } else if (dt_object_remote(dt)) { + /* This only happens during migration, see + * mdd_migrate_create(), in which Master MDT will + * create a remote target object, and only set + * (migrating) stripe EA on the remote object, + * and does not need creating each stripes. */ + rc = lod_sub_xattr_set(env, next, buf, name, + fl, th); + } else if (strcmp(name, XATTR_LUSTRE_LOV".del") == 0) { + /* delete component(s) */ + LASSERT(lod_dt_obj(dt)->ldo_comp_cached); + rc = lod_layout_del(env, dt, th); + } else { + /* + * When 'name' is XATTR_LUSTRE_LOV or XATTR_NAME_LOV, + * it's going to create create file with specified + * component(s), the striping must have not being + * cached in this case; + * + * Otherwise, it's going to add/change component(s) to + * an existing file, the striping must have been cached + * in this case. + */ + LASSERT(equi(!strcmp(name, XATTR_LUSTRE_LOV) || + !strcmp(name, XATTR_NAME_LOV), + !lod_dt_obj(dt)->ldo_comp_cached)); + + rc = lod_striped_create(env, dt, NULL, NULL, th); + } + RETURN(rc); + } else if (strcmp(name, XATTR_NAME_FID) == 0) { + rc = lod_replace_parent_fid(env, dt, th, false); + + RETURN(rc); + } + + /* then all other xattr */ + rc = lod_xattr_set_internal(env, dt, buf, name, fl, th); + + RETURN(rc); +} + +/** + * Implementation of dt_object_operations::do_declare_xattr_del. + * + * \see dt_object_operations::do_declare_xattr_del() in the API description + * for details. + */ +static int lod_declare_xattr_del(const struct lu_env *env, + struct dt_object *dt, const char *name, + struct thandle *th) +{ + struct lod_object *lo = lod_dt_obj(dt); + int rc; + int i; + ENTRY; + + rc = lod_sub_declare_xattr_del(env, dt_object_child(dt), name, th); + if (rc != 0) + RETURN(rc); + + if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) + RETURN(0); + + /* set xattr to each stripes, if needed */ + rc = lod_striping_load(env, lo); + if (rc != 0) + RETURN(rc); + + if (lo->ldo_dir_stripe_count == 0) + RETURN(0); + + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + LASSERT(lo->ldo_stripe[i]); + rc = lod_sub_declare_xattr_del(env, lo->ldo_stripe[i], + name, th); + if (rc != 0) + break; + } + + RETURN(rc); +} + +/** + * Implementation of dt_object_operations::do_xattr_del. + * + * If EA storing a regular striping is being deleted, then release + * all the references to the stripe objects in core. + * + * \see dt_object_operations::do_xattr_del() in the API description for details. + */ +static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt, + const char *name, struct thandle *th) +{ + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + int rc; + int i; + ENTRY; + + if (!strcmp(name, XATTR_NAME_LOV)) + lod_striping_free(env, lod_dt_obj(dt)); + + rc = lod_sub_xattr_del(env, next, name, th); + if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) + RETURN(rc); + + if (lo->ldo_dir_stripe_count == 0) + RETURN(0); + + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + LASSERT(lo->ldo_stripe[i]); + + rc = lod_sub_xattr_del(env, lo->ldo_stripe[i], name, th); + if (rc != 0) + break; + } + + RETURN(rc); +} + +/** + * Implementation of dt_object_operations::do_xattr_list. + * + * \see dt_object_operations::do_xattr_list() in the API description + * for details. + */ +static int lod_xattr_list(const struct lu_env *env, + struct dt_object *dt, const struct lu_buf *buf) +{ + return dt_xattr_list(env, dt_object_child(dt), buf); +} + +static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid) +{ + return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE); +} + +/** + * Copy OST list from layout provided by user. + * + * \param[in] lod_comp layout_component to be filled + * \param[in] v3 LOV EA V3 user data + * + * \retval 0 on success + * \retval negative if failed + */ +int lod_comp_copy_ost_lists(struct lod_layout_component *lod_comp, + struct lov_user_md_v3 *v3) +{ + int j; + + ENTRY; + + if (v3->lmm_stripe_offset == LOV_OFFSET_DEFAULT) + v3->lmm_stripe_offset = v3->lmm_objects[0].l_ost_idx; + + if (lod_comp->llc_ostlist.op_array) { + if (lod_comp->llc_ostlist.op_count == + v3->lmm_stripe_count) + goto skip; + OBD_FREE(lod_comp->llc_ostlist.op_array, + lod_comp->llc_ostlist.op_size); + } + + /* copy ost list from lmm */ + lod_comp->llc_ostlist.op_count = v3->lmm_stripe_count; + lod_comp->llc_ostlist.op_size = v3->lmm_stripe_count * sizeof(__u32); + OBD_ALLOC(lod_comp->llc_ostlist.op_array, + lod_comp->llc_ostlist.op_size); + if (!lod_comp->llc_ostlist.op_array) + RETURN(-ENOMEM); +skip: + for (j = 0; j < v3->lmm_stripe_count; j++) { + lod_comp->llc_ostlist.op_array[j] = + v3->lmm_objects[j].l_ost_idx; + } + + RETURN(0); +} + + +/** + * Get default striping. + * + * \param[in] env execution environment + * \param[in] lo object + * \param[out] lds default striping + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_get_default_lov_striping(const struct lu_env *env, + struct lod_object *lo, + struct lod_default_striping *lds) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lov_user_md_v1 *v1 = NULL; + struct lov_user_md_v3 *v3 = NULL; + struct lov_comp_md_v1 *comp_v1 = NULL; + __u16 comp_cnt; + __u16 mirror_cnt; + bool composite; + int rc, i; + ENTRY; + + lds->lds_def_striping_set = 0; + + rc = lod_get_lov_ea(env, lo); + if (rc < 0) + RETURN(rc); + + if (rc < (typeof(rc))sizeof(struct lov_user_md)) + RETURN(0); + + v1 = info->lti_ea_store; + if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) { + lustre_swab_lov_user_md_v1(v1); + } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) { + v3 = (struct lov_user_md_v3 *)v1; + lustre_swab_lov_user_md_v3(v3); + } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_SPECIFIC)) { + v3 = (struct lov_user_md_v3 *)v1; + lustre_swab_lov_user_md_v3(v3); + lustre_swab_lov_user_md_objects(v3->lmm_objects, + v3->lmm_stripe_count); + } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_COMP_V1)) { + comp_v1 = (struct lov_comp_md_v1 *)v1; + lustre_swab_lov_comp_md_v1(comp_v1); + } + + if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1 && + v1->lmm_magic != LOV_MAGIC_COMP_V1 && + v1->lmm_magic != LOV_USER_MAGIC_SPECIFIC) + RETURN(-ENOTSUPP); + + if (v1->lmm_magic == LOV_MAGIC_COMP_V1) { + comp_v1 = (struct lov_comp_md_v1 *)v1; + comp_cnt = comp_v1->lcm_entry_count; + if (comp_cnt == 0) + RETURN(-EINVAL); + mirror_cnt = comp_v1->lcm_mirror_count + 1; + composite = true; + } else { + comp_cnt = 1; + mirror_cnt = 0; + composite = false; + } + + /* realloc default comp entries if necessary */ + rc = lod_def_striping_comp_resize(lds, comp_cnt); + if (rc < 0) + RETURN(rc); + + lds->lds_def_comp_cnt = comp_cnt; + lds->lds_def_striping_is_composite = composite; + lds->lds_def_mirror_cnt = mirror_cnt; + + for (i = 0; i < comp_cnt; i++) { + struct lod_layout_component *lod_comp; + struct lu_extent *ext; + char *pool; + + lod_comp = &lds->lds_def_comp_entries[i]; + /* + * reset lod_comp values, llc_stripes is always NULL in + * the default striping template, llc_pool will be reset + * later below. + */ + memset(lod_comp, 0, offsetof(typeof(*lod_comp), llc_pool)); + + if (composite) { + v1 = (struct lov_user_md *)((char *)comp_v1 + + comp_v1->lcm_entries[i].lcme_offset); + ext = &comp_v1->lcm_entries[i].lcme_extent; + lod_comp->llc_extent = *ext; + } + + if (v1->lmm_pattern != LOV_PATTERN_RAID0 && + v1->lmm_pattern != LOV_PATTERN_MDT && + v1->lmm_pattern != 0) { + lod_free_def_comp_entries(lds); + RETURN(-EINVAL); + } + + CDEBUG(D_LAYOUT, DFID" stripe_count=%d stripe_size=%d " + "stripe_offset=%d\n", + PFID(lu_object_fid(&lo->ldo_obj.do_lu)), + (int)v1->lmm_stripe_count, (int)v1->lmm_stripe_size, + (int)v1->lmm_stripe_offset); + + lod_comp->llc_stripe_count = v1->lmm_stripe_count; + lod_comp->llc_stripe_size = v1->lmm_stripe_size; + lod_comp->llc_stripe_offset = v1->lmm_stripe_offset; + lod_comp->llc_pattern = v1->lmm_pattern; + + pool = NULL; + if (v1->lmm_magic == LOV_USER_MAGIC_V3) { + /* XXX: sanity check here */ + v3 = (struct lov_user_md_v3 *) v1; + if (v3->lmm_pool_name[0] != '\0') + pool = v3->lmm_pool_name; + } + lod_set_def_pool(lds, i, pool); + if (v1->lmm_magic == LOV_USER_MAGIC_SPECIFIC) { + v3 = (struct lov_user_md_v3 *)v1; + rc = lod_comp_copy_ost_lists(lod_comp, v3); + if (rc) + RETURN(rc); + } + } + + lds->lds_def_striping_set = 1; + RETURN(rc); +} + +/** + * Get default directory striping. + * + * \param[in] env execution environment + * \param[in] lo object + * \param[out] lds default striping + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_get_default_lmv_striping(const struct lu_env *env, + struct lod_object *lo, + struct lod_default_striping *lds) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lmv_user_md_v1 *v1 = NULL; + int rc; + ENTRY; + + lds->lds_dir_def_striping_set = 0; + rc = lod_get_default_lmv_ea(env, lo); + if (rc < 0) + RETURN(rc); + + if (rc < (typeof(rc))sizeof(struct lmv_user_md)) + RETURN(0); + + v1 = info->lti_ea_store; + + lds->lds_dir_def_stripe_count = le32_to_cpu(v1->lum_stripe_count); + lds->lds_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset); + lds->lds_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type); + lds->lds_dir_def_striping_set = 1; + + RETURN(0); +} + +/** + * Get default striping in the object. + * + * Get object default striping and default directory striping. + * + * \param[in] env execution environment + * \param[in] lo object + * \param[out] lds default striping + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_get_default_striping(const struct lu_env *env, + struct lod_object *lo, + struct lod_default_striping *lds) +{ + int rc, rc1; + + rc = lod_get_default_lov_striping(env, lo, lds); + rc1 = lod_get_default_lmv_striping(env, lo, lds); + if (rc == 0 && rc1 < 0) + rc = rc1; + + return rc; +} + +/** + * Apply default striping on object. + * + * If object striping pattern is not set, set to the one in default striping. + * The default striping is from parent or fs. + * + * \param[in] lo new object + * \param[in] lds default striping + * \param[in] mode new object's mode + */ +static void lod_striping_from_default(struct lod_object *lo, + const struct lod_default_striping *lds, + umode_t mode) +{ + struct lod_device *d = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + struct lov_desc *desc = &d->lod_desc; + int i, rc; + + if (lds->lds_def_striping_set && S_ISREG(mode)) { + rc = lod_alloc_comp_entries(lo, lds->lds_def_mirror_cnt, + lds->lds_def_comp_cnt); + if (rc != 0) + return; + + lo->ldo_is_composite = lds->lds_def_striping_is_composite; + if (lds->lds_def_mirror_cnt > 1) + lo->ldo_flr_state = LCM_FL_RDONLY; + + for (i = 0; i < lo->ldo_comp_cnt; i++) { + struct lod_layout_component *obj_comp = + &lo->ldo_comp_entries[i]; + struct lod_layout_component *def_comp = + &lds->lds_def_comp_entries[i]; + + CDEBUG(D_LAYOUT, "Inherite from default: size:%hu " + "nr:%u offset:%u pattern %#x %s\n", + def_comp->llc_stripe_size, + def_comp->llc_stripe_count, + def_comp->llc_stripe_offset, + def_comp->llc_pattern, + def_comp->llc_pool ?: ""); + + *obj_comp = *def_comp; + if (def_comp->llc_pool != NULL) { + /* pointer was copied from def_comp */ + obj_comp->llc_pool = NULL; + lod_obj_set_pool(lo, i, def_comp->llc_pool); + } + + /* copy ost list */ + if (def_comp->llc_ostlist.op_array) { + OBD_ALLOC(obj_comp->llc_ostlist.op_array, + obj_comp->llc_ostlist.op_size); + if (!obj_comp->llc_ostlist.op_array) + return; + memcpy(obj_comp->llc_ostlist.op_array, + def_comp->llc_ostlist.op_array, + obj_comp->llc_ostlist.op_size); + } + + /* + * Don't initialize these fields for plain layout + * (v1/v3) here, they are inherited in the order of + * 'parent' -> 'fs default (root)' -> 'global default + * values for stripe_count & stripe_size'. + * + * see lod_ah_init(). + */ + if (!lo->ldo_is_composite) + continue; + + lod_adjust_stripe_info(obj_comp, desc); + } + } else if (lds->lds_dir_def_striping_set && S_ISDIR(mode)) { + if (lo->ldo_dir_stripe_count == 0) + lo->ldo_dir_stripe_count = + lds->lds_dir_def_stripe_count; + if (lo->ldo_dir_stripe_offset == -1) + lo->ldo_dir_stripe_offset = + lds->lds_dir_def_stripe_offset; + if (lo->ldo_dir_hash_type == 0) + lo->ldo_dir_hash_type = lds->lds_dir_def_hash_type; + + CDEBUG(D_LAYOUT, "striping from default dir: count:%hu, " + "offset:%u, hash_type:%u\n", + lo->ldo_dir_stripe_count, lo->ldo_dir_stripe_offset, + lo->ldo_dir_hash_type); + } +} + +static inline bool lod_need_inherit_more(struct lod_object *lo, bool from_root) +{ + struct lod_layout_component *lod_comp; + + if (lo->ldo_comp_cnt == 0) + return true; + + if (lo->ldo_is_composite) + return false; + + lod_comp = &lo->ldo_comp_entries[0]; + + if (lod_comp->llc_stripe_count <= 0 || + lod_comp->llc_stripe_size <= 0) + return true; + + if (from_root && (lod_comp->llc_pool == NULL || + lod_comp->llc_stripe_offset == LOV_OFFSET_DEFAULT)) + return true; + + return false; +} + +/** + * Implementation of dt_object_operations::do_ah_init. + * + * This method is used to make a decision on the striping configuration for the + * object being created. It can be taken from the \a parent object if it exists, + * or filesystem's default. The resulting configuration (number of stripes, + * stripe size/offset, pool name, etc) is stored in the object itself and will + * be used by the methods like ->doo_declare_create(). + * + * \see dt_object_operations::do_ah_init() in the API description for details. + */ +static void lod_ah_init(const struct lu_env *env, + struct dt_allocation_hint *ah, + struct dt_object *parent, + struct dt_object *child, + umode_t child_mode) +{ + struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev); + struct lod_thread_info *info = lod_env_info(env); + struct lod_default_striping *lds = &info->lti_def_striping; + struct dt_object *nextp = NULL; + struct dt_object *nextc; + struct lod_object *lp = NULL; + struct lod_object *lc; + struct lov_desc *desc; + struct lod_layout_component *lod_comp; + int rc; + ENTRY; + + LASSERT(child); + + if (likely(parent)) { + nextp = dt_object_child(parent); + lp = lod_dt_obj(parent); + } + + nextc = dt_object_child(child); + lc = lod_dt_obj(child); + + LASSERT(!lod_obj_is_striped(child)); + /* default layout template may have been set on the regular file + * when this is called from mdd_create_data() */ + if (S_ISREG(child_mode)) + lod_free_comp_entries(lc); + + if (!dt_object_exists(nextc)) + nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode); + + if (S_ISDIR(child_mode)) { + const struct lmv_user_md_v1 *lum1 = ah->dah_eadata; + + /* other default values are 0 */ + lc->ldo_dir_stripe_offset = -1; + + /* get default striping from parent object */ + if (likely(lp != NULL)) + lod_get_default_striping(env, lp, lds); + + /* set child default striping info, default value is NULL */ + if (lds->lds_def_striping_set || lds->lds_dir_def_striping_set) + lc->ldo_def_striping = lds; + + /* It should always honour the specified stripes */ + /* Note: old client (< 2.7)might also do lfs mkdir, whose EA + * will have old magic. In this case, we should ignore the + * stripe count and try to create dir by default stripe. + */ + if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0 && + (le32_to_cpu(lum1->lum_magic) == LMV_USER_MAGIC || + le32_to_cpu(lum1->lum_magic) == LMV_USER_MAGIC_SPECIFIC)) { + lc->ldo_dir_stripe_count = + le32_to_cpu(lum1->lum_stripe_count); + lc->ldo_dir_stripe_offset = + le32_to_cpu(lum1->lum_stripe_offset); + lc->ldo_dir_hash_type = + le32_to_cpu(lum1->lum_hash_type); + CDEBUG(D_INFO, + "set dirstripe: count %hu, offset %d, hash %u\n", + lc->ldo_dir_stripe_count, + (int)lc->ldo_dir_stripe_offset, + lc->ldo_dir_hash_type); + } else { + /* transfer defaults LMV to new directory */ + lod_striping_from_default(lc, lds, child_mode); + } + + /* shrink the stripe_count to the avaible MDT count */ + if (lc->ldo_dir_stripe_count > d->lod_remote_mdt_count + 1 && + !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)) + lc->ldo_dir_stripe_count = d->lod_remote_mdt_count + 1; + + /* Directory will be striped only if stripe_count > 1, if + * stripe_count == 1, let's reset stripe_count = 0 to avoid + * create single master stripe and also help to unify the + * stripe handling of directories and files */ + if (lc->ldo_dir_stripe_count == 1) + lc->ldo_dir_stripe_count = 0; + + CDEBUG(D_INFO, "final dir stripe [%hu %d %u]\n", + lc->ldo_dir_stripe_count, + (int)lc->ldo_dir_stripe_offset, lc->ldo_dir_hash_type); + + RETURN_EXIT; + } + + /* child object regular file*/ + + if (!lod_object_will_be_striped(S_ISREG(child_mode), + lu_object_fid(&child->do_lu))) + RETURN_EXIT; + + /* If object is going to be striped over OSTs, transfer default + * striping information to the child, so that we can use it + * during declaration and creation. + * + * Try from the parent first. + */ + if (likely(lp != NULL)) { + rc = lod_get_default_lov_striping(env, lp, lds); + if (rc == 0) + lod_striping_from_default(lc, lds, child_mode); + } + + /* Initialize lod_device::lod_md_root object reference */ + if (d->lod_md_root == NULL) { + struct dt_object *root; + struct lod_object *lroot; + + lu_root_fid(&info->lti_fid); + root = dt_locate(env, &d->lod_dt_dev, &info->lti_fid); + if (!IS_ERR(root)) { + lroot = lod_dt_obj(root); + + spin_lock(&d->lod_lock); + if (d->lod_md_root != NULL) + dt_object_put(env, &d->lod_md_root->ldo_obj); + d->lod_md_root = lroot; + spin_unlock(&d->lod_lock); + } + } + + /* try inherit layout from the root object (fs default) when: + * - parent does not have default layout; or + * - parent has plain(v1/v3) default layout, and some attributes + * are not specified in the default layout; + */ + if (d->lod_md_root != NULL && lod_need_inherit_more(lc, true)) { + rc = lod_get_default_lov_striping(env, d->lod_md_root, lds); + if (rc) + goto out; + if (lc->ldo_comp_cnt == 0) { + lod_striping_from_default(lc, lds, child_mode); + } else if (!lds->lds_def_striping_is_composite) { + struct lod_layout_component *def_comp; + + LASSERT(!lc->ldo_is_composite); + lod_comp = &lc->ldo_comp_entries[0]; + def_comp = &lds->lds_def_comp_entries[0]; + + if (lod_comp->llc_stripe_count <= 0) + lod_comp->llc_stripe_count = + def_comp->llc_stripe_count; + if (lod_comp->llc_stripe_size <= 0) + lod_comp->llc_stripe_size = + def_comp->llc_stripe_size; + if (lod_comp->llc_stripe_offset == LOV_OFFSET_DEFAULT) + lod_comp->llc_stripe_offset = + def_comp->llc_stripe_offset; + if (lod_comp->llc_pool == NULL) + lod_obj_set_pool(lc, 0, def_comp->llc_pool); + } + } +out: + /* + * fs default striping may not be explicitly set, or historically set + * in config log, use them. + */ + if (lod_need_inherit_more(lc, false)) { + if (lc->ldo_comp_cnt == 0) { + rc = lod_alloc_comp_entries(lc, 0, 1); + if (rc) + /* fail to allocate memory, will create a + * non-striped file. */ + RETURN_EXIT; + lc->ldo_is_composite = 0; + lod_comp = &lc->ldo_comp_entries[0]; + lod_comp->llc_stripe_offset = LOV_OFFSET_DEFAULT; + } + LASSERT(!lc->ldo_is_composite); + lod_comp = &lc->ldo_comp_entries[0]; + desc = &d->lod_desc; + lod_adjust_stripe_info(lod_comp, desc); + } + + EXIT; +} + +#define ll_do_div64(aaa,bbb) do_div((aaa), (bbb)) +/** + * Size initialization on late striping. + * + * Propagate the size of a truncated object to a deferred striping. + * This function handles a special case when truncate was done on a + * non-striped object and now while the striping is being created + * we can't lose that size, so we have to propagate it to the stripes + * being created. + * + * \param[in] env execution environment + * \param[in] dt object + * \param[in] th transaction handle + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_declare_init_size(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) +{ + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object **objects = NULL; + struct lu_attr *attr = &lod_env_info(env)->lti_attr; + uint64_t size, offs; + int i, rc, stripe, stripe_count = 0, stripe_size = 0; + struct lu_extent size_ext; + ENTRY; + + if (!lod_obj_is_striped(dt)) + RETURN(0); + + rc = dt_attr_get(env, next, attr); + LASSERT(attr->la_valid & LA_SIZE); + if (rc) + RETURN(rc); + + size = attr->la_size; + if (size == 0) + RETURN(0); + + size_ext = (typeof(size_ext)){ .e_start = size - 1, .e_end = size }; + for (i = 0; i < lo->ldo_comp_cnt; i++) { + struct lod_layout_component *lod_comp; + struct lu_extent *extent; + + lod_comp = &lo->ldo_comp_entries[i]; + + if (lod_comp->llc_stripe == NULL) + continue; + + extent = &lod_comp->llc_extent; + CDEBUG(D_INFO, "%lld "DEXT"\n", size, PEXT(extent)); + if (!lo->ldo_is_composite || + lu_extent_is_overlapped(extent, &size_ext)) { + objects = lod_comp->llc_stripe; + stripe_count = lod_comp->llc_stripe_count; + stripe_size = lod_comp->llc_stripe_size; + + /* next mirror */ + if (stripe_count == 0) + continue; + + LASSERT(objects != NULL && stripe_size != 0); + /* ll_do_div64(a, b) returns a % b, and a = a / b */ + ll_do_div64(size, (__u64)stripe_size); + stripe = ll_do_div64(size, (__u64)stripe_count); + LASSERT(objects[stripe] != NULL); + + size = size * stripe_size; + offs = attr->la_size; + size += ll_do_div64(offs, stripe_size); + + attr->la_valid = LA_SIZE; + attr->la_size = size; + + rc = lod_sub_declare_attr_set(env, objects[stripe], + attr, th); + } + } + + RETURN(rc); +} + +/** + * Declare creation of striped object. + * + * The function declares creation stripes for a regular object. The function + * also declares whether the stripes will be created with non-zero size if + * previously size was set non-zero on the master object. If object \a dt is + * not local, then only fully defined striping can be applied in \a lovea. + * Otherwise \a lovea can be in the form of pattern, see lod_qos_parse_config() + * for the details. + * + * \param[in] env execution environment + * \param[in] dt object + * \param[in] attr attributes the stripes will be created with + * \param[in] lovea a buffer containing striping description + * \param[in] th transaction handle + * + * \retval 0 on success + * \retval negative if failed + */ +int lod_declare_striped_create(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, + const struct lu_buf *lovea, struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + int rc; + ENTRY; + + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) + GOTO(out, rc = -ENOMEM); + + if (!dt_object_remote(next)) { + /* choose OST and generate appropriate objects */ + rc = lod_prepare_create(env, lo, attr, lovea, th); + if (rc) + GOTO(out, rc); + + /* + * declare storage for striping data + */ + info->lti_buf.lb_len = lod_comp_md_size(lo, false); + } else { + /* LOD can not choose OST objects for remote objects, i.e. + * stripes must be ready before that. Right now, it can only + * happen during migrate, i.e. migrate process needs to create + * remote regular file (mdd_migrate_create), then the migrate + * process will provide stripeEA. */ + LASSERT(lovea != NULL); + info->lti_buf = *lovea; + } + + rc = lod_sub_declare_xattr_set(env, next, &info->lti_buf, + XATTR_NAME_LOV, 0, th); + if (rc) + GOTO(out, rc); + + /* + * if striping is created with local object's size > 0, + * we have to propagate this size to specific object + * the case is possible only when local object was created previously + */ + if (dt_object_exists(next)) + rc = lod_declare_init_size(env, dt, th); + +out: + /* failed to create striping or to set initial size, let's reset + * config so that others don't get confused */ + if (rc) + lod_striping_free(env, lo); + + RETURN(rc); +} + +/** + * Implementation of dt_object_operations::do_declare_create. + * + * The method declares creation of a new object. If the object will be striped, + * then helper functions are called to find FIDs for the stripes, declare + * creation of the stripes and declare initialization of the striping + * information to be stored in the master object. + * + * \see dt_object_operations::do_declare_create() in the API description + * for details. + */ +static int lod_declare_create(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) +{ + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + int rc; + ENTRY; + + LASSERT(dof); + LASSERT(attr); + LASSERT(th); + + /* + * first of all, we declare creation of local object + */ + rc = lod_sub_declare_create(env, next, attr, hint, dof, th); + if (rc != 0) + GOTO(out, rc); + + /* + * it's lod_ah_init() that has decided the object will be striped + */ + if (dof->dof_type == DFT_REGULAR) { + /* callers don't want stripes */ + /* XXX: all tricky interactions with ->ah_make_hint() decided + * to use striping, then ->declare_create() behaving differently + * should be cleaned */ + if (dof->u.dof_reg.striped != 0) + rc = lod_declare_striped_create(env, dt, attr, + NULL, th); + } else if (dof->dof_type == DFT_DIR) { + struct seq_server_site *ss; + struct lu_buf buf = { NULL }; + struct lu_buf *lmu = NULL; + + ss = lu_site2seq(dt->do_lu.lo_dev->ld_site); + + /* If the parent has default stripeEA, and client + * did not find it before sending create request, + * then MDT will return -EREMOTE, and client will + * retrieve the default stripeEA and re-create the + * sub directory. + * + * Note: if dah_eadata != NULL, it means creating the + * striped directory with specified stripeEA, then it + * should ignore the default stripeEA */ + if (hint != NULL && hint->dah_eadata == NULL) { + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STALE_DIR_LAYOUT)) + GOTO(out, rc = -EREMOTE); + + if (lo->ldo_dir_stripe_offset == -1) { + /* child and parent should be in the same MDT */ + if (hint->dah_parent != NULL && + dt_object_remote(hint->dah_parent)) + GOTO(out, rc = -EREMOTE); + } else if (lo->ldo_dir_stripe_offset != + ss->ss_node_id) { + struct lod_device *lod; + struct lod_tgt_descs *ltd; + struct lod_tgt_desc *tgt = NULL; + bool found_mdt = false; + int i; + + lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + ltd = &lod->lod_mdt_descs; + cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) { + tgt = LTD_TGT(ltd, i); + if (tgt->ltd_index == + lo->ldo_dir_stripe_offset) { + found_mdt = true; + break; + } + } + + /* If the MDT indicated by stripe_offset can be + * found, then tell client to resend the create + * request to the correct MDT, otherwise return + * error to client */ + if (found_mdt) + GOTO(out, rc = -EREMOTE); + else + GOTO(out, rc = -EINVAL); + } + } else if (hint && hint->dah_eadata) { + lmu = &buf; + lmu->lb_buf = (void *)hint->dah_eadata; + lmu->lb_len = hint->dah_eadata_len; + } + + rc = lod_declare_dir_striping_create(env, dt, attr, lmu, dof, + th); + } +out: + /* failed to create striping or to set initial size, let's reset + * config so that others don't get confused */ + if (rc) + lod_striping_free(env, lo); + RETURN(rc); +} - rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu)); - if (rc != 0) - GOTO(out, rc); +/** + * Generate component ID for new created component. + * + * \param[in] lo LOD object + * \param[in] comp_idx index of ldo_comp_entries + * + * \retval component ID on success + * \retval LCME_ID_INVAL on failure + */ +static __u32 lod_gen_component_id(struct lod_object *lo, + int mirror_id, int comp_idx) +{ + struct lod_layout_component *lod_comp; + __u32 id, start, end; + int i; - linkea_buf.lb_buf = ldata.ld_buf->lb_buf; - linkea_buf.lb_len = ldata.ld_leh->leh_len; - rc = lod_sub_object_xattr_set(env, dto, &linkea_buf, - XATTR_NAME_LINK, 0, th); - if (rc != 0) - GOTO(out, rc); + LASSERT(lo->ldo_comp_entries[comp_idx].llc_id == LCME_ID_INVAL); - rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = lod_sub_object_index_insert(env, dt_object_child(dt), - (const struct dt_rec *)rec, - (const struct dt_key *)stripe_name, th, 0); - if (rc != 0) - GOTO(out, rc); + lod_obj_inc_layout_gen(lo); + id = lo->ldo_layout_gen; + if (likely(id <= SEQ_ID_MAX)) + RETURN(pflr_id(mirror_id, id & SEQ_ID_MASK)); - rc = lod_sub_object_ref_add(env, dt_object_child(dt), th); - if (rc != 0) - GOTO(out, rc); + /* Layout generation wraps, need to check collisions. */ + start = id & SEQ_ID_MASK; + end = SEQ_ID_MAX; +again: + for (id = start; id <= end; id++) { + for (i = 0; i < lo->ldo_comp_cnt; i++) { + lod_comp = &lo->ldo_comp_entries[i]; + if (pflr_id(mirror_id, id) == lod_comp->llc_id) + break; + } + /* Found the ununsed ID */ + if (i == lo->ldo_comp_cnt) + RETURN(pflr_id(mirror_id, id)); + } + if (end == LCME_ID_MAX) { + start = 1; + end = min(lo->ldo_layout_gen & LCME_ID_MASK, + (__u32)(LCME_ID_MAX - 1)); + goto again; } - if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MASTER_LMV)) - rc = lod_sub_object_xattr_set(env, dt_object_child(dt), - &lmv_buf, XATTR_NAME_LMV, fl, th); -out: - if (slave_lmm != NULL) - OBD_FREE_PTR(slave_lmm); - - RETURN(rc); + RETURN(LCME_ID_INVAL); } /** - * Helper function to declare/execute creation of a striped directory + * Creation of a striped regular object. * - * Called in declare/create object path, prepare striping for a directory - * and prepare defaults data striping for the objects to be created in - * that directory. Notice the function calls "declaration" or "execution" - * methods depending on \a declare param. This is a consequence of the - * current approach while we don't have natural distributed transactions: - * we basically execute non-local updates in the declare phase. So, the - * arguments for the both phases are the same and this is the reason for - * this function to exist. + * The function is called to create the stripe objects for a regular + * striped file. This can happen at the initial object creation or + * when the caller asks LOD to do so using ->do_xattr_set() method + * (so called late striping). Notice all the information are already + * prepared in the form of the list of objects (ldo_stripe field). + * This is done during declare phase. * * \param[in] env execution environment * \param[in] dt object * \param[in] attr attributes the stripes will be created with * \param[in] dof format of stripes (see OSD API description) * \param[in] th transaction handle - * \param[in] declare where to call "declare" or "execute" methods * * \retval 0 on success * \retval negative if failed */ -static int lod_dir_striping_create_internal(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - struct dt_object_format *dof, - struct thandle *th, - bool declare) +int lod_striped_create(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, struct dt_object_format *dof, + struct thandle *th) { - struct lod_thread_info *info = lod_env_info(env); + struct lod_layout_component *lod_comp; struct lod_object *lo = lod_dt_obj(dt); - int rc; + __u16 mirror_id; + int rc = 0, i, j; ENTRY; - if (!LMVEA_DELETE_VALUES(lo->ldo_stripenr, - lo->ldo_dir_stripe_offset)) { - struct lmv_user_md_v1 *v1 = info->lti_ea_store; - int stripe_count = lo->ldo_stripenr; - - if (info->lti_ea_store_size < sizeof(*v1)) { - rc = lod_ea_store_resize(info, sizeof(*v1)); - if (rc != 0) - RETURN(rc); - v1 = info->lti_ea_store; - } - - memset(v1, 0, sizeof(*v1)); - v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC); - v1->lum_stripe_count = cpu_to_le32(stripe_count); - v1->lum_stripe_offset = - cpu_to_le32(lo->ldo_dir_stripe_offset); + LASSERT(lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL); - info->lti_buf.lb_buf = v1; - info->lti_buf.lb_len = sizeof(*v1); + mirror_id = lo->ldo_mirror_count > 1 ? 1 : 0; - if (declare) - rc = lod_declare_xattr_set_lmv(env, dt, attr, - &info->lti_buf, dof, th); - else - rc = lod_xattr_set_lmv(env, dt, &info->lti_buf, - XATTR_NAME_LMV, 0, th); - if (rc != 0) - RETURN(rc); - } + /* create all underlying objects */ + for (i = 0; i < lo->ldo_comp_cnt; i++) { + lod_comp = &lo->ldo_comp_entries[i]; - /* Transfer default LMV striping from the parent */ - if (lo->ldo_dir_def_striping_set && - !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr, - lo->ldo_dir_def_stripe_offset)) { - struct lmv_user_md_v1 *v1 = info->lti_ea_store; - int def_stripe_count = lo->ldo_dir_def_stripenr; + if (lod_comp->llc_extent.e_start == 0 && i > 0) /* new mirror */ + ++mirror_id; - if (info->lti_ea_store_size < sizeof(*v1)) { - rc = lod_ea_store_resize(info, sizeof(*v1)); - if (rc != 0) - RETURN(rc); - v1 = info->lti_ea_store; + if (lod_comp->llc_id == LCME_ID_INVAL) { + lod_comp->llc_id = lod_gen_component_id(lo, + mirror_id, i); + if (lod_comp->llc_id == LCME_ID_INVAL) + GOTO(out, rc = -ERANGE); } - memset(v1, 0, sizeof(*v1)); - v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC); - v1->lum_stripe_count = cpu_to_le32(def_stripe_count); - v1->lum_stripe_offset = - cpu_to_le32(lo->ldo_dir_def_stripe_offset); - v1->lum_hash_type = - cpu_to_le32(lo->ldo_dir_def_hash_type); - - info->lti_buf.lb_buf = v1; - info->lti_buf.lb_len = sizeof(*v1); - if (declare) - rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf, - XATTR_NAME_DEFAULT_LMV, - 0, th); - else - rc = lod_xattr_set_default_lmv_on_dir(env, dt, - &info->lti_buf, - XATTR_NAME_DEFAULT_LMV, 0, - th); - if (rc != 0) - RETURN(rc); - } + if (lod_comp_inited(lod_comp)) + continue; - /* Transfer default LOV striping from the parent */ - if (lo->ldo_def_striping_set && - !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size, - lo->ldo_def_stripenr, - lo->ldo_def_stripe_offset, - lo->ldo_pool)) { - struct lov_user_md_v3 *v3 = info->lti_ea_store; - - if (info->lti_ea_store_size < sizeof(*v3)) { - rc = lod_ea_store_resize(info, sizeof(*v3)); - if (rc != 0) - RETURN(rc); - v3 = info->lti_ea_store; - } + if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED) + lod_comp_set_init(lod_comp); - memset(v3, 0, sizeof(*v3)); - v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); - v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr); - v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset); - v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size); - if (lo->ldo_pool != NULL) - strlcpy(v3->lmm_pool_name, lo->ldo_pool, - sizeof(v3->lmm_pool_name)); + if (lov_pattern(lod_comp->llc_pattern) == LOV_PATTERN_MDT) + lod_comp_set_init(lod_comp); - info->lti_buf.lb_buf = v3; - info->lti_buf.lb_len = sizeof(*v3); + if (lod_comp->llc_stripe == NULL) + continue; - if (declare) - rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf, - XATTR_NAME_LOV, 0, th); - else - rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf, - XATTR_NAME_LOV, 0, th); - if (rc != 0) - RETURN(rc); + LASSERT(lod_comp->llc_stripe_count); + for (j = 0; j < lod_comp->llc_stripe_count; j++) { + struct dt_object *object = lod_comp->llc_stripe[j]; + LASSERT(object != NULL); + rc = lod_sub_create(env, object, attr, NULL, dof, th); + if (rc) + GOTO(out, rc); + } + lod_comp_set_init(lod_comp); } - RETURN(0); -} - -static int lod_declare_dir_striping_create(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - struct dt_object_format *dof, - struct thandle *th) -{ - return lod_dir_striping_create_internal(env, dt, attr, dof, th, true); -} + rc = lod_fill_mirrors(lo); + if (rc) + GOTO(out, rc); -static int lod_dir_striping_create(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - struct dt_object_format *dof, - struct thandle *th) -{ - struct lod_object *lo = lod_dt_obj(dt); - int rc; + rc = lod_generate_and_set_lovea(env, lo, th); + if (rc) + GOTO(out, rc); - rc = lod_dir_striping_create_internal(env, dt, attr, dof, th, false); - if (rc == 0) - lo->ldo_striping_cached = 1; + lo->ldo_comp_cached = 1; + RETURN(0); - return rc; +out: + lod_striping_free(env, lo); + RETURN(rc); } /** - * Implementation of dt_object_operations::do_xattr_set. - * - * Sets specified extended attribute on the object. Three types of EAs are - * special: - * LOV EA - stores striping for a regular file or default striping (when set - * on a directory) - * LMV EA - stores a marker for the striped directories - * DMV EA - stores default directory striping + * Implementation of dt_object_operations::do_create. * - * When striping is applied to a non-striped existing object (this is called - * late striping), then LOD notices the caller wants to turn the object into a - * striped one. The stripe objects are created and appropriate EA is set: - * LOV EA storing all the stripes directly or LMV EA storing just a small header - * with striping configuration. + * If any of preceeding methods (like ->do_declare_create(), + * ->do_ah_init(), etc) chose to create a striped object, + * then this method will create the master and the stripes. * - * \see dt_object_operations::do_xattr_set() in the API description for details. + * \see dt_object_operations::do_create() in the API description for details. */ -static int lod_xattr_set(const struct lu_env *env, - struct dt_object *dt, const struct lu_buf *buf, - const char *name, int fl, struct thandle *th) +static int lod_create(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) { - struct dt_object *next = dt_object_child(dt); - int rc; + int rc; ENTRY; - if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && - strcmp(name, XATTR_NAME_LMV) == 0) { - struct lmv_mds_md_v1 *lmm = buf->lb_buf; - - if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) & - LMV_HASH_FLAG_MIGRATION) - rc = lod_sub_object_xattr_set(env, next, buf, name, fl, - th); - else - rc = lod_dir_striping_create(env, dt, NULL, NULL, th); - + /* create local object */ + rc = lod_sub_create(env, dt_object_child(dt), attr, hint, dof, th); + if (rc != 0) RETURN(rc); + + if (S_ISREG(dt->do_lu.lo_header->loh_attr) && + lod_obj_is_striped(dt) && dof->u.dof_reg.striped != 0) { + LASSERT(lod_dt_obj(dt)->ldo_comp_cached == 0); + rc = lod_striped_create(env, dt, attr, dof, th); } - if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && - strcmp(name, XATTR_NAME_LOV) == 0) { - /* default LOVEA */ - rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th); - RETURN(rc); - } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && - strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { - /* default LMVEA */ - rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl, - th); - RETURN(rc); - } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) && - !strcmp(name, XATTR_NAME_LOV)) { - /* in case of lov EA swap, just set it - * if not, it is a replay so check striping match what we - * already have during req replay, declare_xattr_set() - * defines striping, then create() does the work */ - if (fl & LU_XATTR_REPLACE) { - /* free stripes, then update disk */ - lod_object_free_striping(env, lod_dt_obj(dt)); + RETURN(rc); +} - rc = lod_sub_object_xattr_set(env, next, buf, name, - fl, th); - } else if (dt_object_remote(dt)) { - /* This only happens during migration, see - * mdd_migrate_create(), in which Master MDT will - * create a remote target object, and only set - * (migrating) stripe EA on the remote object, - * and does not need creating each stripes. */ - rc = lod_sub_object_xattr_set(env, next, buf, name, - fl, th); - } else { - rc = lod_striping_create(env, dt, NULL, NULL, th); - } - RETURN(rc); - } else if (strcmp(name, XATTR_NAME_FID) == 0) { - rc = lod_object_replace_parent_fid(env, dt, th, false); +static inline int +lod_obj_stripe_destroy_cb(const struct lu_env *env, struct lod_object *lo, + struct dt_object *dt, struct thandle *th, + int comp_idx, int stripe_idx, + struct lod_obj_stripe_cb_data *data) +{ + if (data->locd_declare) + return lod_sub_declare_destroy(env, dt, th); + else if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || + stripe_idx == cfs_fail_val) + return lod_sub_destroy(env, dt, th); + else + return 0; +} + +/** + * Implementation of dt_object_operations::do_declare_destroy. + * + * If the object is a striped directory, then the function declares reference + * removal from the master object (this is an index) to the stripes and declares + * destroy of all the stripes. In all the cases, it declares an intention to + * destroy the object itself. + * + * \see dt_object_operations::do_declare_destroy() in the API description + * for details. + */ +static int lod_declare_destroy(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) +{ + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + struct lod_thread_info *info = lod_env_info(env); + char *stripe_name = info->lti_key; + int rc, i; + ENTRY; + /* + * load striping information, notice we don't do this when object + * is being initialized as we don't need this information till + * few specific cases like destroy, chown + */ + rc = lod_striping_load(env, lo); + if (rc) RETURN(rc); - } - /* then all other xattr */ - rc = lod_xattr_set_internal(env, dt, buf, name, fl, th); + /* declare destroy for all underlying objects */ + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { + rc = next->do_ops->do_index_try(env, next, + &dt_directory_features); + if (rc != 0) + RETURN(rc); - RETURN(rc); -} + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + rc = lod_sub_declare_ref_del(env, next, th); + if (rc != 0) + RETURN(rc); -/** - * Implementation of dt_object_operations::do_declare_xattr_del. - * - * \see dt_object_operations::do_declare_xattr_del() in the API description - * for details. - */ -static int lod_declare_xattr_del(const struct lu_env *env, - struct dt_object *dt, const char *name, - struct thandle *th) -{ - struct lod_object *lo = lod_dt_obj(dt); - int rc; - int i; - ENTRY; + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)), + i); + rc = lod_sub_declare_delete(env, next, + (const struct dt_key *)stripe_name, th); + if (rc != 0) + RETURN(rc); + } + } - rc = lod_sub_object_declare_xattr_del(env, dt_object_child(dt), - name, th); - if (rc != 0) + /* + * we declare destroy for the local object + */ + rc = lod_sub_declare_destroy(env, next, th); + if (rc) RETURN(rc); - if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ) || + OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ2)) RETURN(0); - /* set xattr to each stripes, if needed */ - rc = lod_load_striping(env, lo); - if (rc != 0) - RETURN(rc); - - if (lo->ldo_stripenr == 0) + if (!lod_obj_is_striped(dt)) RETURN(0); - for (i = 0; i < lo->ldo_stripenr; i++) { - LASSERT(lo->ldo_stripe[i]); - rc = lod_sub_object_declare_xattr_del(env, lo->ldo_stripe[i], - name, th); - if (rc != 0) - break; + /* declare destroy all striped objects */ + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + if (lo->ldo_stripe[i] == NULL) + continue; + + rc = lod_sub_declare_ref_del(env, lo->ldo_stripe[i], + th); + + rc = lod_sub_declare_destroy(env, lo->ldo_stripe[i], + th); + if (rc != 0) + break; + } + } else { + struct lod_obj_stripe_cb_data data = { { 0 } }; + + data.locd_declare = true; + data.locd_stripe_cb = lod_obj_stripe_destroy_cb; + rc = lod_obj_for_each_stripe(env, lo, th, &data); } RETURN(rc); } /** - * Implementation of dt_object_operations::do_xattr_del. + * Implementation of dt_object_operations::do_destroy. * - * If EA storing a regular striping is being deleted, then release - * all the references to the stripe objects in core. + * If the object is a striped directory, then the function removes references + * from the master object (this is an index) to the stripes and destroys all + * the stripes. In all the cases, the function destroys the object itself. * - * \see dt_object_operations::do_xattr_del() in the API description for details. + * \see dt_object_operations::do_destroy() in the API description for details. */ -static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt, - const char *name, struct thandle *th) +static int lod_destroy(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { - struct dt_object *next = dt_object_child(dt); - struct lod_object *lo = lod_dt_obj(dt); - int rc; - int i; + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + struct lod_thread_info *info = lod_env_info(env); + char *stripe_name = info->lti_key; + unsigned int i; + int rc; ENTRY; - if (!strcmp(name, XATTR_NAME_LOV)) - lod_object_free_striping(env, lod_dt_obj(dt)); + /* destroy sub-stripe of master object */ + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { + rc = next->do_ops->do_index_try(env, next, + &dt_directory_features); + if (rc != 0) + RETURN(rc); - rc = lod_sub_object_xattr_del(env, next, name, th); - if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + rc = lod_sub_ref_del(env, next, th); + if (rc != 0) + RETURN(rc); + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)), + i); + + CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n", + PFID(lu_object_fid(&dt->do_lu)), stripe_name, + PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu))); + + rc = lod_sub_delete(env, next, + (const struct dt_key *)stripe_name, th); + if (rc != 0) + RETURN(rc); + } + } + + rc = lod_sub_destroy(env, next, th); + if (rc != 0) RETURN(rc); - if (lo->ldo_stripenr == 0) + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ) || + OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ2)) RETURN(0); - for (i = 0; i < lo->ldo_stripenr; i++) { - LASSERT(lo->ldo_stripe[i]); + if (!lod_obj_is_striped(dt)) + RETURN(0); - rc = lod_sub_object_xattr_del(env, lo->ldo_stripe[i], name, th); - if (rc != 0) - break; + /* destroy all striped objects */ + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + if (lo->ldo_stripe[i] == NULL) + continue; + if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || + i == cfs_fail_val) { + dt_write_lock(env, lo->ldo_stripe[i], + MOR_TGT_CHILD); + rc = lod_sub_ref_del(env, lo->ldo_stripe[i], + th); + dt_write_unlock(env, lo->ldo_stripe[i]); + if (rc != 0) + break; + + rc = lod_sub_destroy(env, lo->ldo_stripe[i], + th); + if (rc != 0) + break; + } + } + } else { + struct lod_obj_stripe_cb_data data = { { 0 } }; + + data.locd_declare = false; + data.locd_stripe_cb = lod_obj_stripe_destroy_cb; + rc = lod_obj_for_each_stripe(env, lo, th, &data); } RETURN(rc); } /** - * Implementation of dt_object_operations::do_xattr_list. + * Implementation of dt_object_operations::do_declare_ref_add. * - * \see dt_object_operations::do_xattr_list() in the API description + * \see dt_object_operations::do_declare_ref_add() in the API description * for details. */ -static int lod_xattr_list(const struct lu_env *env, - struct dt_object *dt, const struct lu_buf *buf) +static int lod_declare_ref_add(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) { - return dt_xattr_list(env, dt_object_child(dt), buf); + return lod_sub_declare_ref_add(env, dt_object_child(dt), th); } /** - * Initialize a pool the object belongs to. - * - * When a striped object is being created, striping configuration - * may demand the stripes are allocated on a limited set of the - * targets. These limited sets are known as "pools". So we copy - * a pool name into the object and later actual creation methods - * (like lod_object_create()) will use this information to allocate - * the stripes properly. + * Implementation of dt_object_operations::do_ref_add. * - * \param[in] o object - * \param[in] pool pool name + * \see dt_object_operations::do_ref_add() in the API description for details. */ -int lod_object_set_pool(struct lod_object *o, char *pool) +static int lod_ref_add(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) { - int len; - - if (o->ldo_pool) { - len = strlen(o->ldo_pool); - OBD_FREE(o->ldo_pool, len + 1); - o->ldo_pool = NULL; - } - if (pool) { - len = strlen(pool); - OBD_ALLOC(o->ldo_pool, len + 1); - if (o->ldo_pool == NULL) - return -ENOMEM; - strcpy(o->ldo_pool, pool); - } - return 0; + return lod_sub_ref_add(env, dt_object_child(dt), th); } -static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid) +/** + * Implementation of dt_object_operations::do_declare_ref_del. + * + * \see dt_object_operations::do_declare_ref_del() in the API description + * for details. + */ +static int lod_declare_ref_del(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) { - return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE); + return lod_sub_declare_ref_del(env, dt_object_child(dt), th); } - /** - * Cache default regular striping in the object. - * - * To improve performance of striped regular object creation we cache - * default LOV striping (if it exists) in the parent directory object. - * - * \param[in] env execution environment - * \param[in] lp object + * Implementation of dt_object_operations::do_ref_del * - * \retval 0 on success - * \retval negative if failed + * \see dt_object_operations::do_ref_del() in the API description for details. */ -static int lod_cache_parent_lov_striping(const struct lu_env *env, - struct lod_object *lp) +static int lod_ref_del(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) { - struct lod_thread_info *info = lod_env_info(env); - struct lov_user_md_v1 *v1 = NULL; - struct lov_user_md_v3 *v3 = NULL; - int rc; - ENTRY; - - /* called from MDD without parent being write locked, - * lock it here */ - dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0); - rc = lod_get_lov_ea(env, lp); - if (rc < 0) - GOTO(unlock, rc); - - if (rc < (typeof(rc))sizeof(struct lov_user_md)) { - /* don't lookup for non-existing or invalid striping */ - lp->ldo_def_striping_set = 0; - lp->ldo_def_striping_cached = 1; - lp->ldo_def_stripe_size = 0; - lp->ldo_def_stripenr = 0; - lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1); - GOTO(unlock, rc = 0); - } - - rc = 0; - v1 = info->lti_ea_store; - if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) { - lustre_swab_lov_user_md_v1(v1); - } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) { - v3 = (struct lov_user_md_v3 *)v1; - lustre_swab_lov_user_md_v3(v3); - } - - if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1) - GOTO(unlock, rc = 0); - - if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0) - GOTO(unlock, rc = 0); - - CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n", - PFID(lu_object_fid(&lp->ldo_obj.do_lu)), - (int)v1->lmm_stripe_count, - (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset); - - lp->ldo_def_stripenr = v1->lmm_stripe_count; - lp->ldo_def_stripe_size = v1->lmm_stripe_size; - lp->ldo_def_stripe_offset = v1->lmm_stripe_offset; - lp->ldo_def_striping_cached = 1; - lp->ldo_def_striping_set = 1; - if (v1->lmm_magic == LOV_USER_MAGIC_V3) { - /* XXX: sanity check here */ - v3 = (struct lov_user_md_v3 *) v1; - if (v3->lmm_pool_name[0]) - lod_object_set_pool(lp, v3->lmm_pool_name); - } - EXIT; -unlock: - dt_write_unlock(env, dt_object_child(&lp->ldo_obj)); - return rc; + return lod_sub_ref_del(env, dt_object_child(dt), th); } - /** - * Cache default directory striping in the object. - * - * To improve performance of striped directory creation we cache default - * directory striping (if it exists) in the parent directory object. - * - * \param[in] env execution environment - * \param[in] lp object + * Implementation of dt_object_operations::do_object_sync. * - * \retval 0 on success - * \retval negative if failed + * \see dt_object_operations::do_object_sync() in the API description + * for details. */ -static int lod_cache_parent_lmv_striping(const struct lu_env *env, - struct lod_object *lp) +static int lod_object_sync(const struct lu_env *env, struct dt_object *dt, + __u64 start, __u64 end) { - struct lod_thread_info *info = lod_env_info(env); - struct lmv_user_md_v1 *v1 = NULL; - int rc; - ENTRY; - - /* called from MDD without parent being write locked, - * lock it here */ - dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0); - rc = lod_get_default_lmv_ea(env, lp); - if (rc < 0) - GOTO(unlock, rc); - - if (rc < (typeof(rc))sizeof(struct lmv_user_md)) { - /* don't lookup for non-existing or invalid striping */ - lp->ldo_dir_def_striping_set = 0; - lp->ldo_dir_def_striping_cached = 1; - lp->ldo_dir_def_stripenr = 0; - lp->ldo_dir_def_stripe_offset = - (typeof(v1->lum_stripe_offset))(-1); - lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64; - GOTO(unlock, rc = 0); - } - - rc = 0; - v1 = info->lti_ea_store; - - lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count); - lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset); - lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type); - lp->ldo_dir_def_striping_set = 1; - lp->ldo_dir_def_striping_cached = 1; - - EXIT; -unlock: - dt_write_unlock(env, dt_object_child(&lp->ldo_obj)); - return rc; + return dt_object_sync(env, dt_object_child(dt), start, end); } /** - * Cache default striping in the object. - * - * To improve performance of striped object creation we cache default striping - * (if it exists) in the parent directory object. We always cache default - * striping for the regular files (stored in LOV EA) and we cache default - * striping for the directories if requested by \a child_mode (when a new - * directory is being created). + * Implementation of dt_object_operations::do_object_unlock. * - * \param[in] env execution environment - * \param[in] lp object - * \param[in] child_mode new object's mode + * Used to release LDLM lock(s). * - * \retval 0 on success - * \retval negative if failed + * \see dt_object_operations::do_object_unlock() in the API description + * for details. */ -static int lod_cache_parent_striping(const struct lu_env *env, - struct lod_object *lp, - umode_t child_mode) +static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt, + struct ldlm_enqueue_info *einfo, + union ldlm_policy_data *policy) { - int rc = 0; + struct lod_object *lo = lod_dt_obj(dt); + struct lustre_handle_array *slave_locks = einfo->ei_cbdata; + int slave_locks_size; + int i; ENTRY; - if (!lp->ldo_def_striping_cached) { - /* we haven't tried to get default striping for - * the directory yet, let's cache it in the object */ - rc = lod_cache_parent_lov_striping(env, lp); - if (rc != 0) - RETURN(rc); - } + if (slave_locks == NULL) + RETURN(0); - /* If the parent is on the remote MDT, we should always - * try to refresh the default stripeEA cache, because we - * do not cache default striping information for remote - * object. */ - if (S_ISDIR(child_mode) && (!lp->ldo_dir_def_striping_cached || - dt_object_remote(&lp->ldo_obj))) - rc = lod_cache_parent_lmv_striping(env, lp); + LASSERT(S_ISDIR(dt->do_lu.lo_header->loh_attr)); + LASSERT(lo->ldo_dir_stripe_count > 1); + /* Note: for remote lock for single stripe dir, MDT will cancel + * the lock by lockh directly */ + LASSERT(!dt_object_remote(dt_object_child(dt))); - RETURN(rc); + /* locks were unlocked in MDT layer */ + for (i = 0; i < slave_locks->ha_count; i++) + LASSERT(!lustre_handle_is_used(&slave_locks->ha_handles[i])); + + /* + * NB, ha_count may not equal to ldo_dir_stripe_count, because dir + * layout may change, e.g., shrink dir layout after migration. + */ + for (i = 0; i < lo->ldo_dir_stripe_count; i++) + dt_invalidate(env, lo->ldo_stripe[i]); + + slave_locks_size = offsetof(typeof(*slave_locks), + ha_handles[slave_locks->ha_count]); + OBD_FREE(slave_locks, slave_locks_size); + einfo->ei_cbdata = NULL; + + RETURN(0); } /** - * Implementation of dt_object_operations::do_ah_init. + * Implementation of dt_object_operations::do_object_lock. * - * This method is used to make a decision on the striping configuration for the - * object being created. It can be taken from the \a parent object if it exists, - * or filesystem's default. The resulting configuration (number of stripes, - * stripe size/offset, pool name, etc) is stored in the object itself and will - * be used by the methods like ->doo_declare_create(). + * Used to get LDLM lock on the non-striped and striped objects. * - * \see dt_object_operations::do_ah_init() in the API description for details. + * \see dt_object_operations::do_object_lock() in the API description + * for details. */ -static void lod_ah_init(const struct lu_env *env, - struct dt_allocation_hint *ah, - struct dt_object *parent, - struct dt_object *child, - umode_t child_mode) +static int lod_object_lock(const struct lu_env *env, + struct dt_object *dt, + struct lustre_handle *lh, + struct ldlm_enqueue_info *einfo, + union ldlm_policy_data *policy) { - struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev); - struct dt_object *nextp = NULL; - struct dt_object *nextc; - struct lod_object *lp = NULL; - struct lod_object *lc; - struct lov_desc *desc; - int rc; + struct lod_object *lo = lod_dt_obj(dt); + int slave_locks_size; + struct lustre_handle_array *slave_locks = NULL; + int i; + int rc; ENTRY; - LASSERT(child); - - if (likely(parent)) { - nextp = dt_object_child(parent); - lp = lod_dt_obj(parent); - rc = lod_load_striping(env, lp); - if (rc != 0) - return; + /* remote object lock */ + if (!einfo->ei_enq_slave) { + LASSERT(dt_object_remote(dt)); + return dt_object_lock(env, dt_object_child(dt), lh, einfo, + policy); } - nextc = dt_object_child(child); - lc = lod_dt_obj(child); - - LASSERT(lc->ldo_stripenr == 0); - LASSERT(lc->ldo_stripe == NULL); - - if (!dt_object_exists(nextc)) - nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode); + if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) + RETURN(-ENOTDIR); - if (S_ISDIR(child_mode)) { - if (lc->ldo_dir_stripe == NULL) { - OBD_ALLOC_PTR(lc->ldo_dir_stripe); - if (lc->ldo_dir_stripe == NULL) - return; - } + rc = lod_striping_load(env, lo); + if (rc != 0) + RETURN(rc); - LASSERT(lp != NULL); - if (lp->ldo_dir_stripe == NULL) { - OBD_ALLOC_PTR(lp->ldo_dir_stripe); - if (lp->ldo_dir_stripe == NULL) - return; - } + /* No stripes */ + if (lo->ldo_dir_stripe_count <= 1) + RETURN(0); - rc = lod_cache_parent_striping(env, lp, child_mode); - if (rc != 0) - return; + slave_locks_size = offsetof(typeof(*slave_locks), + ha_handles[lo->ldo_dir_stripe_count]); + /* Freed in lod_object_unlock */ + OBD_ALLOC(slave_locks, slave_locks_size); + if (!slave_locks) + RETURN(-ENOMEM); + slave_locks->ha_count = lo->ldo_dir_stripe_count; - /* transfer defaults to new directory */ - if (lp->ldo_def_striping_set) { - if (lp->ldo_pool) - lod_object_set_pool(lc, lp->ldo_pool); - lc->ldo_def_stripenr = lp->ldo_def_stripenr; - lc->ldo_def_stripe_size = lp->ldo_def_stripe_size; - lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset; - lc->ldo_def_striping_set = 1; - lc->ldo_def_striping_cached = 1; - CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n", - (int)lc->ldo_def_stripe_size, - (int)lc->ldo_def_stripe_offset, - (int)lc->ldo_def_stripenr); - } + /* striped directory lock */ + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + struct lustre_handle lockh; + struct ldlm_res_id *res_id; - /* transfer dir defaults to new directory */ - if (lp->ldo_dir_def_striping_set) { - lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr; - lc->ldo_dir_def_stripe_offset = - lp->ldo_dir_def_stripe_offset; - lc->ldo_dir_def_hash_type = - lp->ldo_dir_def_hash_type; - lc->ldo_dir_def_striping_set = 1; - lc->ldo_dir_def_striping_cached = 1; - CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n", - (int)lc->ldo_dir_def_stripenr, - (int)lc->ldo_dir_def_stripe_offset, - lc->ldo_dir_def_hash_type); - } + res_id = &lod_env_info(env)->lti_res_id; + fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu), + res_id); + einfo->ei_res_id = res_id; - /* It should always honour the specified stripes */ - if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) { - const struct lmv_user_md_v1 *lum1 = ah->dah_eadata; - - rc = lod_verify_md_striping(d, lum1); - if (rc == 0 && - le32_to_cpu(lum1->lum_stripe_count) > 1) { - lc->ldo_stripenr = - le32_to_cpu(lum1->lum_stripe_count); - lc->ldo_dir_stripe_offset = - le32_to_cpu(lum1->lum_stripe_offset); - lc->ldo_dir_hash_type = - le32_to_cpu(lum1->lum_hash_type); - CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n", - lc->ldo_stripenr, - (int)lc->ldo_dir_stripe_offset); - } - /* then check whether there is default stripes from parent */ - } else if (lp->ldo_dir_def_striping_set) { - /* If there are default dir stripe from parent */ - lc->ldo_stripenr = lp->ldo_dir_def_stripenr; - lc->ldo_dir_stripe_offset = - lp->ldo_dir_def_stripe_offset; - lc->ldo_dir_hash_type = - lp->ldo_dir_def_hash_type; - CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n", - lc->ldo_stripenr, - (int)lc->ldo_dir_stripe_offset); + LASSERT(lo->ldo_stripe[i] != NULL); + if (dt_object_remote(lo->ldo_stripe[i])) { + set_bit(i, (void *)slave_locks->ha_map); + rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, + einfo, policy); } else { - /* set default stripe for this directory */ - lc->ldo_stripenr = 0; - lc->ldo_dir_stripe_offset = -1; - } - - /* shrink the stripe_count to the avaible MDT count */ - if (lc->ldo_stripenr > d->lod_remote_mdt_count + 1 && - !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)) - lc->ldo_stripenr = d->lod_remote_mdt_count + 1; - - /* Directory will be striped only if stripe_count > 1, if - * stripe_count == 1, let's reset stripenr = 0 to avoid - * create single master stripe and also help to unify the - * stripe handling of directories and files */ - if (lc->ldo_stripenr == 1) - lc->ldo_stripenr = 0; - - CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n", - lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset); + struct ldlm_namespace *ns = einfo->ei_namespace; + ldlm_blocking_callback blocking = einfo->ei_cb_local_bl; + ldlm_completion_callback completion = einfo->ei_cb_cp; + __u64 dlmflags = LDLM_FL_ATOMIC_CB; - goto out; - } + if (einfo->ei_mode == LCK_PW || + einfo->ei_mode == LCK_EX) + dlmflags |= LDLM_FL_COS_INCOMPAT; - /* - * if object is going to be striped over OSTs, transfer default - * striping information to the child, so that we can use it - * during declaration and creation - */ - if (!lod_object_will_be_striped(S_ISREG(child_mode), - lu_object_fid(&child->do_lu))) - goto out; - /* - * try from the parent - */ - if (likely(parent)) { - lod_cache_parent_striping(env, lp, child_mode); - - lc->ldo_def_stripe_offset = LOV_OFFSET_DEFAULT; - - if (lp->ldo_def_striping_set) { - if (lp->ldo_pool) - lod_object_set_pool(lc, lp->ldo_pool); - lc->ldo_stripenr = lp->ldo_def_stripenr; - lc->ldo_stripe_size = lp->ldo_def_stripe_size; - lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset; - CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n", - lc->ldo_stripenr, lc->ldo_stripe_size, - lp->ldo_pool ? lp->ldo_pool : ""); + LASSERT(ns != NULL); + rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, + policy, einfo->ei_mode, + &dlmflags, blocking, + completion, NULL, + NULL, 0, LVB_T_NONE, + NULL, &lockh); + } + if (rc) { + while (i--) + ldlm_lock_decref_and_cancel( + &slave_locks->ha_handles[i], + einfo->ei_mode); + OBD_FREE(slave_locks, slave_locks_size); + RETURN(rc); } + slave_locks->ha_handles[i] = lockh; } + einfo->ei_cbdata = slave_locks; - /* - * if the parent doesn't provide with specific pattern, grab fs-wide one - */ - desc = &d->lod_desc; - if (lc->ldo_stripenr == 0) - lc->ldo_stripenr = desc->ld_default_stripe_count; - if (lc->ldo_stripe_size == 0) - lc->ldo_stripe_size = desc->ld_default_stripe_size; - CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n", - lc->ldo_stripenr, lc->ldo_stripe_size, - lc->ldo_pool ? lc->ldo_pool : ""); - -out: - /* we do not cache stripe information for slave stripe, see - * lod_xattr_set_lov_on_dir */ - if (lp != NULL && lp->ldo_dir_slave_stripe) - lod_lov_stripe_cache_clear(lp); - - EXIT; + RETURN(0); } -#define ll_do_div64(aaa,bbb) do_div((aaa), (bbb)) /** - * Size initialization on late striping. - * - * Propagate the size of a truncated object to a deferred striping. - * This function handles a special case when truncate was done on a - * non-striped object and now while the striping is being created - * we can't lose that size, so we have to propagate it to the stripes - * being created. - * - * \param[in] env execution environment - * \param[in] dt object - * \param[in] th transaction handle + * Implementation of dt_object_operations::do_invalidate. * - * \retval 0 on success - * \retval negative if failed + * \see dt_object_operations::do_invalidate() in the API description for details */ -static int lod_declare_init_size(const struct lu_env *env, - struct dt_object *dt, struct thandle *th) +static int lod_invalidate(const struct lu_env *env, struct dt_object *dt) +{ + return dt_invalidate(env, dt_object_child(dt)); +} + +static int lod_layout_data_init(struct lod_thread_info *info, __u32 comp_cnt) { - struct dt_object *next = dt_object_child(dt); - struct lod_object *lo = lod_dt_obj(dt); - struct lu_attr *attr = &lod_env_info(env)->lti_attr; - uint64_t size, offs; - int rc, stripe; ENTRY; - /* XXX: we support the simplest (RAID0) striping so far */ - LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0); - LASSERT(lo->ldo_stripe_size > 0); + /* clear memory region that will be used for layout change */ + memset(&info->lti_layout_attr, 0, sizeof(struct lu_attr)); + info->lti_count = 0; - if (lo->ldo_stripenr == 0) + if (info->lti_comp_size >= comp_cnt) RETURN(0); - rc = dt_attr_get(env, next, attr); - LASSERT(attr->la_valid & LA_SIZE); - if (rc) - RETURN(rc); + if (info->lti_comp_size > 0) { + OBD_FREE(info->lti_comp_idx, + info->lti_comp_size * sizeof(__u32)); + info->lti_comp_size = 0; + } - size = attr->la_size; - if (size == 0) - RETURN(0); + OBD_ALLOC(info->lti_comp_idx, comp_cnt * sizeof(__u32)); + if (!info->lti_comp_idx) + RETURN(-ENOMEM); + + info->lti_comp_size = comp_cnt; + RETURN(0); +} - /* ll_do_div64(a, b) returns a % b, and a = a / b */ - ll_do_div64(size, (__u64) lo->ldo_stripe_size); - stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr); +static int lod_declare_instantiate_components(const struct lu_env *env, + struct lod_object *lo, struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + int i; + int rc = 0; + ENTRY; - size = size * lo->ldo_stripe_size; - offs = attr->la_size; - size += ll_do_div64(offs, lo->ldo_stripe_size); + LASSERT(info->lti_count < lo->ldo_comp_cnt); - attr->la_valid = LA_SIZE; - attr->la_size = size; + for (i = 0; i < info->lti_count; i++) { + rc = lod_qos_prep_create(env, lo, NULL, th, + info->lti_comp_idx[i]); + if (rc) + break; + } - rc = lod_sub_object_declare_attr_set(env, lo->ldo_stripe[stripe], attr, - th); + if (!rc) { + info->lti_buf.lb_len = lod_comp_md_size(lo, false); + rc = lod_sub_declare_xattr_set(env, lod_object_child(lo), + &info->lti_buf, XATTR_NAME_LOV, 0, th); + } RETURN(rc); } -/** - * Declare creation of striped object. - * - * The function declares creation stripes for a regular object. The function - * also declares whether the stripes will be created with non-zero size if - * previously size was set non-zero on the master object. If object \a dt is - * not local, then only fully defined striping can be applied in \a lovea. - * Otherwise \a lovea can be in the form of pattern, see lod_qos_parse_config() - * for the details. - * - * \param[in] env execution environment - * \param[in] dt object - * \param[in] attr attributes the stripes will be created with - * \param[in] lovea a buffer containing striping description - * \param[in] th transaction handle - * - * \retval 0 on success - * \retval negative if failed - */ -int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt, - struct lu_attr *attr, - const struct lu_buf *lovea, struct thandle *th) +static int lod_declare_update_plain(const struct lu_env *env, + struct lod_object *lo, struct layout_intent *layout, + const struct lu_buf *buf, struct thandle *th) { - struct lod_thread_info *info = lod_env_info(env); - struct dt_object *next = dt_object_child(dt); - struct lod_object *lo = lod_dt_obj(dt); - int rc; + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *d = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + struct lod_layout_component *lod_comp; + struct lov_comp_md_v1 *comp_v1 = NULL; + bool replay = false; + int i, rc; ENTRY; - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) { - /* failed to create striping, let's reset - * config so that others don't get confused */ - lod_object_free_striping(env, lo); - GOTO(out, rc = -ENOMEM); - } + LASSERT(lo->ldo_flr_state == LCM_FL_NONE); - if (!dt_object_remote(next)) { - /* choose OST and generate appropriate objects */ - rc = lod_qos_prep_create(env, lo, attr, lovea, th); - if (rc) { - /* failed to create striping, let's reset - * config so that others don't get confused */ - lod_object_free_striping(env, lo); - GOTO(out, rc); + /* + * In case the client is passing lovea, which only happens during + * the replay of layout intent write RPC for now, we may need to + * parse the lovea and apply new layout configuration. + */ + if (buf && buf->lb_len) { + struct lov_user_md_v1 *v1 = buf->lb_buf; + + if (v1->lmm_magic != (LOV_MAGIC_DEFINED | LOV_MAGIC_COMP_V1) && + v1->lmm_magic != __swab32(LOV_MAGIC_DEFINED | + LOV_MAGIC_COMP_V1)) { + CERROR("%s: the replay buffer of layout extend " + "(magic %#x) does not contain expected " + "composite layout.\n", + lod2obd(d)->obd_name, v1->lmm_magic); + GOTO(out, rc = -EINVAL); } - /* - * declare storage for striping data - */ - info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr, - lo->ldo_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1); + rc = lod_use_defined_striping(env, lo, buf); + if (rc) + GOTO(out, rc); + + rc = lod_get_lov_ea(env, lo); + if (rc <= 0) + GOTO(out, rc); + /* old on-disk EA is stored in info->lti_buf */ + comp_v1 = (struct lov_comp_md_v1 *)info->lti_buf.lb_buf; + replay = true; } else { - /* LOD can not choose OST objects for remote objects, i.e. - * stripes must be ready before that. Right now, it can only - * happen during migrate, i.e. migrate process needs to create - * remote regular file (mdd_migrate_create), then the migrate - * process will provide stripeEA. */ - LASSERT(lovea != NULL); - info->lti_buf = *lovea; + /* non replay path */ + rc = lod_striping_load(env, lo); + if (rc) + GOTO(out, rc); } - rc = lod_sub_object_declare_xattr_set(env, next, &info->lti_buf, - XATTR_NAME_LOV, 0, th); - if (rc) - GOTO(out, rc); + /* Make sure defined layout covers the requested write range. */ + lod_comp = &lo->ldo_comp_entries[lo->ldo_comp_cnt - 1]; + if (lo->ldo_comp_cnt > 1 && + lod_comp->llc_extent.e_end != OBD_OBJECT_EOF && + lod_comp->llc_extent.e_end < layout->li_extent.e_end) { + CDEBUG(replay ? D_ERROR : D_LAYOUT, + "%s: the defined layout [0, %#llx) does not covers " + "the write range "DEXT"\n", + lod2obd(d)->obd_name, lod_comp->llc_extent.e_end, + PEXT(&layout->li_extent)); + GOTO(out, rc = -EINVAL); + } + + CDEBUG(D_LAYOUT, "%s: "DFID": instantiate components "DEXT"\n", + lod2obd(d)->obd_name, PFID(lod_object_fid(lo)), + PEXT(&layout->li_extent)); /* - * if striping is created with local object's size > 0, - * we have to propagate this size to specific object - * the case is possible only when local object was created previously + * Iterate ld->ldo_comp_entries, find the component whose extent under + * the write range and not instantianted. */ - if (dt_object_exists(next)) - rc = lod_declare_init_size(env, dt, th); + for (i = 0; i < lo->ldo_comp_cnt; i++) { + lod_comp = &lo->ldo_comp_entries[i]; + if (lod_comp->llc_extent.e_start >= layout->li_extent.e_end) + break; + + if (!replay) { + if (lod_comp_inited(lod_comp)) + continue; + } else { + /** + * In replay path, lod_comp is the EA passed by + * client replay buffer, comp_v1 is the pre-recovery + * on-disk EA, we'd sift out those components which + * were init-ed in the on-disk EA. + */ + if (le32_to_cpu(comp_v1->lcm_entries[i].lcme_flags) & + LCME_FL_INIT) + continue; + } + /* + * this component hasn't instantiated in normal path, or during + * replay it needs replay the instantiation. + */ + + /* A released component is being extended */ + if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED) + GOTO(out, rc = -EINVAL); + + LASSERT(info->lti_comp_idx != NULL); + info->lti_comp_idx[info->lti_count++] = i; + } + + if (info->lti_count == 0) + RETURN(-EALREADY); + + lod_obj_inc_layout_gen(lo); + rc = lod_declare_instantiate_components(env, lo, th); out: + if (rc) + lod_striping_free(env, lo); RETURN(rc); } -/** - * Implementation of dt_object_operations::do_declare_create. - * - * The method declares creation of a new object. If the object will be striped, - * then helper functions are called to find FIDs for the stripes, declare - * creation of the stripes and declare initialization of the striping - * information to be stored in the master object. - * - * \see dt_object_operations::do_declare_create() in the API description - * for details. - */ -static int lod_declare_object_create(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - struct dt_allocation_hint *hint, - struct dt_object_format *dof, - struct thandle *th) +static inline int lod_comp_index(struct lod_object *lo, + struct lod_layout_component *lod_comp) { - struct dt_object *next = dt_object_child(dt); - struct lod_object *lo = lod_dt_obj(dt); - int rc; - ENTRY; - - LASSERT(dof); - LASSERT(attr); - LASSERT(th); + LASSERT(lod_comp >= lo->ldo_comp_entries && + lod_comp <= &lo->ldo_comp_entries[lo->ldo_comp_cnt - 1]); - /* - * first of all, we declare creation of local object - */ - rc = lod_sub_object_declare_create(env, next, attr, hint, dof, th); - if (rc != 0) - GOTO(out, rc); + return lod_comp - lo->ldo_comp_entries; +} - if (dof->dof_type == DFT_SYM) - dt->do_body_ops = &lod_body_lnk_ops; - else if (dof->dof_type == DFT_REGULAR) - dt->do_body_ops = &lod_body_ops; +/** + * Stale other mirrors by writing extent. + */ +static void lod_stale_components(struct lod_object *lo, int primary, + struct lu_extent *extent) +{ + struct lod_layout_component *pri_comp, *lod_comp; + int i; - /* - * it's lod_ah_init() that has decided the object will be striped - */ - if (dof->dof_type == DFT_REGULAR) { - /* callers don't want stripes */ - /* XXX: all tricky interactions with ->ah_make_hint() decided - * to use striping, then ->declare_create() behaving differently - * should be cleaned */ - if (dof->u.dof_reg.striped == 0) - lo->ldo_stripenr = 0; - if (lo->ldo_stripenr > 0) - rc = lod_declare_striped_object(env, dt, attr, - NULL, th); - } else if (dof->dof_type == DFT_DIR) { - struct seq_server_site *ss; + /* The writing extent decides which components in the primary + * are affected... */ + CDEBUG(D_LAYOUT, "primary mirror %d, "DEXT"\n", primary, PEXT(extent)); + lod_foreach_mirror_comp(pri_comp, lo, primary) { + if (!lu_extent_is_overlapped(extent, &pri_comp->llc_extent)) + continue; - ss = lu_site2seq(dt->do_lu.lo_dev->ld_site); + CDEBUG(D_LAYOUT, "primary comp %u "DEXT"\n", + lod_comp_index(lo, pri_comp), + PEXT(&pri_comp->llc_extent)); - /* If the parent has default stripeEA, and client - * did not find it before sending create request, - * then MDT will return -EREMOTE, and client will - * retrieve the default stripeEA and re-create the - * sub directory. - * - * Note: if dah_eadata != NULL, it means creating the - * striped directory with specified stripeEA, then it - * should ignore the default stripeEA */ - if (hint != NULL && hint->dah_eadata == NULL) { - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STALE_DIR_LAYOUT)) - GOTO(out, rc = -EREMOTE); + for (i = 0; i < lo->ldo_mirror_count; i++) { + if (i == primary) + continue; - if (lo->ldo_dir_stripe_offset == -1) { - /* child and parent should be in the same MDT */ - if (hint->dah_parent != NULL && - dt_object_remote(hint->dah_parent)) - GOTO(out, rc = -EREMOTE); - } else if (lo->ldo_dir_stripe_offset != - ss->ss_node_id) { - struct lod_device *lod; - struct lod_tgt_descs *ltd; - struct lod_tgt_desc *tgt = NULL; - bool found_mdt = false; - int i; + /* ... and then stale other components that are + * overlapping with primary components */ + lod_foreach_mirror_comp(lod_comp, lo, i) { + if (!lu_extent_is_overlapped( + &pri_comp->llc_extent, + &lod_comp->llc_extent)) + continue; - lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); - ltd = &lod->lod_mdt_descs; - cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) { - tgt = LTD_TGT(ltd, i); - if (tgt->ltd_index == - lo->ldo_dir_stripe_offset) { - found_mdt = true; - break; - } - } + CDEBUG(D_LAYOUT, "stale: %u / %u\n", + i, lod_comp_index(lo, lod_comp)); - /* If the MDT indicated by stripe_offset can be - * found, then tell client to resend the create - * request to the correct MDT, otherwise return - * error to client */ - if (found_mdt) - GOTO(out, rc = -EREMOTE); - else - GOTO(out, rc = -EINVAL); + lod_comp->llc_flags |= LCME_FL_STALE; + lo->ldo_mirrors[i].lme_stale = 1; } } - - /* Orphan object (like migrating object) does not have - * lod_dir_stripe, see lod_ah_init */ - if (lo->ldo_dir_stripe != NULL) - rc = lod_declare_dir_striping_create(env, dt, attr, - dof, th); } -out: - RETURN(rc); } /** - * Creation of a striped regular object. - * - * The function is called to create the stripe objects for a regular - * striped file. This can happen at the initial object creation or - * when the caller asks LOD to do so using ->do_xattr_set() method - * (so called late striping). Notice all the information are already - * prepared in the form of the list of objects (ldo_stripe field). - * This is done during declare phase. - * + * check an OST's availability * \param[in] env execution environment - * \param[in] dt object - * \param[in] attr attributes the stripes will be created with - * \param[in] dof format of stripes (see OSD API description) - * \param[in] th transaction handle + * \param[in] lo lod object + * \param[in] dt dt object + * \param[in] index mirror index * - * \retval 0 on success - * \retval negative if failed + * \retval negative if failed + * \retval 1 if \a dt is available + * \retval 0 if \a dt is not available */ -int lod_striping_create(const struct lu_env *env, struct dt_object *dt, - struct lu_attr *attr, struct dt_object_format *dof, - struct thandle *th) +static inline int lod_check_ost_avail(const struct lu_env *env, + struct lod_object *lo, + struct dt_object *dt, int index) { - struct lod_object *lo = lod_dt_obj(dt); - int rc = 0, i; - ENTRY; - - LASSERT(lo->ldo_striping_cached == 0); + struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + struct lod_tgt_desc *ost; + __u32 idx; + int type = LU_SEQ_RANGE_OST; + int rc; - /* create all underlying objects */ - for (i = 0; i < lo->ldo_stripenr; i++) { - LASSERT(lo->ldo_stripe[i]); - rc = lod_sub_object_create(env, lo->ldo_stripe[i], attr, NULL, - dof, th); - if (rc) - break; + rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu), &idx, &type); + if (rc < 0) { + CERROR("%s: can't locate "DFID":rc = %d\n", + lod2obd(lod)->obd_name, PFID(lu_object_fid(&dt->do_lu)), + rc); + return rc; } - if (rc == 0) { - rc = lod_generate_and_set_lovea(env, lo, th); - if (rc == 0) - lo->ldo_striping_cached = 1; + ost = OST_TGT(lod, idx); + if (ost->ltd_statfs.os_state & + (OS_STATE_READONLY | OS_STATE_ENOSPC | OS_STATE_ENOINO | + OS_STATE_NOPRECREATE) || + ost->ltd_active == 0) { + CDEBUG(D_LAYOUT, DFID ": mirror %d OST%d unavail, rc = %d\n", + PFID(lod_object_fid(lo)), index, idx, rc); + return 0; } - RETURN(rc); + return 1; } /** - * Implementation of dt_object_operations::do_create. - * - * If any of preceeding methods (like ->do_declare_create(), - * ->do_ah_init(), etc) chose to create a striped object, - * then this method will create the master and the stripes. - * - * \see dt_object_operations::do_create() in the API description for details. + * Pick primary mirror for write + * \param[in] env execution environment + * \param[in] lo object + * \param[in] extent write range */ -static int lod_object_create(const struct lu_env *env, struct dt_object *dt, - struct lu_attr *attr, - struct dt_allocation_hint *hint, - struct dt_object_format *dof, struct thandle *th) +static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo, + struct lu_extent *extent) { - struct lod_object *lo = lod_dt_obj(dt); - int rc; + struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + unsigned int seq = 0; + struct lod_layout_component *lod_comp; + int i, j, rc; + int picked = -1, second_pick = -1, third_pick = -1; ENTRY; - /* create local object */ - rc = lod_sub_object_create(env, dt_object_child(dt), attr, hint, dof, - th); - if (rc != 0) - RETURN(rc); + if (OBD_FAIL_CHECK(OBD_FAIL_FLR_RANDOM_PICK_MIRROR)) { + get_random_bytes(&seq, sizeof(seq)); + seq %= lo->ldo_mirror_count; + } - if (S_ISREG(dt->do_lu.lo_header->loh_attr) && - lo->ldo_stripe && dof->u.dof_reg.striped != 0) - rc = lod_striping_create(env, dt, attr, dof, th); + /** + * Pick a mirror as the primary, and check the availability of OSTs. + * + * This algo can be revised later after knowing the topology of + * cluster. + */ + lod_qos_statfs_update(env, lod); + for (i = 0; i < lo->ldo_mirror_count; i++) { + bool ost_avail = true; + int index = (i + seq) % lo->ldo_mirror_count; + + if (lo->ldo_mirrors[index].lme_stale) { + CDEBUG(D_LAYOUT, DFID": mirror %d stale\n", + PFID(lod_object_fid(lo)), index); + continue; + } - RETURN(rc); + /* 2nd pick is for the primary mirror containing unavail OST */ + if (lo->ldo_mirrors[index].lme_primary && second_pick < 0) + second_pick = index; + + /* 3rd pick is for non-primary mirror containing unavail OST */ + if (second_pick < 0 && third_pick < 0) + third_pick = index; + + /** + * we found a non-primary 1st pick, we'd like to find a + * potential pirmary mirror. + */ + if (picked >= 0 && !lo->ldo_mirrors[index].lme_primary) + continue; + + /* check the availability of OSTs */ + lod_foreach_mirror_comp(lod_comp, lo, index) { + if (!lod_comp_inited(lod_comp) || !lod_comp->llc_stripe) + continue; + + for (j = 0; j < lod_comp->llc_stripe_count; j++) { + struct dt_object *dt = lod_comp->llc_stripe[j]; + + rc = lod_check_ost_avail(env, lo, dt, index); + if (rc < 0) + RETURN(rc); + + ost_avail = !!rc; + if (!ost_avail) + break; + } /* for all dt object in one component */ + if (!ost_avail) + break; + } /* for all components in a mirror */ + + /** + * the OSTs where allocated objects locates in the components + * of the mirror are available. + */ + if (!ost_avail) + continue; + + /* this mirror has all OSTs available */ + picked = index; + + /** + * primary with all OSTs are available, this is the perfect + * 1st pick. + */ + if (lo->ldo_mirrors[index].lme_primary) + break; + } /* for all mirrors */ + + /* failed to pick a sound mirror, lower our expectation */ + if (picked < 0) + picked = second_pick; + if (picked < 0) + picked = third_pick; + if (picked < 0) + RETURN(-ENODATA); + + RETURN(picked); } /** - * Implementation of dt_object_operations::do_declare_destroy. - * - * If the object is a striped directory, then the function declares reference - * removal from the master object (this is an index) to the stripes and declares - * destroy of all the stripes. In all the cases, it declares an intention to - * destroy the object itself. - * - * \see dt_object_operations::do_declare_destroy() in the API description - * for details. + * figure out the components should be instantiated for resync. */ -static int lod_declare_object_destroy(const struct lu_env *env, - struct dt_object *dt, - struct thandle *th) +static int lod_prepare_resync(const struct lu_env *env, struct lod_object *lo, + struct lu_extent *extent) { - struct dt_object *next = dt_object_child(dt); - struct lod_object *lo = lod_dt_obj(dt); struct lod_thread_info *info = lod_env_info(env); - char *stripe_name = info->lti_key; - int rc, i; - ENTRY; + struct lod_layout_component *lod_comp; + unsigned int need_sync = 0; + int i; - /* - * load striping information, notice we don't do this when object - * is being initialized as we don't need this information till - * few specific cases like destroy, chown + CDEBUG(D_LAYOUT, + DFID": instantiate all stale components in "DEXT"\n", + PFID(lod_object_fid(lo)), PEXT(extent)); + + /** + * instantiate all components within this extent, even non-stale + * components. */ - rc = lod_load_striping(env, lo); - if (rc) - RETURN(rc); + for (i = 0; i < lo->ldo_mirror_count; i++) { + if (!lo->ldo_mirrors[i].lme_stale) + continue; - /* declare destroy for all underlying objects */ - if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { - rc = next->do_ops->do_index_try(env, next, - &dt_directory_features); - if (rc != 0) - RETURN(rc); + lod_foreach_mirror_comp(lod_comp, lo, i) { + if (!lu_extent_is_overlapped(extent, + &lod_comp->llc_extent)) + break; - for (i = 0; i < lo->ldo_stripenr; i++) { - rc = lod_sub_object_declare_ref_del(env, next, th); - if (rc != 0) - RETURN(rc); + need_sync++; - snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", - PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)), - i); - rc = lod_sub_object_declare_delete(env, next, - (const struct dt_key *)stripe_name, th); - if (rc != 0) - RETURN(rc); + if (lod_comp_inited(lod_comp)) + continue; + + CDEBUG(D_LAYOUT, "resync instantiate %d / %d\n", + i, lod_comp_index(lo, lod_comp)); + info->lti_comp_idx[info->lti_count++] = + lod_comp_index(lo, lod_comp); } } - /* - * we declare destroy for the local object - */ - rc = lod_sub_object_declare_destroy(env, next, th); - if (rc) - RETURN(rc); + return need_sync ? 0 : -EALREADY; +} - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ)) - RETURN(0); +static int lod_declare_update_rdonly(const struct lu_env *env, + struct lod_object *lo, struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lu_attr *layout_attr = &info->lti_layout_attr; + struct lod_layout_component *lod_comp; + struct lu_extent extent = { 0 }; + int rc; + ENTRY; - /* declare destroy all striped objects */ - for (i = 0; i < lo->ldo_stripenr; i++) { - if (lo->ldo_stripe[i] == NULL) - continue; + LASSERT(lo->ldo_flr_state == LCM_FL_RDONLY); + LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE || + mlc->mlc_opc == MD_LAYOUT_RESYNC); + LASSERT(lo->ldo_mirror_count > 0); + + if (mlc->mlc_opc == MD_LAYOUT_WRITE) { + struct layout_intent *layout = mlc->mlc_intent; + int picked; + + extent = layout->li_extent; + CDEBUG(D_LAYOUT, DFID": trying to write :"DEXT"\n", + PFID(lod_object_fid(lo)), PEXT(&extent)); + + picked = lod_primary_pick(env, lo, &extent); + if (picked < 0) + RETURN(picked); + + CDEBUG(D_LAYOUT, DFID": picked mirror id %u as primary\n", + PFID(lod_object_fid(lo)), + lo->ldo_mirrors[picked].lme_id); + + if (layout->li_opc == LAYOUT_INTENT_TRUNC) { + /** + * trunc transfers [0, size) in the intent extent, we'd + * stale components overlapping [size, eof). + */ + extent.e_start = extent.e_end; + extent.e_end = OBD_OBJECT_EOF; + } - if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) - rc = lod_sub_object_declare_ref_del(env, - lo->ldo_stripe[i], th); + /* stale overlapping components from other mirrors */ + lod_stale_components(lo, picked, &extent); - rc = lod_sub_object_declare_destroy(env, lo->ldo_stripe[i], - th); - if (rc != 0) - break; + /* restore truncate intent extent */ + if (layout->li_opc == LAYOUT_INTENT_TRUNC) + extent.e_end = extent.e_start; + + /* instantiate components for the picked mirror, start from 0 */ + extent.e_start = 0; + + lod_foreach_mirror_comp(lod_comp, lo, picked) { + if (!lu_extent_is_overlapped(&extent, + &lod_comp->llc_extent)) + break; + + if (lod_comp_inited(lod_comp)) + continue; + + info->lti_comp_idx[info->lti_count++] = + lod_comp_index(lo, lod_comp); + } + + lo->ldo_flr_state = LCM_FL_WRITE_PENDING; + } else { /* MD_LAYOUT_RESYNC */ + int i; + + /** + * could contain multiple non-stale mirrors, so we need to + * prep uninited all components assuming any non-stale mirror + * could be picked as the primary mirror. + */ + for (i = 0; i < lo->ldo_mirror_count; i++) { + if (lo->ldo_mirrors[i].lme_stale) + continue; + + lod_foreach_mirror_comp(lod_comp, lo, i) { + if (!lod_comp_inited(lod_comp)) + break; + + if (extent.e_end < lod_comp->llc_extent.e_end) + extent.e_end = + lod_comp->llc_extent.e_end; + } + } + + rc = lod_prepare_resync(env, lo, &extent); + if (rc) + GOTO(out, rc); + /* change the file state to SYNC_PENDING */ + lo->ldo_flr_state = LCM_FL_SYNC_PENDING; + } + + /* Reset the layout version once it's becoming too large. + * This way it can make sure that the layout version is + * monotonously increased in this writing era. */ + lod_obj_inc_layout_gen(lo); + if (lo->ldo_layout_gen > (LCME_ID_MAX >> 1)) { + __u32 layout_version; + + cfs_get_random_bytes(&layout_version, sizeof(layout_version)); + lo->ldo_layout_gen = layout_version & 0xffff; } + rc = lod_declare_instantiate_components(env, lo, th); + if (rc) + GOTO(out, rc); + + layout_attr->la_valid = LA_LAYOUT_VERSION; + layout_attr->la_layout_version = 0; /* set current version */ + if (mlc->mlc_opc == MD_LAYOUT_RESYNC) + layout_attr->la_layout_version = LU_LAYOUT_RESYNC; + rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th); + if (rc) + GOTO(out, rc); + +out: + if (rc) + lod_striping_free(env, lo); RETURN(rc); } -/** - * Implementation of dt_object_operations::do_destroy. - * - * If the object is a striped directory, then the function removes references - * from the master object (this is an index) to the stripes and destroys all - * the stripes. In all the cases, the function destroys the object itself. - * - * \see dt_object_operations::do_destroy() in the API description for details. - */ -static int lod_object_destroy(const struct lu_env *env, - struct dt_object *dt, struct thandle *th) +static int lod_declare_update_write_pending(const struct lu_env *env, + struct lod_object *lo, struct md_layout_change *mlc, + struct thandle *th) { - struct dt_object *next = dt_object_child(dt); - struct lod_object *lo = lod_dt_obj(dt); struct lod_thread_info *info = lod_env_info(env); - char *stripe_name = info->lti_key; - unsigned int i; - int rc; + struct lu_attr *layout_attr = &info->lti_layout_attr; + struct lod_layout_component *lod_comp; + struct lu_extent extent = { 0 }; + int primary = -1; + int i; + int rc; ENTRY; - /* destroy sub-stripe of master object */ - if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { - rc = next->do_ops->do_index_try(env, next, - &dt_directory_features); - if (rc != 0) - RETURN(rc); + LASSERT(lo->ldo_flr_state == LCM_FL_WRITE_PENDING); + LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE || + mlc->mlc_opc == MD_LAYOUT_RESYNC); - for (i = 0; i < lo->ldo_stripenr; i++) { - rc = lod_sub_object_ref_del(env, next, th); - if (rc != 0) - RETURN(rc); + /* look for the primary mirror */ + for (i = 0; i < lo->ldo_mirror_count; i++) { + if (lo->ldo_mirrors[i].lme_stale) + continue; - snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", - PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)), - i); + LASSERTF(primary < 0, DFID " has multiple primary: %u / %u", + PFID(lod_object_fid(lo)), + lo->ldo_mirrors[i].lme_id, + lo->ldo_mirrors[primary].lme_id); - CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n", - PFID(lu_object_fid(&dt->do_lu)), stripe_name, - PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu))); + primary = i; + } + if (primary < 0) { + CERROR(DFID ": doesn't have a primary mirror\n", + PFID(lod_object_fid(lo))); + GOTO(out, rc = -ENODATA); + } + + CDEBUG(D_LAYOUT, DFID": found primary %u\n", + PFID(lod_object_fid(lo)), lo->ldo_mirrors[primary].lme_id); + + LASSERT(!lo->ldo_mirrors[primary].lme_stale); + + /* for LAYOUT_WRITE opc, it has to do the following operations: + * 1. stale overlapping componets from stale mirrors; + * 2. instantiate components of the primary mirror; + * 3. transfter layout version to all objects of the primary; + * + * for LAYOUT_RESYNC opc, it will do: + * 1. instantiate components of all stale mirrors; + * 2. transfer layout version to all objects to close write era. */ + + if (mlc->mlc_opc == MD_LAYOUT_WRITE) { + LASSERT(mlc->mlc_intent != NULL); + + extent = mlc->mlc_intent->li_extent; - rc = lod_sub_object_delete(env, next, - (const struct dt_key *)stripe_name, th); - if (rc != 0) - RETURN(rc); + CDEBUG(D_LAYOUT, DFID": intent to write: "DEXT"\n", + PFID(lod_object_fid(lo)), PEXT(&extent)); + + if (mlc->mlc_intent->li_opc == LAYOUT_INTENT_TRUNC) { + /** + * trunc transfers [0, size) in the intent extent, we'd + * stale components overlapping [size, eof). + */ + extent.e_start = extent.e_end; + extent.e_end = OBD_OBJECT_EOF; } - } + /* 1. stale overlapping components */ + lod_stale_components(lo, primary, &extent); - rc = lod_sub_object_destroy(env, next, th); - if (rc != 0) - RETURN(rc); + /* 2. find out the components need instantiating. + * instantiate [0, mlc->mlc_intent->e_end) */ - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ)) - RETURN(0); + /* restore truncate intent extent */ + if (mlc->mlc_intent->li_opc == LAYOUT_INTENT_TRUNC) + extent.e_end = extent.e_start; + extent.e_start = 0; - /* destroy all striped objects */ - for (i = 0; i < lo->ldo_stripenr; i++) { - if (likely(lo->ldo_stripe[i] != NULL) && - (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || - i == cfs_fail_val)) { - if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { - dt_write_lock(env, lo->ldo_stripe[i], - MOR_TGT_CHILD); - rc = lod_sub_object_ref_del(env, - lo->ldo_stripe[i], th); - dt_write_unlock(env, lo->ldo_stripe[i]); - if (rc != 0) - break; - } + lod_foreach_mirror_comp(lod_comp, lo, primary) { + if (!lu_extent_is_overlapped(&extent, + &lod_comp->llc_extent)) + break; - rc = lod_sub_object_destroy(env, lo->ldo_stripe[i], th); - if (rc != 0) + if (lod_comp_inited(lod_comp)) + continue; + + CDEBUG(D_LAYOUT, "write instantiate %d / %d\n", + primary, lod_comp_index(lo, lod_comp)); + info->lti_comp_idx[info->lti_count++] = + lod_comp_index(lo, lod_comp); + } + } else { /* MD_LAYOUT_RESYNC */ + lod_foreach_mirror_comp(lod_comp, lo, primary) { + if (!lod_comp_inited(lod_comp)) break; + + extent.e_end = lod_comp->llc_extent.e_end; } - } - RETURN(rc); -} + rc = lod_prepare_resync(env, lo, &extent); + if (rc) + GOTO(out, rc); + /* change the file state to SYNC_PENDING */ + lo->ldo_flr_state = LCM_FL_SYNC_PENDING; + } -/** - * Implementation of dt_object_operations::do_declare_ref_add. - * - * \see dt_object_operations::do_declare_ref_add() in the API description - * for details. - */ -static int lod_declare_ref_add(const struct lu_env *env, - struct dt_object *dt, struct thandle *th) -{ - return lod_sub_object_declare_ref_add(env, dt_object_child(dt), th); -} + rc = lod_declare_instantiate_components(env, lo, th); + if (rc) + GOTO(out, rc); -/** - * Implementation of dt_object_operations::do_ref_add. - * - * \see dt_object_operations::do_ref_add() in the API description for details. - */ -static int lod_ref_add(const struct lu_env *env, - struct dt_object *dt, struct thandle *th) -{ - return lod_sub_object_ref_add(env, dt_object_child(dt), th); -} + /* 3. transfer layout version to OST objects. + * transfer new layout version to OST objects so that stale writes + * can be denied. It also ends an era of writing by setting + * LU_LAYOUT_RESYNC. Normal client can never use this bit to + * send write RPC; only resync RPCs could do it. */ + layout_attr->la_valid = LA_LAYOUT_VERSION; + layout_attr->la_layout_version = 0; /* set current version */ + if (mlc->mlc_opc == MD_LAYOUT_RESYNC) + layout_attr->la_layout_version = LU_LAYOUT_RESYNC; + rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th); + if (rc) + GOTO(out, rc); -/** - * Implementation of dt_object_operations::do_declare_ref_del. - * - * \see dt_object_operations::do_declare_ref_del() in the API description - * for details. - */ -static int lod_declare_ref_del(const struct lu_env *env, - struct dt_object *dt, struct thandle *th) -{ - return lod_sub_object_declare_ref_del(env, dt_object_child(dt), th); + lod_obj_inc_layout_gen(lo); +out: + if (rc) + lod_striping_free(env, lo); + RETURN(rc); } -/** - * Implementation of dt_object_operations::do_ref_del - * - * \see dt_object_operations::do_ref_del() in the API description for details. - */ -static int lod_ref_del(const struct lu_env *env, - struct dt_object *dt, struct thandle *th) +static int lod_declare_update_sync_pending(const struct lu_env *env, + struct lod_object *lo, struct md_layout_change *mlc, + struct thandle *th) { - return lod_sub_object_ref_del(env, dt_object_child(dt), th); -} + struct lod_thread_info *info = lod_env_info(env); + unsigned sync_components = 0; + unsigned resync_components = 0; + int i; + int rc; + ENTRY; -/** - * Implementation of dt_object_operations::do_object_sync. - * - * \see dt_object_operations::do_object_sync() in the API description - * for details. - */ -static int lod_object_sync(const struct lu_env *env, struct dt_object *dt, - __u64 start, __u64 end) -{ - return dt_object_sync(env, dt_object_child(dt), start, end); -} + LASSERT(lo->ldo_flr_state == LCM_FL_SYNC_PENDING); + LASSERT(mlc->mlc_opc == MD_LAYOUT_RESYNC_DONE || + mlc->mlc_opc == MD_LAYOUT_WRITE); -/** - * Release LDLM locks on the stripes of a striped directory. - * - * Iterates over all the locks taken on the stripe objects and - * release them using ->do_object_unlock() method. - * - * \param[in] env execution environment - * \param[in] dt striped object - * \param[in] einfo lock description - * \param[in] policy data describing requested lock - * - * \retval 0 on success - * \retval negative if failed - */ -static int lod_object_unlock_internal(const struct lu_env *env, - struct dt_object *dt, - struct ldlm_enqueue_info *einfo, - union ldlm_policy_data *policy) -{ - struct lustre_handle_array *slave_locks = einfo->ei_cbdata; - int rc = 0; - int i; - ENTRY; + CDEBUG(D_LAYOUT, DFID ": received op %d in sync pending\n", + PFID(lod_object_fid(lo)), mlc->mlc_opc); - if (slave_locks == NULL) - RETURN(0); + if (mlc->mlc_opc == MD_LAYOUT_WRITE) { + CDEBUG(D_LAYOUT, DFID": cocurrent write to sync pending\n", + PFID(lod_object_fid(lo))); - for (i = 1; i < slave_locks->count; i++) { - if (lustre_handle_is_used(&slave_locks->handles[i])) - ldlm_lock_decref(&slave_locks->handles[i], - einfo->ei_mode); + lo->ldo_flr_state = LCM_FL_WRITE_PENDING; + return lod_declare_update_write_pending(env, lo, mlc, th); } - RETURN(rc); -} + /* MD_LAYOUT_RESYNC_DONE */ -/** - * Implementation of dt_object_operations::do_object_unlock. - * - * Used to release LDLM lock(s). - * - * \see dt_object_operations::do_object_unlock() in the API description - * for details. - */ -static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt, - struct ldlm_enqueue_info *einfo, - union ldlm_policy_data *policy) -{ - struct lod_object *lo = lod_dt_obj(dt); - struct lustre_handle_array *slave_locks = einfo->ei_cbdata; - int slave_locks_size; - int i; - ENTRY; + for (i = 0; i < lo->ldo_comp_cnt; i++) { + struct lod_layout_component *lod_comp; + int j; - if (slave_locks == NULL) - RETURN(0); + lod_comp = &lo->ldo_comp_entries[i]; - LASSERT(S_ISDIR(dt->do_lu.lo_header->loh_attr)); - LASSERT(lo->ldo_stripenr > 1); - /* Note: for remote lock for single stripe dir, MDT will cancel - * the lock by lockh directly */ - LASSERT(!dt_object_remote(dt_object_child(dt))); + if (!(lod_comp->llc_flags & LCME_FL_STALE)) { + sync_components++; + continue; + } - /* locks were unlocked in MDT layer */ - for (i = 1; i < slave_locks->count; i++) - LASSERT(!lustre_handle_is_used(&slave_locks->handles[i])); + for (j = 0; j < mlc->mlc_resync_count; j++) { + if (lod_comp->llc_id != mlc->mlc_resync_ids[j]) + continue; - slave_locks_size = sizeof(*slave_locks) + slave_locks->count * - sizeof(slave_locks->handles[0]); - OBD_FREE(slave_locks, slave_locks_size); - einfo->ei_cbdata = NULL; + mlc->mlc_resync_ids[j] = LCME_ID_INVAL; + lod_comp->llc_flags &= ~LCME_FL_STALE; + resync_components++; + break; + } + } - RETURN(0); -} + /* valid check */ + for (i = 0; i < mlc->mlc_resync_count; i++) { + if (mlc->mlc_resync_ids[i] == LCME_ID_INVAL) + continue; -/** - * Implementation of dt_object_operations::do_object_lock. - * - * Used to get LDLM lock on the non-striped and striped objects. - * - * \see dt_object_operations::do_object_lock() in the API description - * for details. - */ -static int lod_object_lock(const struct lu_env *env, - struct dt_object *dt, - struct lustre_handle *lh, - struct ldlm_enqueue_info *einfo, - union ldlm_policy_data *policy) -{ - struct lod_object *lo = lod_dt_obj(dt); - int rc = 0; - int i; - int slave_locks_size; - struct lustre_handle_array *slave_locks = NULL; - ENTRY; + CDEBUG(D_LAYOUT, DFID": lcme id %u (%d / %zd) not exist " + "or already synced\n", PFID(lod_object_fid(lo)), + mlc->mlc_resync_ids[i], i, mlc->mlc_resync_count); + GOTO(out, rc = -EINVAL); + } - /* remote object lock */ - if (!einfo->ei_enq_slave) { - LASSERT(dt_object_remote(dt)); - return dt_object_lock(env, dt_object_child(dt), lh, einfo, - policy); + if (!sync_components || (mlc->mlc_resync_count && !resync_components)) { + CDEBUG(D_LAYOUT, DFID": no mirror in sync\n", + PFID(lod_object_fid(lo))); + + /* tend to return an error code here to prevent + * the MDT from setting SoM attribute */ + GOTO(out, rc = -EINVAL); } - if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) - RETURN(-ENOTDIR); + CDEBUG(D_LAYOUT, DFID": resynced %u/%zu components\n", + PFID(lod_object_fid(lo)), + resync_components, mlc->mlc_resync_count); - rc = lod_load_striping(env, lo); - if (rc != 0) - RETURN(rc); + lo->ldo_flr_state = LCM_FL_RDONLY; + lod_obj_inc_layout_gen(lo); - /* No stripes */ - if (lo->ldo_stripenr <= 1) - RETURN(0); + info->lti_buf.lb_len = lod_comp_md_size(lo, false); + rc = lod_sub_declare_xattr_set(env, lod_object_child(lo), + &info->lti_buf, XATTR_NAME_LOV, 0, th); + EXIT; - slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr * - sizeof(slave_locks->handles[0]); - /* Freed in lod_object_unlock */ - OBD_ALLOC(slave_locks, slave_locks_size); - if (slave_locks == NULL) - RETURN(-ENOMEM); - slave_locks->count = lo->ldo_stripenr; +out: + if (rc) + lod_striping_free(env, lo); + RETURN(rc); +} - /* striped directory lock */ - for (i = 1; i < lo->ldo_stripenr; i++) { - struct lustre_handle lockh; - struct ldlm_res_id *res_id; +static int lod_declare_layout_change(const struct lu_env *env, + struct dt_object *dt, struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + int rc; + ENTRY; - res_id = &lod_env_info(env)->lti_res_id; - fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu), - res_id); - einfo->ei_res_id = res_id; + if (!S_ISREG(dt->do_lu.lo_header->loh_attr) || !dt_object_exists(dt) || + dt_object_remote(dt_object_child(dt))) + RETURN(-EINVAL); - LASSERT(lo->ldo_stripe[i] != NULL); - if (likely(dt_object_remote(lo->ldo_stripe[i]))) { - rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, - einfo, policy); - } else { - struct ldlm_namespace *ns = einfo->ei_namespace; - ldlm_blocking_callback blocking = einfo->ei_cb_local_bl; - ldlm_completion_callback completion = einfo->ei_cb_cp; - __u64 dlmflags = LDLM_FL_ATOMIC_CB; + rc = lod_striping_load(env, lo); + if (rc) + GOTO(out, rc); - if (einfo->ei_mode == LCK_PW || - einfo->ei_mode == LCK_EX) - dlmflags |= LDLM_FL_COS_INCOMPAT; + LASSERT(lo->ldo_comp_cnt > 0); - /* This only happens if there are mulitple stripes - * on the master MDT, i.e. except stripe0, there are - * other stripes on the Master MDT as well, Only - * happens in the test case right now. */ - LASSERT(ns != NULL); - rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, - policy, einfo->ei_mode, - &dlmflags, blocking, - completion, NULL, - NULL, 0, LVB_T_NONE, - NULL, &lockh); - } - if (rc != 0) - GOTO(out, rc); - slave_locks->handles[i] = lockh; + rc = lod_layout_data_init(info, lo->ldo_comp_cnt); + if (rc) + GOTO(out, rc); + + switch (lo->ldo_flr_state) { + case LCM_FL_NONE: + rc = lod_declare_update_plain(env, lo, mlc->mlc_intent, + &mlc->mlc_buf, th); + break; + case LCM_FL_RDONLY: + rc = lod_declare_update_rdonly(env, lo, mlc, th); + break; + case LCM_FL_WRITE_PENDING: + rc = lod_declare_update_write_pending(env, lo, mlc, th); + break; + case LCM_FL_SYNC_PENDING: + rc = lod_declare_update_sync_pending(env, lo, mlc, th); + break; + default: + rc = -ENOTSUPP; + break; } +out: + RETURN(rc); +} - einfo->ei_cbdata = slave_locks; +/** + * Instantiate layout component objects which covers the intent write offset. + */ +static int lod_layout_change(const struct lu_env *env, struct dt_object *dt, + struct md_layout_change *mlc, struct thandle *th) +{ + struct lu_attr *attr = &lod_env_info(env)->lti_attr; + struct lu_attr *layout_attr = &lod_env_info(env)->lti_layout_attr; + struct lod_object *lo = lod_dt_obj(dt); + int rc; -out: - if (rc != 0 && slave_locks != NULL) { - einfo->ei_cbdata = slave_locks; - lod_object_unlock_internal(env, dt, einfo, policy); - OBD_FREE(slave_locks, slave_locks_size); - einfo->ei_cbdata = NULL; + rc = lod_striped_create(env, dt, attr, NULL, th); + if (!rc && layout_attr->la_valid & LA_LAYOUT_VERSION) { + layout_attr->la_layout_version |= lo->ldo_layout_gen; + rc = lod_attr_set(env, dt, layout_attr, th); } - RETURN(rc); + return rc; } struct dt_object_operations lod_obj_ops = { - .do_read_lock = lod_object_read_lock, - .do_write_lock = lod_object_write_lock, - .do_read_unlock = lod_object_read_unlock, - .do_write_unlock = lod_object_write_unlock, - .do_write_locked = lod_object_write_locked, + .do_read_lock = lod_read_lock, + .do_write_lock = lod_write_lock, + .do_read_unlock = lod_read_unlock, + .do_write_unlock = lod_write_unlock, + .do_write_locked = lod_write_locked, .do_attr_get = lod_attr_get, .do_declare_attr_set = lod_declare_attr_set, .do_attr_set = lod_attr_set, @@ -3996,10 +6218,10 @@ struct dt_object_operations lod_obj_ops = { .do_xattr_del = lod_xattr_del, .do_xattr_list = lod_xattr_list, .do_ah_init = lod_ah_init, - .do_declare_create = lod_declare_object_create, - .do_create = lod_object_create, - .do_declare_destroy = lod_declare_object_destroy, - .do_destroy = lod_object_destroy, + .do_declare_create = lod_declare_create, + .do_create = lod_create, + .do_declare_destroy = lod_declare_destroy, + .do_destroy = lod_destroy, .do_index_try = lod_index_try, .do_declare_ref_add = lod_declare_ref_add, .do_ref_add = lod_ref_add, @@ -4008,6 +6230,9 @@ struct dt_object_operations lod_obj_ops = { .do_object_sync = lod_object_sync, .do_object_lock = lod_object_lock, .do_object_unlock = lod_object_unlock, + .do_invalidate = lod_invalidate, + .do_declare_layout_change = lod_declare_layout_change, + .do_layout_change = lod_layout_change, }; /** @@ -4019,6 +6244,9 @@ static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt, struct lu_buf *buf, loff_t *pos) { struct dt_object *next = dt_object_child(dt); + + LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr) || + S_ISLNK(dt->do_lu.lo_header->loh_attr)); return next->do_body_ops->dbo_read(env, next, buf, pos); } @@ -4033,8 +6261,7 @@ static ssize_t lod_declare_write(const struct lu_env *env, const struct lu_buf *buf, loff_t pos, struct thandle *th) { - return lod_sub_object_declare_write(env, dt_object_child(dt), buf, pos, - th); + return lod_sub_declare_write(env, dt_object_child(dt), buf, pos, th); } /** @@ -4046,7 +6273,9 @@ static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t *pos, struct thandle *th, int iq) { - return lod_sub_object_write(env, dt_object_child(dt), buf, pos, th, iq); + LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr) || + S_ISLNK(dt->do_lu.lo_header->loh_attr)); + return lod_sub_write(env, dt_object_child(dt), buf, pos, th, iq); } static int lod_declare_punch(const struct lu_env *env, struct dt_object *dt, @@ -4055,8 +6284,7 @@ static int lod_declare_punch(const struct lu_env *env, struct dt_object *dt, if (dt_object_remote(dt)) return -ENOTSUPP; - return lod_sub_object_declare_punch(env, dt_object_child(dt), start, - end, th); + return lod_sub_declare_punch(env, dt_object_child(dt), start, end, th); } static int lod_punch(const struct lu_env *env, struct dt_object *dt, @@ -4065,16 +6293,17 @@ static int lod_punch(const struct lu_env *env, struct dt_object *dt, if (dt_object_remote(dt)) return -ENOTSUPP; - return lod_sub_object_punch(env, dt_object_child(dt), start, end, th); + LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr)); + return lod_sub_punch(env, dt_object_child(dt), start, end, th); } -static const struct dt_body_operations lod_body_lnk_ops = { - .dbo_read = lod_read, - .dbo_declare_write = lod_declare_write, - .dbo_write = lod_write -}; - -static const struct dt_body_operations lod_body_ops = { +/* + * different type of files use the same body_ops because object may be created + * in OUT, where there is no chance to set correct body_ops for each type, so + * body_ops themselves will check file type inside, see lod_read/write/punch for + * details. + */ +const struct dt_body_operations lod_body_ops = { .dbo_read = lod_read, .dbo_declare_write = lod_declare_write, .dbo_write = lod_write, @@ -4151,6 +6380,8 @@ static int lod_object_init(const struct lu_env *env, struct lu_object *lo, if (unlikely(cobj == NULL)) RETURN(-ENOMEM); + lu2lod_obj(lo)->ldo_obj.do_body_ops = &lod_body_ops; + lu_object_add(lo, cobj); RETURN(0); @@ -4166,52 +6397,59 @@ static int lod_object_init(const struct lu_env *env, struct lu_object *lo, * \param[in] env execution environment * \param[in] lo object */ -void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo) +void lod_striping_free_nolock(const struct lu_env *env, struct lod_object *lo) { - int i; + struct lod_layout_component *lod_comp; + int i, j; - if (lo->ldo_dir_stripe != NULL) { - OBD_FREE_PTR(lo->ldo_dir_stripe); - lo->ldo_dir_stripe = NULL; - } - - if (lo->ldo_stripe) { - LASSERT(lo->ldo_stripes_allocated > 0); + if (lo->ldo_stripe != NULL) { + LASSERT(lo->ldo_comp_entries == NULL); + LASSERT(lo->ldo_dir_stripes_allocated > 0); - for (i = 0; i < lo->ldo_stripenr; i++) { + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { if (lo->ldo_stripe[i]) - lu_object_put(env, &lo->ldo_stripe[i]->do_lu); + dt_object_put(env, lo->ldo_stripe[i]); } - i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated; - OBD_FREE(lo->ldo_stripe, i); + j = sizeof(struct dt_object *) * lo->ldo_dir_stripes_allocated; + OBD_FREE(lo->ldo_stripe, j); lo->ldo_stripe = NULL; - lo->ldo_stripes_allocated = 0; + lo->ldo_dir_stripes_allocated = 0; + lo->ldo_dir_stripe_loaded = 0; + lo->ldo_dir_stripe_count = 0; + } else if (lo->ldo_comp_entries != NULL) { + for (i = 0; i < lo->ldo_comp_cnt; i++) { + /* free lod_layout_component::llc_stripe array */ + lod_comp = &lo->ldo_comp_entries[i]; + + if (lod_comp->llc_stripe == NULL) + continue; + LASSERT(lod_comp->llc_stripes_allocated != 0); + for (j = 0; j < lod_comp->llc_stripes_allocated; j++) { + if (lod_comp->llc_stripe[j] != NULL) + lu_object_put(env, + &lod_comp->llc_stripe[j]->do_lu); + } + OBD_FREE(lod_comp->llc_stripe, + sizeof(struct dt_object *) * + lod_comp->llc_stripes_allocated); + lod_comp->llc_stripe = NULL; + OBD_FREE(lod_comp->llc_ost_indices, + sizeof(__u32) * + lod_comp->llc_stripes_allocated); + lod_comp->llc_ost_indices = NULL; + lod_comp->llc_stripes_allocated = 0; + } + lod_free_comp_entries(lo); + lo->ldo_comp_cached = 0; } - lo->ldo_striping_cached = 0; - lo->ldo_stripenr = 0; - lo->ldo_pattern = 0; } -/** - * Implementation of lu_object_operations::loo_object_start. - * - * \see lu_object_operations::loo_object_start() in the API description - * for details. - */ -static int lod_object_start(const struct lu_env *env, struct lu_object *o) +void lod_striping_free(const struct lu_env *env, struct lod_object *lo) { - if (S_ISLNK(o->lo_header->loh_attr & S_IFMT)) { - lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops; - } else if (S_ISREG(o->lo_header->loh_attr & S_IFMT) || - fid_is_local_file(lu_object_fid(o))) { - /* Note: some local file (like last rcvd) is created - * through bottom layer (OSD), so the object initialization - * comes to lod, it does not set loh_attr yet, so - * set do_body_ops for local file anyway */ - lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_ops; - } - return 0; + mutex_lock(&lo->ldo_layout_mutex); + lod_striping_free_nolock(env, lo); + mutex_unlock(&lo->ldo_layout_mutex); } /** @@ -4222,18 +6460,12 @@ static int lod_object_start(const struct lu_env *env, struct lu_object *o) */ static void lod_object_free(const struct lu_env *env, struct lu_object *o) { - struct lod_object *mo = lu2lod_obj(o); - - /* - * release all underlying object pinned - */ - - lod_object_free_striping(env, mo); - - lod_object_set_pool(mo, NULL); + struct lod_object *lo = lu2lod_obj(o); + /* release all underlying object pinned */ + lod_striping_free(env, lo); lu_object_fini(o); - OBD_SLAB_FREE_PTR(mo, lod_object_kmem); + OBD_SLAB_FREE_PTR(lo, lod_object_kmem); } /** @@ -4264,7 +6496,6 @@ static int lod_object_print(const struct lu_env *env, void *cookie, struct lu_object_operations lod_lu_obj_ops = { .loo_object_init = lod_object_init, - .loo_object_start = lod_object_start, .loo_object_free = lod_object_free, .loo_object_release = lod_object_release, .loo_object_print = lod_object_print,