X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flod%2Flod_object.c;h=91b901b65199bb9fbf534a8b7db8d6365b81729f;hb=083d62ee6de5ac6cee95c1d2f86b62b75034093b;hp=24516ed9848d0b862f10914d66778fcf55858ed9;hpb=25faf70d7b46600ce35bed8e6e834318597ff9d3;p=fs%2Flustre-release.git diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index 24516ed..91b901b 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -23,7 +23,7 @@ * Copyright 2009 Sun Microsystems, Inc. All rights reserved * Use is subject to license terms. * - * Copyright (c) 2012, 2016, Intel Corporation. + * Copyright (c) 2012, 2017, Intel Corporation. */ /* * lustre/lod/lod_object.c @@ -40,6 +40,8 @@ #define DEBUG_SUBSYSTEM S_MDS +#include + #include #include #include @@ -947,7 +949,7 @@ static int lod_index_try(const struct lu_env *env, struct dt_object *dt, LASSERT(next->do_ops); LASSERT(next->do_ops->do_index_try); - rc = lod_load_striping_locked(env, lo); + rc = lod_striping_load(env, lo); if (rc != 0) RETURN(rc); @@ -1046,6 +1048,18 @@ static int lod_attr_get(const struct lu_env *env, return dt_attr_get(env, dt_object_child(dt), attr); } +static inline void lod_adjust_stripe_info(struct lod_layout_component *comp, + struct lov_desc *desc) +{ + if (comp->llc_pattern != LOV_PATTERN_MDT) { + if (!comp->llc_stripe_count) + comp->llc_stripe_count = + desc->ld_default_stripe_count; + } + if (comp->llc_stripe_size <= 0) + comp->llc_stripe_size = desc->ld_default_stripe_size; +} + int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, struct thandle *th, struct lod_obj_stripe_cb_data *data) @@ -1071,6 +1085,18 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, data->locd_comp_skip_cb(env, lo, i, data)) continue; + if (data->locd_comp_cb) { + rc = data->locd_comp_cb(env, lo, i, data); + if (rc) + RETURN(rc); + } + + /* could used just to do sth about component, not each + * stripes + */ + if (!data->locd_stripe_cb) + continue; + LASSERT(lod_comp->llc_stripe_count > 0); for (j = 0; j < lod_comp->llc_stripe_count; j++) { struct dt_object *dt = lod_comp->llc_stripe[j]; @@ -1205,7 +1231,7 @@ static int lod_declare_attr_set(const struct lu_env *env, * is being initialized as we don't need this information till * few specific cases like destroy, chown */ - rc = lod_load_striping(env, lo); + rc = lod_striping_load(env, lo); if (rc) RETURN(rc); @@ -1302,7 +1328,7 @@ static int lod_attr_set(const struct lu_env *env, * the in-memory striping information has been freed in lod_xattr_set() * due to layout change. It has to load stripe here again. It only * changes flags of layout so declare_attr_set() is still accurate */ - rc = lod_load_striping_locked(env, lo); + rc = lod_striping_load(env, lo); if (rc) RETURN(rc); @@ -1633,6 +1659,8 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo, int rc = 0; ENTRY; + LASSERT(mutex_is_locked(&lo->ldo_layout_mutex)); + if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION) RETURN(0); @@ -1691,7 +1719,7 @@ out: lo->ldo_dir_stripe_count = le32_to_cpu(lmv1->lmv_stripe_count); lo->ldo_dir_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count); if (rc != 0) - lod_object_free_striping(env, lo); + lod_striping_free_nolock(env, lo); RETURN(rc); } @@ -1859,10 +1887,12 @@ static int lod_prep_md_striped_create(const struct lu_env *env, int rc = 0; __u32 i; __u32 j; + bool is_specific = false; ENTRY; /* The lum has been verifed in lod_verify_md_striping */ - LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC); + LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC || + le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC); LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0); stripe_count = le32_to_cpu(lum->lum_stripe_count); @@ -1878,6 +1908,12 @@ static int lod_prep_md_striped_create(const struct lu_env *env, /* Start index must be the master MDT */ master_index = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id; idx_array[0] = master_index; + if (le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC) { + is_specific = true; + for (i = 1; i < stripe_count; i++) + idx_array[i] = le32_to_cpu(lum->lum_objects[i].lum_mds); + } + for (i = 0; i < stripe_count; i++) { struct lod_tgt_desc *tgt = NULL; struct dt_object *dto; @@ -1896,7 +1932,8 @@ static int lod_prep_md_striped_create(const struct lu_env *env, CDEBUG(D_INFO, "try idx %d, mdt cnt %u, allocated %u\n", idx, lod->lod_remote_mdt_count + 1, i); - if (likely(!OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) { + if (likely(!is_specific && + !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) { /* check whether the idx already exists * in current allocated array */ for (k = 0; k < i; k++) { @@ -1959,9 +1996,17 @@ static int lod_prep_md_striped_create(const struct lu_env *env, idx, i, PFID(&fid)); idx_array[i] = idx; /* Set the start index for next stripe allocation */ - if (i < stripe_count - 1) + if (!is_specific && i < stripe_count - 1) { + /* + * for large dir test, put all other slaves on one + * remote MDT, otherwise we may save too many local + * slave locks which will exceed RS_MAX_LOCKS. + */ + if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) + idx = master_index; idx_array[i + 1] = (idx + 1) % (lod->lod_remote_mdt_count + 1); + } /* tgt_dt and fid must be ready after search avaible OSP * in the above loop */ LASSERT(tgt_dt != NULL); @@ -1975,11 +2020,12 @@ static int lod_prep_md_striped_create(const struct lu_env *env, stripe[i] = dto; } - lo->ldo_dir_stripe_loaded = 1; lo->ldo_dir_striped = 1; lo->ldo_stripe = stripe; lo->ldo_dir_stripe_count = i; lo->ldo_dir_stripes_allocated = stripe_count; + smp_mb(); + lo->ldo_dir_stripe_loaded = 1; if (lo->ldo_dir_stripe_count == 0) GOTO(out_put, rc = -ENOSPC); @@ -2032,31 +2078,25 @@ static int lod_declare_xattr_set_lmv(const struct lu_env *env, struct thandle *th) { struct lod_object *lo = lod_dt_obj(dt); - struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); - struct lmv_user_md_v1 *lum; + struct lmv_user_md_v1 *lum = lum_buf->lb_buf; int rc; ENTRY; - lum = lum_buf->lb_buf; LASSERT(lum != NULL); CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n", le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count), (int)le32_to_cpu(lum->lum_stripe_offset)); - if (le32_to_cpu(lum->lum_stripe_count) == 0) + if (lo->ldo_dir_stripe_count == 0) GOTO(out, rc = 0); - rc = lod_verify_md_striping(lod, lum); - if (rc != 0) - GOTO(out, rc); - /* prepare dir striped objects */ rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th); if (rc != 0) { /* failed to create striping, let's reset * config so that others don't get confused */ - lod_object_free_striping(env, lo); + lod_striping_free(env, lo); GOTO(out, rc); } out: @@ -2112,7 +2152,7 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env, RETURN(0); /* set xattr to each stripes, if needed */ - rc = lod_load_striping(env, lo); + rc = lod_striping_load(env, lo); if (rc != 0) RETURN(rc); @@ -2139,6 +2179,7 @@ lod_obj_stripe_replace_parent_fid_cb(const struct lu_env *env, struct lod_obj_stripe_cb_data *data) { struct lod_thread_info *info = lod_env_info(env); + struct lod_layout_component *comp = &lo->ldo_comp_entries[comp_idx]; struct filter_fid *ff = &info->lti_ff; struct lu_buf *buf = &info->lti_buf; int rc; @@ -2146,15 +2187,28 @@ lod_obj_stripe_replace_parent_fid_cb(const struct lu_env *env, buf->lb_buf = ff; buf->lb_len = sizeof(*ff); rc = dt_xattr_get(env, dt, buf, XATTR_NAME_FID); - if (rc == -ENODATA) - return 0; - - if (rc < 0) + if (rc < 0) { + if (rc == -ENODATA) + return 0; return rc; + } + + filter_fid_le_to_cpu(ff, ff, sizeof(*ff)); + if (lu_fid_eq(lu_object_fid(&lo->ldo_obj.do_lu), &ff->ff_parent) && + ff->ff_layout.ol_comp_id == comp->llc_id) + return 0; + /* rewrite filter_fid */ + memset(ff, 0, sizeof(*ff)); ff->ff_parent = *lu_object_fid(&lo->ldo_obj.do_lu); ff->ff_parent.f_ver = stripe_idx; - fid_cpu_to_le(&ff->ff_parent, &ff->ff_parent); + ff->ff_layout.ol_stripe_size = comp->llc_stripe_size; + ff->ff_layout.ol_stripe_count = comp->llc_stripe_count; + ff->ff_layout.ol_comp_id = comp->llc_id; + ff->ff_layout.ol_comp_start = comp->llc_extent.e_start; + ff->ff_layout.ol_comp_end = comp->llc_extent.e_end; + filter_fid_cpu_to_le(ff, ff, sizeof(*ff)); + if (data->locd_declare) rc = lod_sub_declare_xattr_set(env, dt, buf, XATTR_NAME_FID, LU_XATTR_REPLACE, th); @@ -2195,7 +2249,7 @@ static int lod_replace_parent_fid(const struct lu_env *env, LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr)); /* set xattr to each stripes, if needed */ - rc = lod_load_striping(env, lo); + rc = lod_striping_load(env, lo); if (rc != 0) RETURN(rc); @@ -2308,7 +2362,7 @@ static int lod_declare_layout_add(const struct lu_env *env, LASSERT(lo->ldo_is_composite); - if (lo->ldo_flr_state != LCM_FL_NOT_FLR) + if (lo->ldo_flr_state != LCM_FL_NONE) RETURN(-EBUSY); rc = lod_verify_striping(d, lo, buf, false); @@ -2347,14 +2401,8 @@ static int lod_declare_layout_add(const struct lu_env *env, lod_comp->llc_flags = comp_v1->lcm_entries[i].lcme_flags; lod_comp->llc_stripe_count = v1->lmm_stripe_count; - if (!lod_comp->llc_stripe_count || - lod_comp->llc_stripe_count == (__u16)-1) - lod_comp->llc_stripe_count = - desc->ld_default_stripe_count; lod_comp->llc_stripe_size = v1->lmm_stripe_size; - if (!lod_comp->llc_stripe_size) - lod_comp->llc_stripe_size = - desc->ld_default_stripe_size; + lod_adjust_stripe_info(lod_comp, desc); if (v1->lmm_magic == LOV_USER_MAGIC_V3) { v3 = (struct lov_user_md_v3 *) v1; @@ -2430,7 +2478,7 @@ static int lod_declare_layout_set(const struct lu_env *env, struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); struct lod_object *lo = lod_dt_obj(dt); struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; - __u32 magic, id; + __u32 magic; int i, j, rc; bool changed = false; ENTRY; @@ -2457,15 +2505,27 @@ static int lod_declare_layout_set(const struct lu_env *env, } for (i = 0; i < comp_v1->lcm_entry_count; i++) { - id = comp_v1->lcm_entries[i].lcme_id; + __u32 id = comp_v1->lcm_entries[i].lcme_id; + __u32 flags = comp_v1->lcm_entries[i].lcme_flags; + + if (flags & LCME_FL_INIT) { + if (changed) + lod_striping_free(env, lo); + RETURN(-EINVAL); + } for (j = 0; j < lo->ldo_comp_cnt; j++) { lod_comp = &lo->ldo_comp_entries[j]; - if (id == lod_comp->llc_id || id == LCME_ID_ALL) { - lod_comp->llc_flags = - comp_v1->lcm_entries[i].lcme_flags; - changed = true; + if (id != lod_comp->llc_id) + continue; + + if (flags & LCME_FL_NEG) { + flags &= ~LCME_FL_NEG; + lod_comp->llc_flags &= ~flags; + } else { + lod_comp->llc_flags |= flags; } + changed = true; } } @@ -2478,8 +2538,8 @@ static int lod_declare_layout_set(const struct lu_env *env, lod_obj_inc_layout_gen(lo); info->lti_buf.lb_len = lod_comp_md_size(lo, false); - rc = lod_sub_declare_xattr_set(env, dt, &info->lti_buf, - XATTR_NAME_LOV, 0, th); + rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), &info->lti_buf, + XATTR_NAME_LOV, LU_XATTR_REPLACE, th); RETURN(rc); } @@ -2512,7 +2572,7 @@ static int lod_declare_layout_del(const struct lu_env *env, LASSERT(lo->ldo_is_composite); - if (lo->ldo_flr_state != LCM_FL_NOT_FLR) + if (lo->ldo_flr_state != LCM_FL_NONE) RETURN(-EBUSY); magic = comp_v1->lcm_magic; @@ -2539,6 +2599,12 @@ static int lod_declare_layout_del(const struct lu_env *env, RETURN(-EINVAL); } + if (id == LCME_ID_INVAL && !flags) { + CDEBUG(D_LAYOUT, "%s: no id or flags specified.\n", + lod2obd(d)->obd_name); + RETURN(-EINVAL); + } + if (flags & LCME_FL_NEG) { neg_flags = flags & ~LCME_FL_NEG; flags = 0; @@ -2634,7 +2700,6 @@ static int lod_declare_modify_layout(const struct lu_env *env, { struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); struct lod_object *lo = lod_dt_obj(dt); - struct dt_object *next = dt_object_child(&lo->ldo_obj); char *op; int rc, len = strlen(XATTR_LUSTRE_LOV); ENTRY; @@ -2648,8 +2713,7 @@ static int lod_declare_modify_layout(const struct lu_env *env, } len++; - dt_write_lock(env, next, 0); - rc = lod_load_striping_locked(env, lo); + rc = lod_striping_load(env, lo); if (rc) GOTO(unlock, rc); @@ -2674,13 +2738,73 @@ static int lod_declare_modify_layout(const struct lu_env *env, } unlock: if (rc) - lod_object_free_striping(env, lo); - dt_write_unlock(env, next); + lod_striping_free(env, lo); RETURN(rc); } /** + * Convert a plain file lov_mds_md to a composite layout. + * + * \param[in,out] info the thread info::lti_ea_store buffer contains little + * endian plain file layout + * + * \retval 0 on success, <0 on failure + */ +static int lod_layout_convert(struct lod_thread_info *info) +{ + struct lov_mds_md *lmm = info->lti_ea_store; + struct lov_mds_md *lmm_save; + struct lov_comp_md_v1 *lcm; + struct lov_comp_md_entry_v1 *lcme; + size_t size; + __u32 blob_size; + int rc = 0; + ENTRY; + + /* realloc buffer to a composite layout which contains one component */ + blob_size = lov_mds_md_size(le16_to_cpu(lmm->lmm_stripe_count), + le32_to_cpu(lmm->lmm_magic)); + size = sizeof(*lcm) + sizeof(*lcme) + blob_size; + + OBD_ALLOC_LARGE(lmm_save, blob_size); + if (!lmm_save) + GOTO(out, rc = -ENOMEM); + + memcpy(lmm_save, lmm, blob_size); + + if (info->lti_ea_store_size < size) { + rc = lod_ea_store_resize(info, size); + if (rc) + GOTO(out, rc); + } + + lcm = info->lti_ea_store; + lcm->lcm_magic = cpu_to_le32(LOV_MAGIC_COMP_V1); + lcm->lcm_size = cpu_to_le32(size); + lcm->lcm_layout_gen = cpu_to_le32(le16_to_cpu( + lmm_save->lmm_layout_gen)); + lcm->lcm_flags = cpu_to_le16(LCM_FL_NONE); + lcm->lcm_entry_count = cpu_to_le16(1); + lcm->lcm_mirror_count = 0; + + lcme = &lcm->lcm_entries[0]; + lcme->lcme_flags = cpu_to_le32(LCME_FL_INIT); + lcme->lcme_extent.e_start = 0; + lcme->lcme_extent.e_end = cpu_to_le64(OBD_OBJECT_EOF); + lcme->lcme_offset = cpu_to_le32(sizeof(*lcm) + sizeof(*lcme)); + lcme->lcme_size = cpu_to_le32(blob_size); + + memcpy((char *)lcm + lcme->lcme_offset, (char *)lmm_save, blob_size); + + EXIT; +out: + if (lmm_save) + OBD_FREE_LARGE(lmm_save, blob_size); + return rc; +} + +/** * Merge layouts to form a mirrored file. */ static int lod_declare_layout_merge(const struct lu_env *env, @@ -2724,9 +2848,22 @@ static int lod_declare_layout_merge(const struct lu_env *env, RETURN(rc ? : -ENODATA); cur_lcm = info->lti_ea_store; - if (le32_to_cpu(cur_lcm->lcm_magic) != LOV_MAGIC_COMP_V1) - RETURN(-EINVAL); + switch (le32_to_cpu(cur_lcm->lcm_magic)) { + case LOV_MAGIC_V1: + case LOV_MAGIC_V3: + rc = lod_layout_convert(info); + break; + case LOV_MAGIC_COMP_V1: + rc = 0; + break; + default: + rc = -EINVAL; + } + if (rc) + RETURN(rc); + /* info->lti_ea_store could be reallocated in lod_layout_convert() */ + cur_lcm = info->lti_ea_store; cur_entry_count = le16_to_cpu(cur_lcm->lcm_entry_count); /* 'lcm_mirror_count + 1' is the current # of mirrors the file has */ @@ -2796,12 +2933,10 @@ static int lod_declare_layout_merge(const struct lu_env *env, lcm->lcm_size = cpu_to_le32(size); lcm->lcm_entry_count = cpu_to_le16(cur_entry_count + merge_entry_count); lcm->lcm_mirror_count = cpu_to_le16(mirror_count); - if ((le16_to_cpu(lcm->lcm_flags) & LCM_FL_FLR_MASK) == LCM_FL_NOT_FLR) + if ((le16_to_cpu(lcm->lcm_flags) & LCM_FL_FLR_MASK) == LCM_FL_NONE) lcm->lcm_flags = cpu_to_le32(LCM_FL_RDONLY); - LASSERT(dt_write_locked(env, dt_object_child(dt))); - lod_object_free_striping(env, lo); - rc = lod_parse_striping(env, lo, buf); + rc = lod_striping_reload(env, lo, buf); if (rc) GOTO(out, rc); @@ -2814,6 +2949,30 @@ out: } /** + * Split layouts, just set the LOVEA with the layout from mbuf. + */ +static int lod_declare_layout_split(const struct lu_env *env, + struct dt_object *dt, const struct lu_buf *mbuf, + struct thandle *th) +{ + struct lod_object *lo = lod_dt_obj(dt); + struct lov_comp_md_v1 *lcm = mbuf->lb_buf; + int rc; + ENTRY; + + lod_obj_inc_layout_gen(lo); + lcm->lcm_layout_gen = cpu_to_le32(lo->ldo_layout_gen); + + rc = lod_striping_reload(env, lo, mbuf); + if (rc) + RETURN(rc); + + rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), mbuf, + XATTR_NAME_LOV, LU_XATTR_REPLACE, th); + RETURN(rc); +} + +/** * Implementation of dt_object_operations::do_declare_xattr_set. * * \see dt_object_operations::do_declare_xattr_set() in the API description @@ -2837,7 +2996,7 @@ static int lod_declare_xattr_set(const struct lu_env *env, mode = dt->do_lu.lo_header->loh_attr & S_IFMT; if ((S_ISREG(mode) || mode == 0) && - !(fl & (LU_XATTR_REPLACE | LU_XATTR_MERGE)) && + !(fl & (LU_XATTR_REPLACE | LU_XATTR_MERGE | LU_XATTR_SPLIT)) && (strcmp(name, XATTR_NAME_LOV) == 0 || strcmp(name, XATTR_LUSTRE_LOV) == 0)) { /* @@ -2863,6 +3022,10 @@ static int lod_declare_xattr_set(const struct lu_env *env, LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 || strcmp(name, XATTR_LUSTRE_LOV) == 0); rc = lod_declare_layout_merge(env, dt, buf, th); + } else if (fl & LU_XATTR_SPLIT) { + LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 || + strcmp(name, XATTR_LUSTRE_LOV) == 0); + rc = lod_declare_layout_split(env, dt, buf, th); } else if (S_ISREG(mode) && strlen(name) > strlen(XATTR_LUSTRE_LOV) + 1 && strncmp(name, XATTR_LUSTRE_LOV, @@ -3012,6 +3175,7 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, lum = buf->lb_buf; switch (lum->lmm_magic) { + case LOV_USER_MAGIC_SPECIFIC: case LOV_USER_MAGIC_V3: v3 = buf->lb_buf; if (v3->lmm_pool_name[0] != '\0') @@ -3275,6 +3439,7 @@ out: * \param[in] env execution environment * \param[in] dt object * \param[in] attr attributes the stripes will be created with + * \param[in] lmu lmv_user_md if MDT indices are specified * \param[in] dof format of stripes (see OSD API description) * \param[in] th transaction handle * \param[in] declare where to call "declare" or "execute" methods @@ -3285,6 +3450,7 @@ out: static int lod_dir_striping_create_internal(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, + const struct lu_buf *lmu, struct dt_object_format *dof, struct thandle *th, bool declare) @@ -3301,31 +3467,34 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, if (!LMVEA_DELETE_VALUES(lo->ldo_dir_stripe_count, lo->ldo_dir_stripe_offset)) { - struct lmv_user_md_v1 *v1 = info->lti_ea_store; - int stripe_count = lo->ldo_dir_stripe_count; + if (!lmu) { + struct lmv_user_md_v1 *v1 = info->lti_ea_store; + int stripe_count = lo->ldo_dir_stripe_count; - if (info->lti_ea_store_size < sizeof(*v1)) { - rc = lod_ea_store_resize(info, sizeof(*v1)); - if (rc != 0) - RETURN(rc); - v1 = info->lti_ea_store; - } + if (info->lti_ea_store_size < sizeof(*v1)) { + rc = lod_ea_store_resize(info, sizeof(*v1)); + if (rc != 0) + RETURN(rc); + v1 = info->lti_ea_store; + } - memset(v1, 0, sizeof(*v1)); - v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC); - v1->lum_stripe_count = cpu_to_le32(stripe_count); - v1->lum_stripe_offset = - cpu_to_le32(lo->ldo_dir_stripe_offset); + memset(v1, 0, sizeof(*v1)); + v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC); + v1->lum_stripe_count = cpu_to_le32(stripe_count); + v1->lum_stripe_offset = + cpu_to_le32(lo->ldo_dir_stripe_offset); - info->lti_buf.lb_buf = v1; - info->lti_buf.lb_len = sizeof(*v1); + info->lti_buf.lb_buf = v1; + info->lti_buf.lb_len = sizeof(*v1); + lmu = &info->lti_buf; + } if (declare) - rc = lod_declare_xattr_set_lmv(env, dt, attr, - &info->lti_buf, dof, th); + rc = lod_declare_xattr_set_lmv(env, dt, attr, lmu, dof, + th); else - rc = lod_xattr_set_lmv(env, dt, &info->lti_buf, - XATTR_NAME_LMV, 0, th); + rc = lod_xattr_set_lmv(env, dt, lmu, XATTR_NAME_LMV, 0, + th); if (rc != 0) RETURN(rc); } @@ -3403,10 +3572,12 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, static int lod_declare_dir_striping_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, + struct lu_buf *lmu, struct dt_object_format *dof, struct thandle *th) { - return lod_dir_striping_create_internal(env, dt, attr, dof, th, true); + return lod_dir_striping_create_internal(env, dt, attr, lmu, dof, th, + true); } static int lod_dir_striping_create(const struct lu_env *env, @@ -3415,7 +3586,8 @@ static int lod_dir_striping_create(const struct lu_env *env, struct dt_object_format *dof, struct thandle *th) { - return lod_dir_striping_create_internal(env, dt, attr, dof, th, false); + return lod_dir_striping_create_internal(env, dt, attr, NULL, dof, th, + false); } /** @@ -3447,7 +3619,7 @@ static int lod_generate_and_set_lovea(const struct lu_env *env, LASSERT(lo); if (lo->ldo_comp_cnt == 0) { - lod_object_free_striping(env, lo); + lod_striping_free(env, lo); rc = lod_sub_xattr_del(env, next, XATTR_NAME_LOV, th); RETURN(rc); } @@ -3521,6 +3693,9 @@ static int lod_layout_del(const struct lu_env *env, struct dt_object *dt, OBD_FREE(lod_comp->llc_stripe, sizeof(struct dt_object *) * lod_comp->llc_stripes_allocated); lod_comp->llc_stripe = NULL; + OBD_FREE(lod_comp->llc_ost_indices, + sizeof(__u32) * lod_comp->llc_stripes_allocated); + lod_comp->llc_ost_indices = NULL; lod_comp->llc_stripes_allocated = 0; lod_obj_set_pool(lo, i, NULL); if (lod_comp->llc_ostlist.op_array) { @@ -3571,10 +3746,14 @@ static int lod_layout_del(const struct lu_env *env, struct dt_object *dt, EXIT; out: if (rc) - lod_object_free_striping(env, lo); + lod_striping_free(env, lo); return rc; } + +static int lod_get_default_lov_striping(const struct lu_env *env, + struct lod_object *lo, + struct lod_default_striping *lds); /** * Implementation of dt_object_operations::do_xattr_set. * @@ -3616,8 +3795,59 @@ static int lod_xattr_set(const struct lu_env *env, if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && strcmp(name, XATTR_NAME_LOV) == 0) { - /* default LOVEA */ - rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th); + struct lod_thread_info *info = lod_env_info(env); + struct lod_default_striping *lds = &info->lti_def_striping; + struct lov_user_md_v1 *v1 = buf->lb_buf; + char pool[LOV_MAXPOOLNAME + 1]; + bool is_del; + + /* get existing striping config */ + rc = lod_get_default_lov_striping(env, lod_dt_obj(dt), lds); + if (rc) + RETURN(rc); + + memset(pool, 0, sizeof(pool)); + if (lds->lds_def_striping_set == 1) + lod_layout_get_pool(lds->lds_def_comp_entries, + lds->lds_def_comp_cnt, pool, + sizeof(pool)); + + is_del = LOVEA_DELETE_VALUES(v1->lmm_stripe_size, + v1->lmm_stripe_count, + v1->lmm_stripe_offset, + NULL); + + /* Retain the pool name if it is not given */ + if (v1->lmm_magic == LOV_USER_MAGIC_V1 && pool[0] != '\0' && + !is_del) { + struct lod_thread_info *info = lod_env_info(env); + struct lov_user_md_v3 *v3 = info->lti_ea_store; + + memset(v3, 0, sizeof(*v3)); + v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); + v3->lmm_pattern = cpu_to_le32(v1->lmm_pattern); + v3->lmm_stripe_count = + cpu_to_le32(v1->lmm_stripe_count); + v3->lmm_stripe_offset = + cpu_to_le32(v1->lmm_stripe_offset); + v3->lmm_stripe_size = cpu_to_le32(v1->lmm_stripe_size); + + strlcpy(v3->lmm_pool_name, pool, + sizeof(v3->lmm_pool_name)); + + info->lti_buf.lb_buf = v3; + info->lti_buf.lb_len = sizeof(*v3); + rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf, + name, fl, th); + } else { + rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, + fl, th); + } + + if (lds->lds_def_striping_set == 1 && + lds->lds_def_comp_entries != NULL) + lod_free_def_comp_entries(lds); + RETURN(rc); } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { @@ -3635,7 +3865,7 @@ static int lod_xattr_set(const struct lu_env *env, * defines striping, then create() does the work */ if (fl & LU_XATTR_REPLACE) { /* free stripes, then update disk */ - lod_object_free_striping(env, lod_dt_obj(dt)); + lod_striping_free(env, lod_dt_obj(dt)); rc = lod_sub_xattr_set(env, next, buf, name, fl, th); } else if (dt_object_remote(dt)) { @@ -3703,7 +3933,7 @@ static int lod_declare_xattr_del(const struct lu_env *env, RETURN(0); /* set xattr to each stripes, if needed */ - rc = lod_load_striping(env, lo); + rc = lod_striping_load(env, lo); if (rc != 0) RETURN(rc); @@ -3739,7 +3969,7 @@ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt, ENTRY; if (!strcmp(name, XATTR_NAME_LOV)) - lod_object_free_striping(env, lod_dt_obj(dt)); + lod_striping_free(env, lod_dt_obj(dt)); rc = lod_sub_xattr_del(env, next, name, th); if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) @@ -3776,6 +4006,49 @@ static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fi return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE); } +/** + * Copy OST list from layout provided by user. + * + * \param[in] lod_comp layout_component to be filled + * \param[in] v3 LOV EA V3 user data + * + * \retval 0 on success + * \retval negative if failed + */ +int lod_comp_copy_ost_lists(struct lod_layout_component *lod_comp, + struct lov_user_md_v3 *v3) +{ + int j; + + ENTRY; + + if (v3->lmm_stripe_offset == LOV_OFFSET_DEFAULT) + v3->lmm_stripe_offset = v3->lmm_objects[0].l_ost_idx; + + if (lod_comp->llc_ostlist.op_array) { + if (lod_comp->llc_ostlist.op_count == + v3->lmm_stripe_count) + goto skip; + OBD_FREE(lod_comp->llc_ostlist.op_array, + lod_comp->llc_ostlist.op_size); + } + + /* copy ost list from lmm */ + lod_comp->llc_ostlist.op_count = v3->lmm_stripe_count; + lod_comp->llc_ostlist.op_size = v3->lmm_stripe_count * sizeof(__u32); + OBD_ALLOC(lod_comp->llc_ostlist.op_array, + lod_comp->llc_ostlist.op_size); + if (!lod_comp->llc_ostlist.op_array) + RETURN(-ENOMEM); +skip: + for (j = 0; j < v3->lmm_stripe_count; j++) { + lod_comp->llc_ostlist.op_array[j] = + v3->lmm_objects[j].l_ost_idx; + } + + RETURN(0); +} + /** * Get default striping. @@ -3816,13 +4089,19 @@ static int lod_get_default_lov_striping(const struct lu_env *env, } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) { v3 = (struct lov_user_md_v3 *)v1; lustre_swab_lov_user_md_v3(v3); + } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_SPECIFIC)) { + v3 = (struct lov_user_md_v3 *)v1; + lustre_swab_lov_user_md_v3(v3); + lustre_swab_lov_user_md_objects(v3->lmm_objects, + v3->lmm_stripe_count); } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_COMP_V1)) { comp_v1 = (struct lov_comp_md_v1 *)v1; lustre_swab_lov_comp_md_v1(comp_v1); } if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1 && - v1->lmm_magic != LOV_MAGIC_COMP_V1) + v1->lmm_magic != LOV_MAGIC_COMP_V1 && + v1->lmm_magic != LOV_USER_MAGIC_SPECIFIC) RETURN(-ENOTSUPP); if (v1->lmm_magic == LOV_MAGIC_COMP_V1) { @@ -3893,6 +4172,12 @@ static int lod_get_default_lov_striping(const struct lu_env *env, pool = v3->lmm_pool_name; } lod_set_def_pool(lds, i, pool); + if (v1->lmm_magic == LOV_USER_MAGIC_SPECIFIC) { + v3 = (struct lov_user_md_v3 *)v1; + rc = lod_comp_copy_ost_lists(lod_comp, v3); + if (rc) + RETURN(rc); + } } lds->lds_def_striping_set = 1; @@ -3987,6 +4272,8 @@ static void lod_striping_from_default(struct lod_object *lo, return; lo->ldo_is_composite = lds->lds_def_striping_is_composite; + if (lds->lds_def_mirror_cnt > 1) + lo->ldo_flr_state = LCM_FL_RDONLY; for (i = 0; i < lo->ldo_comp_cnt; i++) { struct lod_layout_component *obj_comp = @@ -4009,6 +4296,17 @@ static void lod_striping_from_default(struct lod_object *lo, lod_obj_set_pool(lo, i, def_comp->llc_pool); } + /* copy ost list */ + if (def_comp->llc_ostlist.op_array) { + OBD_ALLOC(obj_comp->llc_ostlist.op_array, + obj_comp->llc_ostlist.op_size); + if (!obj_comp->llc_ostlist.op_array) + return; + memcpy(obj_comp->llc_ostlist.op_array, + def_comp->llc_ostlist.op_array, + obj_comp->llc_ostlist.op_size); + } + /* * Don't initialize these fields for plain layout * (v1/v3) here, they are inherited in the order of @@ -4020,13 +4318,7 @@ static void lod_striping_from_default(struct lod_object *lo, if (!lo->ldo_is_composite) continue; - if (obj_comp->llc_stripe_count <= 0 && - obj_comp->llc_pattern != LOV_PATTERN_MDT) - obj_comp->llc_stripe_count = - desc->ld_default_stripe_count; - if (obj_comp->llc_stripe_size <= 0) - obj_comp->llc_stripe_size = - desc->ld_default_stripe_size; + lod_adjust_stripe_info(obj_comp, desc); } } else if (lds->lds_dir_def_striping_set && S_ISDIR(mode)) { if (lo->ldo_dir_stripe_count == 0) @@ -4117,6 +4409,8 @@ static void lod_ah_init(const struct lu_env *env, nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode); if (S_ISDIR(child_mode)) { + const struct lmv_user_md_v1 *lum1 = ah->dah_eadata; + /* other default values are 0 */ lc->ldo_dir_stripe_offset = -1; @@ -4129,10 +4423,13 @@ static void lod_ah_init(const struct lu_env *env, lc->ldo_def_striping = lds; /* It should always honour the specified stripes */ + /* Note: old client (< 2.7)might also do lfs mkdir, whose EA + * will have old magic. In this case, we should ignore the + * stripe count and try to create dir by default stripe. + */ if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0 && - lod_verify_md_striping(d, ah->dah_eadata) == 0) { - const struct lmv_user_md_v1 *lum1 = ah->dah_eadata; - + (le32_to_cpu(lum1->lum_magic) == LMV_USER_MAGIC || + le32_to_cpu(lum1->lum_magic) == LMV_USER_MAGIC_SPECIFIC)) { lc->ldo_dir_stripe_count = le32_to_cpu(lum1->lum_stripe_count); lc->ldo_dir_stripe_offset = @@ -4254,12 +4551,7 @@ out: LASSERT(!lc->ldo_is_composite); lod_comp = &lc->ldo_comp_entries[0]; desc = &d->lod_desc; - if (lod_comp->llc_stripe_count <= 0) - lod_comp->llc_stripe_count = - desc->ld_default_stripe_count; - if (lod_comp->llc_stripe_size <= 0) - lod_comp->llc_stripe_size = - desc->ld_default_stripe_size; + lod_adjust_stripe_info(lod_comp, desc); } EXIT; @@ -4418,7 +4710,7 @@ out: /* failed to create striping or to set initial size, let's reset * config so that others don't get confused */ if (rc) - lod_object_free_striping(env, lo); + lod_striping_free(env, lo); RETURN(rc); } @@ -4468,6 +4760,8 @@ static int lod_declare_create(const struct lu_env *env, struct dt_object *dt, NULL, th); } else if (dof->dof_type == DFT_DIR) { struct seq_server_site *ss; + struct lu_buf buf = { NULL }; + struct lu_buf *lmu = NULL; ss = lu_site2seq(dt->do_lu.lo_dev->ld_site); @@ -4517,15 +4811,20 @@ static int lod_declare_create(const struct lu_env *env, struct dt_object *dt, else GOTO(out, rc = -EINVAL); } + } else if (hint && hint->dah_eadata) { + lmu = &buf; + lmu->lb_buf = (void *)hint->dah_eadata; + lmu->lb_len = hint->dah_eadata_len; } - rc = lod_declare_dir_striping_create(env, dt, attr, dof, th); + rc = lod_declare_dir_striping_create(env, dt, attr, lmu, dof, + th); } out: /* failed to create striping or to set initial size, let's reset * config so that others don't get confused */ if (rc) - lod_object_free_striping(env, lo); + lod_striping_free(env, lo); RETURN(rc); } @@ -4658,7 +4957,7 @@ int lod_striped_create(const struct lu_env *env, struct dt_object *dt, RETURN(0); out: - lod_object_free_striping(env, lo); + lod_striping_free(env, lo); RETURN(rc); } @@ -4733,7 +5032,7 @@ static int lod_declare_destroy(const struct lu_env *env, struct dt_object *dt, * is being initialized as we don't need this information till * few specific cases like destroy, chown */ - rc = lod_load_striping(env, lo); + rc = lod_striping_load(env, lo); if (rc) RETURN(rc); @@ -4947,42 +5246,6 @@ static int lod_object_sync(const struct lu_env *env, struct dt_object *dt, } /** - * Release LDLM locks on the stripes of a striped directory. - * - * Iterates over all the locks taken on the stripe objects and - * cancel them. - * - * \param[in] env execution environment - * \param[in] dt striped object - * \param[in] einfo lock description - * \param[in] policy data describing requested lock - * - * \retval 0 on success - * \retval negative if failed - */ -static int lod_object_unlock_internal(const struct lu_env *env, - struct dt_object *dt, - struct ldlm_enqueue_info *einfo, - union ldlm_policy_data *policy) -{ - struct lustre_handle_array *slave_locks = einfo->ei_cbdata; - int rc = 0; - int i; - ENTRY; - - if (slave_locks == NULL) - RETURN(0); - - for (i = 1; i < slave_locks->count; i++) { - if (lustre_handle_is_used(&slave_locks->handles[i])) - ldlm_lock_decref_and_cancel(&slave_locks->handles[i], - einfo->ei_mode); - } - - RETURN(rc); -} - -/** * Implementation of dt_object_operations::do_object_unlock. * * Used to release LDLM lock(s). @@ -5010,13 +5273,18 @@ static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt, LASSERT(!dt_object_remote(dt_object_child(dt))); /* locks were unlocked in MDT layer */ - for (i = 1; i < slave_locks->count; i++) { - LASSERT(!lustre_handle_is_used(&slave_locks->handles[i])); + for (i = 0; i < slave_locks->ha_count; i++) + LASSERT(!lustre_handle_is_used(&slave_locks->ha_handles[i])); + + /* + * NB, ha_count may not equal to ldo_dir_stripe_count, because dir + * layout may change, e.g., shrink dir layout after migration. + */ + for (i = 0; i < lo->ldo_dir_stripe_count; i++) dt_invalidate(env, lo->ldo_stripe[i]); - } - slave_locks_size = sizeof(*slave_locks) + slave_locks->count * - sizeof(slave_locks->handles[0]); + slave_locks_size = offsetof(typeof(*slave_locks), + ha_handles[slave_locks->ha_count]); OBD_FREE(slave_locks, slave_locks_size); einfo->ei_cbdata = NULL; @@ -5037,11 +5305,11 @@ static int lod_object_lock(const struct lu_env *env, struct ldlm_enqueue_info *einfo, union ldlm_policy_data *policy) { - struct lod_object *lo = lod_dt_obj(dt); - int rc = 0; - int i; - int slave_locks_size; + struct lod_object *lo = lod_dt_obj(dt); + int slave_locks_size; struct lustre_handle_array *slave_locks = NULL; + int i; + int rc; ENTRY; /* remote object lock */ @@ -5052,35 +5320,28 @@ static int lod_object_lock(const struct lu_env *env, } if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) - GOTO(out, rc = -ENOTDIR); + RETURN(-ENOTDIR); - rc = lod_load_striping(env, lo); + rc = lod_striping_load(env, lo); if (rc != 0) - GOTO(out, rc); + RETURN(rc); /* No stripes */ - if (lo->ldo_dir_stripe_count <= 1) { - /* - * NB, ei_cbdata stores pointer to slave locks, if no locks - * taken, make sure it's set to NULL, otherwise MDT will try to - * unlock them. - */ - einfo->ei_cbdata = NULL; - GOTO(out, rc = 0); - } + if (lo->ldo_dir_stripe_count <= 1) + RETURN(0); - slave_locks_size = sizeof(*slave_locks) + lo->ldo_dir_stripe_count * - sizeof(slave_locks->handles[0]); + slave_locks_size = offsetof(typeof(*slave_locks), + ha_handles[lo->ldo_dir_stripe_count]); /* Freed in lod_object_unlock */ OBD_ALLOC(slave_locks, slave_locks_size); - if (slave_locks == NULL) - GOTO(out, rc = -ENOMEM); - slave_locks->count = lo->ldo_dir_stripe_count; + if (!slave_locks) + RETURN(-ENOMEM); + slave_locks->ha_count = lo->ldo_dir_stripe_count; /* striped directory lock */ - for (i = 1; i < lo->ldo_dir_stripe_count; i++) { - struct lustre_handle lockh; - struct ldlm_res_id *res_id; + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + struct lustre_handle lockh; + struct ldlm_res_id *res_id; res_id = &lod_env_info(env)->lti_res_id; fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu), @@ -5088,23 +5349,20 @@ static int lod_object_lock(const struct lu_env *env, einfo->ei_res_id = res_id; LASSERT(lo->ldo_stripe[i] != NULL); - if (likely(dt_object_remote(lo->ldo_stripe[i]))) { + if (dt_object_remote(lo->ldo_stripe[i])) { + set_bit(i, (void *)slave_locks->ha_map); rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo, policy); } else { struct ldlm_namespace *ns = einfo->ei_namespace; ldlm_blocking_callback blocking = einfo->ei_cb_local_bl; ldlm_completion_callback completion = einfo->ei_cb_cp; - __u64 dlmflags = LDLM_FL_ATOMIC_CB; + __u64 dlmflags = LDLM_FL_ATOMIC_CB; if (einfo->ei_mode == LCK_PW || einfo->ei_mode == LCK_EX) dlmflags |= LDLM_FL_COS_INCOMPAT; - /* This only happens if there are mulitple stripes - * on the master MDT, i.e. except stripe0, there are - * other stripes on the Master MDT as well, Only - * happens in the test case right now. */ LASSERT(ns != NULL); rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy, einfo->ei_mode, @@ -5113,21 +5371,19 @@ static int lod_object_lock(const struct lu_env *env, NULL, 0, LVB_T_NONE, NULL, &lockh); } - if (rc != 0) - break; - slave_locks->handles[i] = lockh; + if (rc) { + while (i--) + ldlm_lock_decref_and_cancel( + &slave_locks->ha_handles[i], + einfo->ei_mode); + OBD_FREE(slave_locks, slave_locks_size); + RETURN(rc); + } + slave_locks->ha_handles[i] = lockh; } einfo->ei_cbdata = slave_locks; - if (rc != 0 && slave_locks != NULL) { - lod_object_unlock_internal(env, dt, einfo, policy); - OBD_FREE(slave_locks, slave_locks_size); - } - EXIT; -out: - if (rc != 0) - einfo->ei_cbdata = NULL; - RETURN(rc); + RETURN(0); } /** @@ -5169,22 +5425,15 @@ static int lod_declare_instantiate_components(const struct lu_env *env, struct lod_object *lo, struct thandle *th) { struct lod_thread_info *info = lod_env_info(env); - struct ost_pool *inuse = &info->lti_inuse_osts; int i; int rc = 0; ENTRY; LASSERT(info->lti_count < lo->ldo_comp_cnt); - if (info->lti_count > 0) { - /* Prepare inuse array for composite file */ - rc = lod_prepare_inuse(env, lo); - if (rc) - RETURN(rc); - } for (i = 0; i < info->lti_count; i++) { rc = lod_qos_prep_create(env, lo, NULL, th, - info->lti_comp_idx[i], inuse); + info->lti_comp_idx[i]); if (rc) break; } @@ -5210,7 +5459,7 @@ static int lod_declare_update_plain(const struct lu_env *env, int i, rc; ENTRY; - LASSERT(lo->ldo_flr_state == LCM_FL_NOT_FLR); + LASSERT(lo->ldo_flr_state == LCM_FL_NONE); /* * In case the client is passing lovea, which only happens during @@ -5230,7 +5479,6 @@ static int lod_declare_update_plain(const struct lu_env *env, GOTO(out, rc = -EINVAL); } - lod_object_free_striping(env, lo); rc = lod_use_defined_striping(env, lo, buf); if (rc) GOTO(out, rc); @@ -5243,7 +5491,7 @@ static int lod_declare_update_plain(const struct lu_env *env, replay = true; } else { /* non replay path */ - rc = lod_load_striping_locked(env, lo); + rc = lod_striping_load(env, lo); if (rc) GOTO(out, rc); } @@ -5309,15 +5557,10 @@ static int lod_declare_update_plain(const struct lu_env *env, rc = lod_declare_instantiate_components(env, lo, th); out: if (rc) - lod_object_free_striping(env, lo); + lod_striping_free(env, lo); RETURN(rc); } -#define lod_foreach_mirror_comp(comp, lo, mirror_idx) \ -for (comp = &lo->ldo_comp_entries[lo->ldo_mirrors[mirror_idx].lme_start]; \ - comp <= &lo->ldo_comp_entries[lo->ldo_mirrors[mirror_idx].lme_end]; \ - comp++) - static inline int lod_comp_index(struct lod_object *lo, struct lod_layout_component *lod_comp) { @@ -5338,10 +5581,15 @@ static void lod_stale_components(struct lod_object *lo, int primary, /* The writing extent decides which components in the primary * are affected... */ + CDEBUG(D_LAYOUT, "primary mirror %d, "DEXT"\n", primary, PEXT(extent)); lod_foreach_mirror_comp(pri_comp, lo, primary) { if (!lu_extent_is_overlapped(extent, &pri_comp->llc_extent)) continue; + CDEBUG(D_LAYOUT, "primary comp %u "DEXT"\n", + lod_comp_index(lo, pri_comp), + PEXT(&pri_comp->llc_extent)); + for (i = 0; i < lo->ldo_mirror_count; i++) { if (i == primary) continue; @@ -5364,27 +5612,63 @@ static void lod_stale_components(struct lod_object *lo, int primary, } } -static int lod_declare_update_rdonly(const struct lu_env *env, - struct lod_object *lo, struct md_layout_change *mlc, - struct thandle *th) +/** + * check an OST's availability + * \param[in] env execution environment + * \param[in] lo lod object + * \param[in] dt dt object + * \param[in] index mirror index + * + * \retval negative if failed + * \retval 1 if \a dt is available + * \retval 0 if \a dt is not available + */ +static inline int lod_check_ost_avail(const struct lu_env *env, + struct lod_object *lo, + struct dt_object *dt, int index) { - struct lod_thread_info *info = lod_env_info(env); - struct lu_attr *layout_attr = &info->lti_layout_attr; - struct lod_layout_component *lod_comp; - struct layout_intent *layout = mlc->mlc_intent; - struct lu_extent extent = layout->li_extent; - unsigned int seq = 0; - int picked; - int i; + struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + struct lod_tgt_desc *ost; + __u32 idx; + int type = LU_SEQ_RANGE_OST; int rc; - ENTRY; - LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE); - LASSERT(lo->ldo_flr_state == LCM_FL_RDONLY); - LASSERT(lo->ldo_mirror_count > 0); + rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu), &idx, &type); + if (rc < 0) { + CERROR("%s: can't locate "DFID":rc = %d\n", + lod2obd(lod)->obd_name, PFID(lu_object_fid(&dt->do_lu)), + rc); + return rc; + } + + ost = OST_TGT(lod, idx); + if (ost->ltd_statfs.os_state & + (OS_STATE_READONLY | OS_STATE_ENOSPC | OS_STATE_ENOINO | + OS_STATE_NOPRECREATE) || + ost->ltd_active == 0) { + CDEBUG(D_LAYOUT, DFID ": mirror %d OST%d unavail, rc = %d\n", + PFID(lod_object_fid(lo)), index, idx, rc); + return 0; + } - CDEBUG(D_LAYOUT, DFID": trying to write :"DEXT"\n", - PFID(lod_object_fid(lo)), PEXT(&extent)); + return 1; +} + +/** + * Pick primary mirror for write + * \param[in] env execution environment + * \param[in] lo object + * \param[in] extent write range + */ +static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo, + struct lu_extent *extent) +{ + struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + unsigned int seq = 0; + struct lod_layout_component *lod_comp; + int i, j, rc; + int picked = -1, second_pick = -1, third_pick = -1; + ENTRY; if (OBD_FAIL_CHECK(OBD_FAIL_FLR_RANDOM_PICK_MIRROR)) { get_random_bytes(&seq, sizeof(seq)); @@ -5392,46 +5676,221 @@ static int lod_declare_update_rdonly(const struct lu_env *env, } /** - * Pick a mirror as the primary. - * Now it only picks the first mirror, this algo can be - * revised later after knowing the topology of cluster or - * the availability of OSTs. + * Pick a mirror as the primary, and check the availability of OSTs. + * + * This algo can be revised later after knowing the topology of + * cluster. */ - for (picked = -1, i = 0; i < lo->ldo_mirror_count; i++) { + lod_qos_statfs_update(env, lod); + for (i = 0; i < lo->ldo_mirror_count; i++) { + bool ost_avail = true; int index = (i + seq) % lo->ldo_mirror_count; - if (!lo->ldo_mirrors[index].lme_stale) { - picked = index; - break; + if (lo->ldo_mirrors[index].lme_stale) { + CDEBUG(D_LAYOUT, DFID": mirror %d stale\n", + PFID(lod_object_fid(lo)), index); + continue; } - } - if (picked < 0) /* failed to pick a primary */ - RETURN(-ENODATA); - CDEBUG(D_LAYOUT, DFID": picked mirror %u as primary\n", - PFID(lod_object_fid(lo)), lo->ldo_mirrors[picked].lme_id); + /* 2nd pick is for the primary mirror containing unavail OST */ + if (lo->ldo_mirrors[index].lme_primary && second_pick < 0) + second_pick = index; + + /* 3rd pick is for non-primary mirror containing unavail OST */ + if (second_pick < 0 && third_pick < 0) + third_pick = index; + + /** + * we found a non-primary 1st pick, we'd like to find a + * potential pirmary mirror. + */ + if (picked >= 0 && !lo->ldo_mirrors[index].lme_primary) + continue; + + /* check the availability of OSTs */ + lod_foreach_mirror_comp(lod_comp, lo, index) { + if (!lod_comp_inited(lod_comp) || !lod_comp->llc_stripe) + continue; + + for (j = 0; j < lod_comp->llc_stripe_count; j++) { + struct dt_object *dt = lod_comp->llc_stripe[j]; - /* stale overlapping components from other mirrors */ - lod_stale_components(lo, picked, &extent); + rc = lod_check_ost_avail(env, lo, dt, index); + if (rc < 0) + RETURN(rc); - /* instantiate components for the picked mirror, start from 0 */ - extent = (struct lu_extent) { 0, layout->li_extent.e_end }; - lod_foreach_mirror_comp(lod_comp, lo, picked) { - if (!lu_extent_is_overlapped(&extent, - &lod_comp->llc_extent)) + ost_avail = !!rc; + if (!ost_avail) + break; + } /* for all dt object in one component */ + if (!ost_avail) + break; + } /* for all components in a mirror */ + + /** + * the OSTs where allocated objects locates in the components + * of the mirror are available. + */ + if (!ost_avail) + continue; + + /* this mirror has all OSTs available */ + picked = index; + + /** + * primary with all OSTs are available, this is the perfect + * 1st pick. + */ + if (lo->ldo_mirrors[index].lme_primary) break; + } /* for all mirrors */ + + /* failed to pick a sound mirror, lower our expectation */ + if (picked < 0) + picked = second_pick; + if (picked < 0) + picked = third_pick; + if (picked < 0) + RETURN(-ENODATA); - if (lod_comp_inited(lod_comp)) + RETURN(picked); +} + +/** + * figure out the components should be instantiated for resync. + */ +static int lod_prepare_resync(const struct lu_env *env, struct lod_object *lo, + struct lu_extent *extent) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_layout_component *lod_comp; + unsigned int need_sync = 0; + int i; + + CDEBUG(D_LAYOUT, + DFID": instantiate all stale components in "DEXT"\n", + PFID(lod_object_fid(lo)), PEXT(extent)); + + /** + * instantiate all components within this extent, even non-stale + * components. + */ + for (i = 0; i < lo->ldo_mirror_count; i++) { + if (!lo->ldo_mirrors[i].lme_stale) continue; - CDEBUG(D_LAYOUT, "instantiate: %u / %u\n", - i, lod_comp_index(lo, lod_comp)); + lod_foreach_mirror_comp(lod_comp, lo, i) { + if (!lu_extent_is_overlapped(extent, + &lod_comp->llc_extent)) + break; - info->lti_comp_idx[info->lti_count++] = - lod_comp_index(lo, lod_comp); + need_sync++; + + if (lod_comp_inited(lod_comp)) + continue; + + CDEBUG(D_LAYOUT, "resync instantiate %d / %d\n", + i, lod_comp_index(lo, lod_comp)); + info->lti_comp_idx[info->lti_count++] = + lod_comp_index(lo, lod_comp); + } } - lo->ldo_flr_state = LCM_FL_WRITE_PENDING; + return need_sync ? 0 : -EALREADY; +} + +static int lod_declare_update_rdonly(const struct lu_env *env, + struct lod_object *lo, struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lu_attr *layout_attr = &info->lti_layout_attr; + struct lod_layout_component *lod_comp; + struct lu_extent extent = { 0 }; + int rc; + ENTRY; + + LASSERT(lo->ldo_flr_state == LCM_FL_RDONLY); + LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE || + mlc->mlc_opc == MD_LAYOUT_RESYNC); + LASSERT(lo->ldo_mirror_count > 0); + + if (mlc->mlc_opc == MD_LAYOUT_WRITE) { + struct layout_intent *layout = mlc->mlc_intent; + int picked; + + extent = layout->li_extent; + CDEBUG(D_LAYOUT, DFID": trying to write :"DEXT"\n", + PFID(lod_object_fid(lo)), PEXT(&extent)); + + picked = lod_primary_pick(env, lo, &extent); + if (picked < 0) + RETURN(picked); + + CDEBUG(D_LAYOUT, DFID": picked mirror id %u as primary\n", + PFID(lod_object_fid(lo)), + lo->ldo_mirrors[picked].lme_id); + + if (layout->li_opc == LAYOUT_INTENT_TRUNC) { + /** + * trunc transfers [0, size) in the intent extent, we'd + * stale components overlapping [size, eof). + */ + extent.e_start = extent.e_end; + extent.e_end = OBD_OBJECT_EOF; + } + + /* stale overlapping components from other mirrors */ + lod_stale_components(lo, picked, &extent); + + /* restore truncate intent extent */ + if (layout->li_opc == LAYOUT_INTENT_TRUNC) + extent.e_end = extent.e_start; + + /* instantiate components for the picked mirror, start from 0 */ + extent.e_start = 0; + + lod_foreach_mirror_comp(lod_comp, lo, picked) { + if (!lu_extent_is_overlapped(&extent, + &lod_comp->llc_extent)) + break; + + if (lod_comp_inited(lod_comp)) + continue; + + info->lti_comp_idx[info->lti_count++] = + lod_comp_index(lo, lod_comp); + } + + lo->ldo_flr_state = LCM_FL_WRITE_PENDING; + } else { /* MD_LAYOUT_RESYNC */ + int i; + + /** + * could contain multiple non-stale mirrors, so we need to + * prep uninited all components assuming any non-stale mirror + * could be picked as the primary mirror. + */ + for (i = 0; i < lo->ldo_mirror_count; i++) { + if (lo->ldo_mirrors[i].lme_stale) + continue; + + lod_foreach_mirror_comp(lod_comp, lo, i) { + if (!lod_comp_inited(lod_comp)) + break; + + if (extent.e_end < lod_comp->llc_extent.e_end) + extent.e_end = + lod_comp->llc_extent.e_end; + } + } + + rc = lod_prepare_resync(env, lo, &extent); + if (rc) + GOTO(out, rc); + /* change the file state to SYNC_PENDING */ + lo->ldo_flr_state = LCM_FL_SYNC_PENDING; + } /* Reset the layout version once it's becoming too large. * This way it can make sure that the layout version is @@ -5450,13 +5909,15 @@ static int lod_declare_update_rdonly(const struct lu_env *env, layout_attr->la_valid = LA_LAYOUT_VERSION; layout_attr->la_layout_version = 0; /* set current version */ + if (mlc->mlc_opc == MD_LAYOUT_RESYNC) + layout_attr->la_layout_version = LU_LAYOUT_RESYNC; rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th); if (rc) GOTO(out, rc); out: if (rc) - lod_object_free_striping(env, lo); + lod_striping_free(env, lo); RETURN(rc); } @@ -5517,12 +5978,25 @@ static int lod_declare_update_write_pending(const struct lu_env *env, CDEBUG(D_LAYOUT, DFID": intent to write: "DEXT"\n", PFID(lod_object_fid(lo)), PEXT(&extent)); + if (mlc->mlc_intent->li_opc == LAYOUT_INTENT_TRUNC) { + /** + * trunc transfers [0, size) in the intent extent, we'd + * stale components overlapping [size, eof). + */ + extent.e_start = extent.e_end; + extent.e_end = OBD_OBJECT_EOF; + } /* 1. stale overlapping components */ lod_stale_components(lo, primary, &extent); /* 2. find out the components need instantiating. * instantiate [0, mlc->mlc_intent->e_end) */ + + /* restore truncate intent extent */ + if (mlc->mlc_intent->li_opc == LAYOUT_INTENT_TRUNC) + extent.e_end = extent.e_start; extent.e_start = 0; + lod_foreach_mirror_comp(lod_comp, lo, primary) { if (!lu_extent_is_overlapped(&extent, &lod_comp->llc_extent)) @@ -5537,9 +6011,6 @@ static int lod_declare_update_write_pending(const struct lu_env *env, lod_comp_index(lo, lod_comp); } } else { /* MD_LAYOUT_RESYNC */ - /* figure out the components that have been instantiated in - * in primary to decide what components should be instantiated - * in stale mirrors */ lod_foreach_mirror_comp(lod_comp, lo, primary) { if (!lod_comp_inited(lod_comp)) break; @@ -5547,36 +6018,9 @@ static int lod_declare_update_write_pending(const struct lu_env *env, extent.e_end = lod_comp->llc_extent.e_end; } - CDEBUG(D_LAYOUT, - DFID": instantiate all stale components in "DEXT"\n", - PFID(lod_object_fid(lo)), PEXT(&extent)); - - /* 1. instantiate all components within this extent, even - * non-stale components so that it won't need to instantiate - * those components for mirror truncate later. */ - for (i = 0; i < lo->ldo_mirror_count; i++) { - if (primary == i) - continue; - - LASSERTF(lo->ldo_mirrors[i].lme_stale, - "both %d and %d are primary\n", i, primary); - - lod_foreach_mirror_comp(lod_comp, lo, i) { - if (!lu_extent_is_overlapped(&extent, - &lod_comp->llc_extent)) - break; - - if (lod_comp_inited(lod_comp)) - continue; - - CDEBUG(D_LAYOUT, "resync instantiate %d / %d\n", - i, lod_comp_index(lo, lod_comp)); - - info->lti_comp_idx[info->lti_count++] = - lod_comp_index(lo, lod_comp); - } - } - + rc = lod_prepare_resync(env, lo, &extent); + if (rc) + GOTO(out, rc); /* change the file state to SYNC_PENDING */ lo->ldo_flr_state = LCM_FL_SYNC_PENDING; } @@ -5601,7 +6045,7 @@ static int lod_declare_update_write_pending(const struct lu_env *env, lod_obj_inc_layout_gen(lo); out: if (rc) - lod_object_free_striping(env, lo); + lod_striping_free(env, lo); RETURN(rc); } @@ -5666,8 +6110,8 @@ static int lod_declare_update_sync_pending(const struct lu_env *env, GOTO(out, rc = -EINVAL); } - if (!sync_components || !resync_components) { - CDEBUG(D_LAYOUT, DFID": no mirror in sync or resync\n", + if (!sync_components || (mlc->mlc_resync_count && !resync_components)) { + CDEBUG(D_LAYOUT, DFID": no mirror in sync\n", PFID(lod_object_fid(lo))); /* tend to return an error code here to prevent @@ -5689,7 +6133,7 @@ static int lod_declare_update_sync_pending(const struct lu_env *env, out: if (rc) - lod_object_free_striping(env, lo); + lod_striping_free(env, lo); RETURN(rc); } @@ -5706,8 +6150,7 @@ static int lod_declare_layout_change(const struct lu_env *env, dt_object_remote(dt_object_child(dt))) RETURN(-EINVAL); - lod_write_lock(env, dt, 0); - rc = lod_load_striping_locked(env, lo); + rc = lod_striping_load(env, lo); if (rc) GOTO(out, rc); @@ -5718,7 +6161,7 @@ static int lod_declare_layout_change(const struct lu_env *env, GOTO(out, rc); switch (lo->ldo_flr_state) { - case LCM_FL_NOT_FLR: + case LCM_FL_NONE: rc = lod_declare_update_plain(env, lo, mlc->mlc_intent, &mlc->mlc_buf, th); break; @@ -5736,7 +6179,6 @@ static int lod_declare_layout_change(const struct lu_env *env, break; } out: - dt_write_unlock(env, dt); RETURN(rc); } @@ -5955,7 +6397,7 @@ static int lod_object_init(const struct lu_env *env, struct lu_object *lo, * \param[in] env execution environment * \param[in] lo object */ -void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo) +void lod_striping_free_nolock(const struct lu_env *env, struct lod_object *lo) { struct lod_layout_component *lod_comp; int i, j; @@ -5992,6 +6434,10 @@ void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo) sizeof(struct dt_object *) * lod_comp->llc_stripes_allocated); lod_comp->llc_stripe = NULL; + OBD_FREE(lod_comp->llc_ost_indices, + sizeof(__u32) * + lod_comp->llc_stripes_allocated); + lod_comp->llc_ost_indices = NULL; lod_comp->llc_stripes_allocated = 0; } lod_free_comp_entries(lo); @@ -5999,6 +6445,13 @@ void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo) } } +void lod_striping_free(const struct lu_env *env, struct lod_object *lo) +{ + mutex_lock(&lo->ldo_layout_mutex); + lod_striping_free_nolock(env, lo); + mutex_unlock(&lo->ldo_layout_mutex); +} + /** * Implementation of lu_object_operations::loo_object_free. * @@ -6010,7 +6463,7 @@ static void lod_object_free(const struct lu_env *env, struct lu_object *o) struct lod_object *lo = lu2lod_obj(o); /* release all underlying object pinned */ - lod_object_free_striping(env, lo); + lod_striping_free(env, lo); lu_object_fini(o); OBD_SLAB_FREE_PTR(lo, lod_object_kmem); }