* Copyright 2009 Sun Microsystems, Inc. All rights reserved
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2016, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
*/
/*
* lustre/lod/lod_object.c
#define DEBUG_SUBSYSTEM S_MDS
+#include <linux/random.h>
+
#include <obd.h>
#include <obd_class.h>
#include <obd_support.h>
LASSERT(next->do_ops);
LASSERT(next->do_ops->do_index_try);
- rc = lod_load_striping_locked(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc != 0)
RETURN(rc);
return dt_attr_get(env, dt_object_child(dt), attr);
}
+/*
+ * Apply filesystem-wide defaults to a layout component: an unset stripe
+ * count and a non-positive stripe size are replaced by the defaults from
+ * the LOV descriptor.  DoM (LOV_PATTERN_MDT) components keep a zero
+ * stripe count since their data lives on the MDT, not on OST stripes.
+ */
+static inline void lod_adjust_stripe_info(struct lod_layout_component *comp,
+ struct lov_desc *desc)
+{
+ if (comp->llc_pattern != LOV_PATTERN_MDT) {
+ if (!comp->llc_stripe_count)
+ comp->llc_stripe_count =
+ desc->ld_default_stripe_count;
+ }
+ if (comp->llc_stripe_size <= 0)
+ comp->llc_stripe_size = desc->ld_default_stripe_size;
+}
+
int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo,
struct thandle *th,
struct lod_obj_stripe_cb_data *data)
data->locd_comp_skip_cb(env, lo, i, data))
continue;
+ if (data->locd_comp_cb) {
+ rc = data->locd_comp_cb(env, lo, i, data);
+ if (rc)
+ RETURN(rc);
+ }
+
+ /* may be used to act on the component only, without
+ * iterating over each stripe
+ */
+ if (!data->locd_stripe_cb)
+ continue;
+
LASSERT(lod_comp->llc_stripe_count > 0);
for (j = 0; j < lod_comp->llc_stripe_count; j++) {
struct dt_object *dt = lod_comp->llc_stripe[j];
* is being initialized as we don't need this information till
* few specific cases like destroy, chown
*/
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
RETURN(rc);
* the in-memory striping information has been freed in lod_xattr_set()
* due to layout change. It has to load stripe here again. It only
* changes flags of layout so declare_attr_set() is still accurate */
- rc = lod_load_striping_locked(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
RETURN(rc);
int rc = 0;
ENTRY;
+ LASSERT(mutex_is_locked(&lo->ldo_layout_mutex));
+
if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
RETURN(0);
lo->ldo_dir_stripe_count = le32_to_cpu(lmv1->lmv_stripe_count);
lo->ldo_dir_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
if (rc != 0)
- lod_object_free_striping(env, lo);
+ lod_striping_free_nolock(env, lo);
RETURN(rc);
}
int rc = 0;
__u32 i;
__u32 j;
+ bool is_specific = false;
ENTRY;
/* The lum has been verifed in lod_verify_md_striping */
- LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
+ LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC ||
+ le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC);
LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0);
stripe_count = le32_to_cpu(lum->lum_stripe_count);
/* Start index must be the master MDT */
master_index = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
idx_array[0] = master_index;
+ if (le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC) {
+ is_specific = true;
+ for (i = 1; i < stripe_count; i++)
+ idx_array[i] = le32_to_cpu(lum->lum_objects[i].lum_mds);
+ }
+
for (i = 0; i < stripe_count; i++) {
struct lod_tgt_desc *tgt = NULL;
struct dt_object *dto;
CDEBUG(D_INFO, "try idx %d, mdt cnt %u, allocated %u\n",
idx, lod->lod_remote_mdt_count + 1, i);
- if (likely(!OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) {
+ if (likely(!is_specific &&
+ !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) {
/* check whether the idx already exists
* in current allocated array */
for (k = 0; k < i; k++) {
idx, i, PFID(&fid));
idx_array[i] = idx;
/* Set the start index for next stripe allocation */
- if (i < stripe_count - 1)
+ if (!is_specific && i < stripe_count - 1) {
+ /*
+ * for large dir test, put all other slaves on one
+ * remote MDT, otherwise we may save too many local
+ * slave locks which will exceed RS_MAX_LOCKS.
+ */
+ if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)))
+ idx = master_index;
idx_array[i + 1] = (idx + 1) %
(lod->lod_remote_mdt_count + 1);
+ }
/* tgt_dt and fid must be ready after search avaible OSP
* in the above loop */
LASSERT(tgt_dt != NULL);
stripe[i] = dto;
}
- lo->ldo_dir_stripe_loaded = 1;
lo->ldo_dir_striped = 1;
lo->ldo_stripe = stripe;
lo->ldo_dir_stripe_count = i;
lo->ldo_dir_stripes_allocated = stripe_count;
+ smp_mb();
+ lo->ldo_dir_stripe_loaded = 1;
if (lo->ldo_dir_stripe_count == 0)
GOTO(out_put, rc = -ENOSPC);
le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count),
(int)le32_to_cpu(lum->lum_stripe_offset));
- if (le32_to_cpu(lum->lum_stripe_count) == 0)
+ if (lo->ldo_dir_stripe_count == 0)
GOTO(out, rc = 0);
/* prepare dir striped objects */
if (rc != 0) {
/* failed to create striping, let's reset
* config so that others don't get confused */
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
GOTO(out, rc);
}
out:
RETURN(0);
/* set xattr to each stripes, if needed */
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc != 0)
RETURN(rc);
struct lod_obj_stripe_cb_data *data)
{
struct lod_thread_info *info = lod_env_info(env);
+ struct lod_layout_component *comp = &lo->ldo_comp_entries[comp_idx];
struct filter_fid *ff = &info->lti_ff;
struct lu_buf *buf = &info->lti_buf;
int rc;
buf->lb_buf = ff;
buf->lb_len = sizeof(*ff);
rc = dt_xattr_get(env, dt, buf, XATTR_NAME_FID);
- if (rc == -ENODATA)
- return 0;
-
- if (rc < 0)
+ if (rc < 0) {
+ if (rc == -ENODATA)
+ return 0;
return rc;
+ }
+
+ filter_fid_le_to_cpu(ff, ff, sizeof(*ff));
+ if (lu_fid_eq(lu_object_fid(&lo->ldo_obj.do_lu), &ff->ff_parent) &&
+ ff->ff_layout.ol_comp_id == comp->llc_id)
+ return 0;
+ /* rewrite filter_fid */
+ memset(ff, 0, sizeof(*ff));
ff->ff_parent = *lu_object_fid(&lo->ldo_obj.do_lu);
ff->ff_parent.f_ver = stripe_idx;
- fid_cpu_to_le(&ff->ff_parent, &ff->ff_parent);
+ ff->ff_layout.ol_stripe_size = comp->llc_stripe_size;
+ ff->ff_layout.ol_stripe_count = comp->llc_stripe_count;
+ ff->ff_layout.ol_comp_id = comp->llc_id;
+ ff->ff_layout.ol_comp_start = comp->llc_extent.e_start;
+ ff->ff_layout.ol_comp_end = comp->llc_extent.e_end;
+ filter_fid_cpu_to_le(ff, ff, sizeof(*ff));
+
if (data->locd_declare)
rc = lod_sub_declare_xattr_set(env, dt, buf, XATTR_NAME_FID,
LU_XATTR_REPLACE, th);
LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr));
/* set xattr to each stripes, if needed */
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc != 0)
RETURN(rc);
LASSERT(lo->ldo_is_composite);
- if (lo->ldo_flr_state != LCM_FL_NOT_FLR)
+ if (lo->ldo_flr_state != LCM_FL_NONE)
RETURN(-EBUSY);
rc = lod_verify_striping(d, lo, buf, false);
lod_comp->llc_flags = comp_v1->lcm_entries[i].lcme_flags;
lod_comp->llc_stripe_count = v1->lmm_stripe_count;
- if (!lod_comp->llc_stripe_count ||
- lod_comp->llc_stripe_count == (__u16)-1)
- lod_comp->llc_stripe_count =
- desc->ld_default_stripe_count;
lod_comp->llc_stripe_size = v1->lmm_stripe_size;
- if (!lod_comp->llc_stripe_size)
- lod_comp->llc_stripe_size =
- desc->ld_default_stripe_size;
+ lod_adjust_stripe_info(lod_comp, desc);
if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
v3 = (struct lov_user_md_v3 *) v1;
struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
struct lod_object *lo = lod_dt_obj(dt);
struct lov_comp_md_v1 *comp_v1 = buf->lb_buf;
- __u32 magic, id;
+ __u32 magic;
int i, j, rc;
bool changed = false;
ENTRY;
}
for (i = 0; i < comp_v1->lcm_entry_count; i++) {
- id = comp_v1->lcm_entries[i].lcme_id;
+ __u32 id = comp_v1->lcm_entries[i].lcme_id;
+ __u32 flags = comp_v1->lcm_entries[i].lcme_flags;
+
+ if (flags & LCME_FL_INIT) {
+ if (changed)
+ lod_striping_free(env, lo);
+ RETURN(-EINVAL);
+ }
for (j = 0; j < lo->ldo_comp_cnt; j++) {
lod_comp = &lo->ldo_comp_entries[j];
- if (id == lod_comp->llc_id || id == LCME_ID_ALL) {
- lod_comp->llc_flags =
- comp_v1->lcm_entries[i].lcme_flags;
- changed = true;
+ if (id != lod_comp->llc_id)
+ continue;
+
+ if (flags & LCME_FL_NEG) {
+ flags &= ~LCME_FL_NEG;
+ lod_comp->llc_flags &= ~flags;
+ } else {
+ lod_comp->llc_flags |= flags;
}
+ changed = true;
}
}
lod_obj_inc_layout_gen(lo);
info->lti_buf.lb_len = lod_comp_md_size(lo, false);
- rc = lod_sub_declare_xattr_set(env, dt, &info->lti_buf,
- XATTR_NAME_LOV, 0, th);
+ rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), &info->lti_buf,
+ XATTR_NAME_LOV, LU_XATTR_REPLACE, th);
RETURN(rc);
}
LASSERT(lo->ldo_is_composite);
- if (lo->ldo_flr_state != LCM_FL_NOT_FLR)
+ if (lo->ldo_flr_state != LCM_FL_NONE)
RETURN(-EBUSY);
magic = comp_v1->lcm_magic;
RETURN(-EINVAL);
}
+ if (id == LCME_ID_INVAL && !flags) {
+ CDEBUG(D_LAYOUT, "%s: no id or flags specified.\n",
+ lod2obd(d)->obd_name);
+ RETURN(-EINVAL);
+ }
+
if (flags & LCME_FL_NEG) {
neg_flags = flags & ~LCME_FL_NEG;
flags = 0;
{
struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
struct lod_object *lo = lod_dt_obj(dt);
- struct dt_object *next = dt_object_child(&lo->ldo_obj);
char *op;
int rc, len = strlen(XATTR_LUSTRE_LOV);
ENTRY;
}
len++;
- dt_write_lock(env, next, 0);
- rc = lod_load_striping_locked(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
GOTO(unlock, rc);
}
unlock:
if (rc)
- lod_object_free_striping(env, lo);
- dt_write_unlock(env, next);
+ lod_striping_free(env, lo);
RETURN(rc);
}
lcm->lcm_size = cpu_to_le32(size);
lcm->lcm_layout_gen = cpu_to_le32(le16_to_cpu(
lmm_save->lmm_layout_gen));
- lcm->lcm_flags = cpu_to_le16(LCM_FL_NOT_FLR);
+ lcm->lcm_flags = cpu_to_le16(LCM_FL_NONE);
lcm->lcm_entry_count = cpu_to_le16(1);
lcm->lcm_mirror_count = 0;
lcm->lcm_size = cpu_to_le32(size);
lcm->lcm_entry_count = cpu_to_le16(cur_entry_count + merge_entry_count);
lcm->lcm_mirror_count = cpu_to_le16(mirror_count);
- if ((le16_to_cpu(lcm->lcm_flags) & LCM_FL_FLR_MASK) == LCM_FL_NOT_FLR)
+ if ((le16_to_cpu(lcm->lcm_flags) & LCM_FL_FLR_MASK) == LCM_FL_NONE)
lcm->lcm_flags = cpu_to_le32(LCM_FL_RDONLY);
- LASSERT(dt_write_locked(env, dt_object_child(dt)));
- lod_object_free_striping(env, lo);
- rc = lod_parse_striping(env, lo, buf);
+ rc = lod_striping_reload(env, lo, buf);
if (rc)
GOTO(out, rc);
}
/**
+ * Split layouts, just set the LOVEA with the layout from mbuf.
+ */
+static int lod_declare_layout_split(const struct lu_env *env,
+ struct dt_object *dt, const struct lu_buf *mbuf,
+ struct thandle *th)
+{
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct lov_comp_md_v1 *lcm = mbuf->lb_buf;
+ int rc;
+ ENTRY;
+
+ /* bump in-memory layout generation and mirror it into the
+ * on-disk buffer so clients see the layout change */
+ lod_obj_inc_layout_gen(lo);
+ lcm->lcm_layout_gen = cpu_to_le32(lo->ldo_layout_gen);
+
+ /* replace the in-memory striping with the one described by mbuf */
+ rc = lod_striping_reload(env, lo, mbuf);
+ if (rc)
+ RETURN(rc);
+
+ /* declare overwrite of the existing LOVEA on the local object */
+ rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), mbuf,
+ XATTR_NAME_LOV, LU_XATTR_REPLACE, th);
+ RETURN(rc);
+}
+
+/**
* Implementation of dt_object_operations::do_declare_xattr_set.
*
* \see dt_object_operations::do_declare_xattr_set() in the API description
mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
if ((S_ISREG(mode) || mode == 0) &&
- !(fl & (LU_XATTR_REPLACE | LU_XATTR_MERGE)) &&
+ !(fl & (LU_XATTR_REPLACE | LU_XATTR_MERGE | LU_XATTR_SPLIT)) &&
(strcmp(name, XATTR_NAME_LOV) == 0 ||
strcmp(name, XATTR_LUSTRE_LOV) == 0)) {
/*
LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 ||
strcmp(name, XATTR_LUSTRE_LOV) == 0);
rc = lod_declare_layout_merge(env, dt, buf, th);
+ } else if (fl & LU_XATTR_SPLIT) {
+ LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 ||
+ strcmp(name, XATTR_LUSTRE_LOV) == 0);
+ rc = lod_declare_layout_split(env, dt, buf, th);
} else if (S_ISREG(mode) &&
strlen(name) > strlen(XATTR_LUSTRE_LOV) + 1 &&
strncmp(name, XATTR_LUSTRE_LOV,
lum = buf->lb_buf;
switch (lum->lmm_magic) {
+ case LOV_USER_MAGIC_SPECIFIC:
case LOV_USER_MAGIC_V3:
v3 = buf->lb_buf;
if (v3->lmm_pool_name[0] != '\0')
* \param[in] env execution environment
* \param[in] dt object
* \param[in] attr attributes the stripes will be created with
+ * \param[in] lmu lmv_user_md if MDT indices are specified
* \param[in] dof format of stripes (see OSD API description)
* \param[in] th transaction handle
* \param[in] declare where to call "declare" or "execute" methods
static int lod_dir_striping_create_internal(const struct lu_env *env,
struct dt_object *dt,
struct lu_attr *attr,
+ const struct lu_buf *lmu,
struct dt_object_format *dof,
struct thandle *th,
bool declare)
if (!LMVEA_DELETE_VALUES(lo->ldo_dir_stripe_count,
lo->ldo_dir_stripe_offset)) {
- struct lmv_user_md_v1 *v1 = info->lti_ea_store;
- int stripe_count = lo->ldo_dir_stripe_count;
+ if (!lmu) {
+ struct lmv_user_md_v1 *v1 = info->lti_ea_store;
+ int stripe_count = lo->ldo_dir_stripe_count;
- if (info->lti_ea_store_size < sizeof(*v1)) {
- rc = lod_ea_store_resize(info, sizeof(*v1));
- if (rc != 0)
- RETURN(rc);
- v1 = info->lti_ea_store;
- }
+ if (info->lti_ea_store_size < sizeof(*v1)) {
+ rc = lod_ea_store_resize(info, sizeof(*v1));
+ if (rc != 0)
+ RETURN(rc);
+ v1 = info->lti_ea_store;
+ }
- memset(v1, 0, sizeof(*v1));
- v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
- v1->lum_stripe_count = cpu_to_le32(stripe_count);
- v1->lum_stripe_offset =
- cpu_to_le32(lo->ldo_dir_stripe_offset);
+ memset(v1, 0, sizeof(*v1));
+ v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
+ v1->lum_stripe_count = cpu_to_le32(stripe_count);
+ v1->lum_stripe_offset =
+ cpu_to_le32(lo->ldo_dir_stripe_offset);
- info->lti_buf.lb_buf = v1;
- info->lti_buf.lb_len = sizeof(*v1);
+ info->lti_buf.lb_buf = v1;
+ info->lti_buf.lb_len = sizeof(*v1);
+ lmu = &info->lti_buf;
+ }
if (declare)
- rc = lod_declare_xattr_set_lmv(env, dt, attr,
- &info->lti_buf, dof, th);
+ rc = lod_declare_xattr_set_lmv(env, dt, attr, lmu, dof,
+ th);
else
- rc = lod_xattr_set_lmv(env, dt, &info->lti_buf,
- XATTR_NAME_LMV, 0, th);
+ rc = lod_xattr_set_lmv(env, dt, lmu, XATTR_NAME_LMV, 0,
+ th);
if (rc != 0)
RETURN(rc);
}
static int lod_declare_dir_striping_create(const struct lu_env *env,
struct dt_object *dt,
struct lu_attr *attr,
+ struct lu_buf *lmu,
struct dt_object_format *dof,
struct thandle *th)
{
- return lod_dir_striping_create_internal(env, dt, attr, dof, th, true);
+ return lod_dir_striping_create_internal(env, dt, attr, lmu, dof, th,
+ true);
}
static int lod_dir_striping_create(const struct lu_env *env,
struct dt_object_format *dof,
struct thandle *th)
{
- return lod_dir_striping_create_internal(env, dt, attr, dof, th, false);
+ return lod_dir_striping_create_internal(env, dt, attr, NULL, dof, th,
+ false);
}
/**
LASSERT(lo);
if (lo->ldo_comp_cnt == 0) {
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
rc = lod_sub_xattr_del(env, next, XATTR_NAME_LOV, th);
RETURN(rc);
}
OBD_FREE(lod_comp->llc_stripe, sizeof(struct dt_object *) *
lod_comp->llc_stripes_allocated);
lod_comp->llc_stripe = NULL;
+ OBD_FREE(lod_comp->llc_ost_indices,
+ sizeof(__u32) * lod_comp->llc_stripes_allocated);
+ lod_comp->llc_ost_indices = NULL;
lod_comp->llc_stripes_allocated = 0;
lod_obj_set_pool(lo, i, NULL);
if (lod_comp->llc_ostlist.op_array) {
EXIT;
out:
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
return rc;
}
+
+static int lod_get_default_lov_striping(const struct lu_env *env,
+ struct lod_object *lo,
+ struct lod_default_striping *lds);
/**
* Implementation of dt_object_operations::do_xattr_set.
*
if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
strcmp(name, XATTR_NAME_LOV) == 0) {
- /* default LOVEA */
- rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th);
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lod_default_striping *lds = &info->lti_def_striping;
+ struct lov_user_md_v1 *v1 = buf->lb_buf;
+ char pool[LOV_MAXPOOLNAME + 1];
+ bool is_del;
+
+ /* get existing striping config */
+ rc = lod_get_default_lov_striping(env, lod_dt_obj(dt), lds);
+ if (rc)
+ RETURN(rc);
+
+ memset(pool, 0, sizeof(pool));
+ if (lds->lds_def_striping_set == 1)
+ lod_layout_get_pool(lds->lds_def_comp_entries,
+ lds->lds_def_comp_cnt, pool,
+ sizeof(pool));
+
+ is_del = LOVEA_DELETE_VALUES(v1->lmm_stripe_size,
+ v1->lmm_stripe_count,
+ v1->lmm_stripe_offset,
+ NULL);
+
+ /* Retain the pool name if it is not given */
+ if (v1->lmm_magic == LOV_USER_MAGIC_V1 && pool[0] != '\0' &&
+ !is_del) {
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lov_user_md_v3 *v3 = info->lti_ea_store;
+
+ memset(v3, 0, sizeof(*v3));
+ v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
+ v3->lmm_pattern = cpu_to_le32(v1->lmm_pattern);
+ v3->lmm_stripe_count =
+ cpu_to_le32(v1->lmm_stripe_count);
+ v3->lmm_stripe_offset =
+ cpu_to_le32(v1->lmm_stripe_offset);
+ v3->lmm_stripe_size = cpu_to_le32(v1->lmm_stripe_size);
+
+ strlcpy(v3->lmm_pool_name, pool,
+ sizeof(v3->lmm_pool_name));
+
+ info->lti_buf.lb_buf = v3;
+ info->lti_buf.lb_len = sizeof(*v3);
+ rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf,
+ name, fl, th);
+ } else {
+ rc = lod_xattr_set_lov_on_dir(env, dt, buf, name,
+ fl, th);
+ }
+
+ if (lds->lds_def_striping_set == 1 &&
+ lds->lds_def_comp_entries != NULL)
+ lod_free_def_comp_entries(lds);
+
RETURN(rc);
} else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
* defines striping, then create() does the work */
if (fl & LU_XATTR_REPLACE) {
/* free stripes, then update disk */
- lod_object_free_striping(env, lod_dt_obj(dt));
+ lod_striping_free(env, lod_dt_obj(dt));
rc = lod_sub_xattr_set(env, next, buf, name, fl, th);
} else if (dt_object_remote(dt)) {
RETURN(0);
/* set xattr to each stripes, if needed */
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc != 0)
RETURN(rc);
ENTRY;
if (!strcmp(name, XATTR_NAME_LOV))
- lod_object_free_striping(env, lod_dt_obj(dt));
+ lod_striping_free(env, lod_dt_obj(dt));
rc = lod_sub_xattr_del(env, next, name, th);
if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
}
+/**
+ * Copy OST list from layout provided by user.
+ *
+ * \param[in] lod_comp layout_component to be filled
+ * \param[in] v3 LOV EA V3 user data
+ *
+ * \retval 0 on success
+ * \retval negative if failed
+ */
+int lod_comp_copy_ost_lists(struct lod_layout_component *lod_comp,
+ struct lov_user_md_v3 *v3)
+{
+ int j;
+
+ ENTRY;
+
+ /* a default offset means "start at the first user-specified OST" */
+ if (v3->lmm_stripe_offset == LOV_OFFSET_DEFAULT)
+ v3->lmm_stripe_offset = v3->lmm_objects[0].l_ost_idx;
+
+ /* reuse an existing list of the same size; otherwise free it
+ * and allocate one matching the requested stripe count */
+ if (lod_comp->llc_ostlist.op_array) {
+ if (lod_comp->llc_ostlist.op_count ==
+ v3->lmm_stripe_count)
+ goto skip;
+ OBD_FREE(lod_comp->llc_ostlist.op_array,
+ lod_comp->llc_ostlist.op_size);
+ }
+
+ /* copy ost list from lmm */
+ lod_comp->llc_ostlist.op_count = v3->lmm_stripe_count;
+ lod_comp->llc_ostlist.op_size = v3->lmm_stripe_count * sizeof(__u32);
+ OBD_ALLOC(lod_comp->llc_ostlist.op_array,
+ lod_comp->llc_ostlist.op_size);
+ if (!lod_comp->llc_ostlist.op_array)
+ RETURN(-ENOMEM);
+skip:
+ for (j = 0; j < v3->lmm_stripe_count; j++) {
+ lod_comp->llc_ostlist.op_array[j] =
+ v3->lmm_objects[j].l_ost_idx;
+ }
+
+ RETURN(0);
+}
+
/**
* Get default striping.
} else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) {
v3 = (struct lov_user_md_v3 *)v1;
lustre_swab_lov_user_md_v3(v3);
+ } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_SPECIFIC)) {
+ v3 = (struct lov_user_md_v3 *)v1;
+ lustre_swab_lov_user_md_v3(v3);
+ lustre_swab_lov_user_md_objects(v3->lmm_objects,
+ v3->lmm_stripe_count);
} else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_COMP_V1)) {
comp_v1 = (struct lov_comp_md_v1 *)v1;
lustre_swab_lov_comp_md_v1(comp_v1);
}
if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1 &&
- v1->lmm_magic != LOV_MAGIC_COMP_V1)
+ v1->lmm_magic != LOV_MAGIC_COMP_V1 &&
+ v1->lmm_magic != LOV_USER_MAGIC_SPECIFIC)
RETURN(-ENOTSUPP);
if (v1->lmm_magic == LOV_MAGIC_COMP_V1) {
pool = v3->lmm_pool_name;
}
lod_set_def_pool(lds, i, pool);
+ if (v1->lmm_magic == LOV_USER_MAGIC_SPECIFIC) {
+ v3 = (struct lov_user_md_v3 *)v1;
+ rc = lod_comp_copy_ost_lists(lod_comp, v3);
+ if (rc)
+ RETURN(rc);
+ }
}
lds->lds_def_striping_set = 1;
lod_obj_set_pool(lo, i, def_comp->llc_pool);
}
+ /* copy ost list */
+ if (def_comp->llc_ostlist.op_array) {
+ OBD_ALLOC(obj_comp->llc_ostlist.op_array,
+ obj_comp->llc_ostlist.op_size);
+ if (!obj_comp->llc_ostlist.op_array)
+ return;
+ memcpy(obj_comp->llc_ostlist.op_array,
+ def_comp->llc_ostlist.op_array,
+ obj_comp->llc_ostlist.op_size);
+ }
+
/*
* Don't initialize these fields for plain layout
* (v1/v3) here, they are inherited in the order of
if (!lo->ldo_is_composite)
continue;
- if (obj_comp->llc_stripe_count <= 0 &&
- obj_comp->llc_pattern != LOV_PATTERN_MDT)
- obj_comp->llc_stripe_count =
- desc->ld_default_stripe_count;
- if (obj_comp->llc_stripe_size <= 0)
- obj_comp->llc_stripe_size =
- desc->ld_default_stripe_size;
+ lod_adjust_stripe_info(obj_comp, desc);
}
} else if (lds->lds_dir_def_striping_set && S_ISDIR(mode)) {
if (lo->ldo_dir_stripe_count == 0)
* stripe count and try to create dir by default stripe.
*/
if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0 &&
- le32_to_cpu(lum1->lum_magic) == LMV_USER_MAGIC) {
+ (le32_to_cpu(lum1->lum_magic) == LMV_USER_MAGIC ||
+ le32_to_cpu(lum1->lum_magic) == LMV_USER_MAGIC_SPECIFIC)) {
lc->ldo_dir_stripe_count =
le32_to_cpu(lum1->lum_stripe_count);
lc->ldo_dir_stripe_offset =
LASSERT(!lc->ldo_is_composite);
lod_comp = &lc->ldo_comp_entries[0];
desc = &d->lod_desc;
- if (lod_comp->llc_stripe_count <= 0)
- lod_comp->llc_stripe_count =
- desc->ld_default_stripe_count;
- if (lod_comp->llc_stripe_size <= 0)
- lod_comp->llc_stripe_size =
- desc->ld_default_stripe_size;
+ lod_adjust_stripe_info(lod_comp, desc);
}
EXIT;
/* failed to create striping or to set initial size, let's reset
* config so that others don't get confused */
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
NULL, th);
} else if (dof->dof_type == DFT_DIR) {
struct seq_server_site *ss;
+ struct lu_buf buf = { NULL };
+ struct lu_buf *lmu = NULL;
ss = lu_site2seq(dt->do_lu.lo_dev->ld_site);
else
GOTO(out, rc = -EINVAL);
}
+ } else if (hint && hint->dah_eadata) {
+ lmu = &buf;
+ lmu->lb_buf = (void *)hint->dah_eadata;
+ lmu->lb_len = hint->dah_eadata_len;
}
- rc = lod_declare_dir_striping_create(env, dt, attr, dof, th);
+ rc = lod_declare_dir_striping_create(env, dt, attr, lmu, dof,
+ th);
}
out:
/* failed to create striping or to set initial size, let's reset
* config so that others don't get confused */
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
RETURN(0);
out:
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
* is being initialized as we don't need this information till
* few specific cases like destroy, chown
*/
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
RETURN(rc);
}
/**
- * Release LDLM locks on the stripes of a striped directory.
- *
- * Iterates over all the locks taken on the stripe objects and
- * cancel them.
- *
- * \param[in] env execution environment
- * \param[in] dt striped object
- * \param[in] einfo lock description
- * \param[in] policy data describing requested lock
- *
- * \retval 0 on success
- * \retval negative if failed
- */
-static int lod_object_unlock_internal(const struct lu_env *env,
- struct dt_object *dt,
- struct ldlm_enqueue_info *einfo,
- union ldlm_policy_data *policy)
-{
- struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
- int rc = 0;
- int i;
- ENTRY;
-
- if (slave_locks == NULL)
- RETURN(0);
-
- for (i = 1; i < slave_locks->count; i++) {
- if (lustre_handle_is_used(&slave_locks->handles[i]))
- ldlm_lock_decref_and_cancel(&slave_locks->handles[i],
- einfo->ei_mode);
- }
-
- RETURN(rc);
-}
-
-/**
* Implementation of dt_object_operations::do_object_unlock.
*
* Used to release LDLM lock(s).
LASSERT(!dt_object_remote(dt_object_child(dt)));
/* locks were unlocked in MDT layer */
- for (i = 1; i < slave_locks->count; i++) {
- LASSERT(!lustre_handle_is_used(&slave_locks->handles[i]));
+ for (i = 0; i < slave_locks->ha_count; i++)
+ LASSERT(!lustre_handle_is_used(&slave_locks->ha_handles[i]));
+
+ /*
+ * NB, ha_count may not equal to ldo_dir_stripe_count, because dir
+ * layout may change, e.g., shrink dir layout after migration.
+ */
+ for (i = 0; i < lo->ldo_dir_stripe_count; i++)
dt_invalidate(env, lo->ldo_stripe[i]);
- }
- slave_locks_size = sizeof(*slave_locks) + slave_locks->count *
- sizeof(slave_locks->handles[0]);
+ slave_locks_size = offsetof(typeof(*slave_locks),
+ ha_handles[slave_locks->ha_count]);
OBD_FREE(slave_locks, slave_locks_size);
einfo->ei_cbdata = NULL;
struct ldlm_enqueue_info *einfo,
union ldlm_policy_data *policy)
{
- struct lod_object *lo = lod_dt_obj(dt);
- int rc = 0;
- int i;
- int slave_locks_size;
+ struct lod_object *lo = lod_dt_obj(dt);
+ int slave_locks_size;
struct lustre_handle_array *slave_locks = NULL;
+ int i;
+ int rc;
ENTRY;
/* remote object lock */
}
if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
- GOTO(out, rc = -ENOTDIR);
+ RETURN(-ENOTDIR);
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc != 0)
- GOTO(out, rc);
+ RETURN(rc);
/* No stripes */
- if (lo->ldo_dir_stripe_count <= 1) {
- /*
- * NB, ei_cbdata stores pointer to slave locks, if no locks
- * taken, make sure it's set to NULL, otherwise MDT will try to
- * unlock them.
- */
- einfo->ei_cbdata = NULL;
- GOTO(out, rc = 0);
- }
+ if (lo->ldo_dir_stripe_count <= 1)
+ RETURN(0);
- slave_locks_size = sizeof(*slave_locks) + lo->ldo_dir_stripe_count *
- sizeof(slave_locks->handles[0]);
+ slave_locks_size = offsetof(typeof(*slave_locks),
+ ha_handles[lo->ldo_dir_stripe_count]);
/* Freed in lod_object_unlock */
OBD_ALLOC(slave_locks, slave_locks_size);
- if (slave_locks == NULL)
- GOTO(out, rc = -ENOMEM);
- slave_locks->count = lo->ldo_dir_stripe_count;
+ if (!slave_locks)
+ RETURN(-ENOMEM);
+ slave_locks->ha_count = lo->ldo_dir_stripe_count;
/* striped directory lock */
- for (i = 1; i < lo->ldo_dir_stripe_count; i++) {
- struct lustre_handle lockh;
- struct ldlm_res_id *res_id;
+ for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
+ struct lustre_handle lockh;
+ struct ldlm_res_id *res_id;
res_id = &lod_env_info(env)->lti_res_id;
fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
einfo->ei_res_id = res_id;
LASSERT(lo->ldo_stripe[i] != NULL);
- if (likely(dt_object_remote(lo->ldo_stripe[i]))) {
+ if (dt_object_remote(lo->ldo_stripe[i])) {
+ set_bit(i, (void *)slave_locks->ha_map);
rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh,
einfo, policy);
} else {
struct ldlm_namespace *ns = einfo->ei_namespace;
ldlm_blocking_callback blocking = einfo->ei_cb_local_bl;
ldlm_completion_callback completion = einfo->ei_cb_cp;
- __u64 dlmflags = LDLM_FL_ATOMIC_CB;
+ __u64 dlmflags = LDLM_FL_ATOMIC_CB;
if (einfo->ei_mode == LCK_PW ||
einfo->ei_mode == LCK_EX)
dlmflags |= LDLM_FL_COS_INCOMPAT;
- /* This only happens if there are mulitple stripes
- * on the master MDT, i.e. except stripe0, there are
- * other stripes on the Master MDT as well, Only
- * happens in the test case right now. */
LASSERT(ns != NULL);
rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS,
policy, einfo->ei_mode,
NULL, 0, LVB_T_NONE,
NULL, &lockh);
}
- if (rc != 0)
- break;
- slave_locks->handles[i] = lockh;
+ if (rc) {
+ while (i--)
+ ldlm_lock_decref_and_cancel(
+ &slave_locks->ha_handles[i],
+ einfo->ei_mode);
+ OBD_FREE(slave_locks, slave_locks_size);
+ RETURN(rc);
+ }
+ slave_locks->ha_handles[i] = lockh;
}
einfo->ei_cbdata = slave_locks;
- if (rc != 0 && slave_locks != NULL) {
- lod_object_unlock_internal(env, dt, einfo, policy);
- OBD_FREE(slave_locks, slave_locks_size);
- }
- EXIT;
-out:
- if (rc != 0)
- einfo->ei_cbdata = NULL;
- RETURN(rc);
+ RETURN(0);
}
/**
struct lod_object *lo, struct thandle *th)
{
struct lod_thread_info *info = lod_env_info(env);
- struct ost_pool *inuse = &info->lti_inuse_osts;
int i;
int rc = 0;
ENTRY;
LASSERT(info->lti_count < lo->ldo_comp_cnt);
- if (info->lti_count > 0) {
- /* Prepare inuse array for composite file */
- rc = lod_prepare_inuse(env, lo);
- if (rc)
- RETURN(rc);
- }
for (i = 0; i < info->lti_count; i++) {
rc = lod_qos_prep_create(env, lo, NULL, th,
- info->lti_comp_idx[i], inuse);
+ info->lti_comp_idx[i]);
if (rc)
break;
}
int i, rc;
ENTRY;
- LASSERT(lo->ldo_flr_state == LCM_FL_NOT_FLR);
+ LASSERT(lo->ldo_flr_state == LCM_FL_NONE);
/*
* In case the client is passing lovea, which only happens during
GOTO(out, rc = -EINVAL);
}
- lod_object_free_striping(env, lo);
rc = lod_use_defined_striping(env, lo, buf);
if (rc)
GOTO(out, rc);
replay = true;
} else {
/* non replay path */
- rc = lod_load_striping_locked(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
GOTO(out, rc);
}
- if (layout->li_opc == LAYOUT_INTENT_TRUNC) {
- /**
- * trunc transfers [size, eof) in the intent extent, while
- * we'd instantiated components covers [0, size).
- */
- layout->li_extent.e_end = layout->li_extent.e_start;
- layout->li_extent.e_start = 0;
- }
-
/* Make sure defined layout covers the requested write range. */
lod_comp = &lo->ldo_comp_entries[lo->ldo_comp_cnt - 1];
if (lo->ldo_comp_cnt > 1 &&
rc = lod_declare_instantiate_components(env, lo, th);
out:
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
-#define lod_foreach_mirror_comp(comp, lo, mirror_idx) \
-for (comp = &lo->ldo_comp_entries[lo->ldo_mirrors[mirror_idx].lme_start]; \
- comp <= &lo->ldo_comp_entries[lo->ldo_mirrors[mirror_idx].lme_end]; \
- comp++)
-
static inline int lod_comp_index(struct lod_object *lo,
struct lod_layout_component *lod_comp)
{
}
}
-static int lod_declare_update_rdonly(const struct lu_env *env,
- struct lod_object *lo, struct md_layout_change *mlc,
- struct thandle *th)
+/**
+ * check an OST's availability
+ * \param[in] env execution environment
+ * \param[in] lo lod object
+ * \param[in] dt dt object
+ * \param[in] index mirror index
+ *
+ * \retval negative if failed
+ * \retval 1 if \a dt is available
+ * \retval 0 if \a dt is not available
+ */
+static inline int lod_check_ost_avail(const struct lu_env *env,
+ struct lod_object *lo,
+ struct dt_object *dt, int index)
{
- struct lod_thread_info *info = lod_env_info(env);
- struct lu_attr *layout_attr = &info->lti_layout_attr;
- struct lod_layout_component *lod_comp;
- struct layout_intent *layout = mlc->mlc_intent;
- struct lu_extent extent = layout->li_extent;
- unsigned int seq = 0;
- int picked;
- int i;
+ struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
+ struct lod_tgt_desc *ost;
+ __u32 idx;
+ int type = LU_SEQ_RANGE_OST;
int rc;
- ENTRY;
- LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE);
- LASSERT(lo->ldo_flr_state == LCM_FL_RDONLY);
- LASSERT(lo->ldo_mirror_count > 0);
+ rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu), &idx, &type);
+ if (rc < 0) {
+ CERROR("%s: can't locate "DFID":rc = %d\n",
+ lod2obd(lod)->obd_name, PFID(lu_object_fid(&dt->do_lu)),
+ rc);
+ return rc;
+ }
- CDEBUG(D_LAYOUT, DFID": trying to write :"DEXT"\n",
- PFID(lod_object_fid(lo)), PEXT(&extent));
+ ost = OST_TGT(lod, idx);
+ if (ost->ltd_statfs.os_state &
+ (OS_STATE_READONLY | OS_STATE_ENOSPC | OS_STATE_ENOINO |
+ OS_STATE_NOPRECREATE) ||
+ ost->ltd_active == 0) {
+ CDEBUG(D_LAYOUT, DFID ": mirror %d OST%d unavail, rc = %d\n",
+ PFID(lod_object_fid(lo)), index, idx, rc);
+ return 0;
+ }
+
+ return 1;
+}
+
+/**
+ * Pick primary mirror for write
+ * \param[in] env execution environment
+ * \param[in] lo object
+ * \param[in] extent write range
+ *
+ * \retval >=0 index of the picked mirror
+ * \retval -ENODATA if no usable mirror was found
+ * \retval negative error code if the OST availability check failed
+ */
+static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo,
+ struct lu_extent *extent)
+{
+ struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
+ unsigned int seq = 0;
+ struct lod_layout_component *lod_comp;
+ int i, j, rc;
+ int picked = -1, second_pick = -1, third_pick = -1;
+ ENTRY;
if (OBD_FAIL_CHECK(OBD_FAIL_FLR_RANDOM_PICK_MIRROR)) {
get_random_bytes(&seq, sizeof(seq));
}
/**
- * Pick a mirror as the primary.
- * Now it only picks the first mirror, this algo can be
- * revised later after knowing the topology of cluster or
- * the availability of OSTs.
+ * Pick a mirror as the primary, and check the availability of OSTs.
+ *
+ * This algo can be revised later after knowing the topology of
+ * cluster.
*/
- for (picked = -1, i = 0; i < lo->ldo_mirror_count; i++) {
+ lod_qos_statfs_update(env, lod);
+ for (i = 0; i < lo->ldo_mirror_count; i++) {
+ bool ost_avail = true;
int index = (i + seq) % lo->ldo_mirror_count;
- if (!lo->ldo_mirrors[index].lme_stale) {
- picked = index;
- break;
+ if (lo->ldo_mirrors[index].lme_stale) {
+ CDEBUG(D_LAYOUT, DFID": mirror %d stale\n",
+ PFID(lod_object_fid(lo)), index);
+ continue;
}
- }
- if (picked < 0) /* failed to pick a primary */
- RETURN(-ENODATA);
- CDEBUG(D_LAYOUT, DFID": picked mirror %u as primary\n",
- PFID(lod_object_fid(lo)), lo->ldo_mirrors[picked].lme_id);
+ /* 2nd pick is for the primary mirror containing unavail OST */
+ if (lo->ldo_mirrors[index].lme_primary && second_pick < 0)
+ second_pick = index;
- /* stale overlapping components from other mirrors */
- lod_stale_components(lo, picked, &extent);
+ /* 3rd pick is for non-primary mirror containing unavail OST */
+ if (second_pick < 0 && third_pick < 0)
+ third_pick = index;
- /* instantiate components for the picked mirror, start from 0 */
- if (layout->li_opc == LAYOUT_INTENT_TRUNC) {
/**
- * trunc transfers [size, eof) in the intent extent, we'd
- * stale components overlapping [size, eof), while we'd
- * instantiated components covers [0, size).
+ * we already found a non-primary 1st pick; keep scanning
+ * only for a potential primary mirror.
*/
- extent.e_end = extent.e_start;
- }
- extent.e_start = 0;
+ if (picked >= 0 && !lo->ldo_mirrors[index].lme_primary)
+ continue;
- lod_foreach_mirror_comp(lod_comp, lo, picked) {
- if (!lu_extent_is_overlapped(&extent,
- &lod_comp->llc_extent))
+ /* check the availability of OSTs */
+ lod_foreach_mirror_comp(lod_comp, lo, index) {
+ if (!lod_comp_inited(lod_comp) || !lod_comp->llc_stripe)
+ continue;
+
+ for (j = 0; j < lod_comp->llc_stripe_count; j++) {
+ struct dt_object *dt = lod_comp->llc_stripe[j];
+
+ rc = lod_check_ost_avail(env, lo, dt, index);
+ if (rc < 0)
+ RETURN(rc);
+
+ ost_avail = !!rc;
+ if (!ost_avail)
+ break;
+ } /* for all dt object in one component */
+ if (!ost_avail)
+ break;
+ } /* for all components in a mirror */
+
+ /**
+ * skip this mirror if any OST backing its allocated
+ * objects is unavailable.
+ */
+ if (!ost_avail)
+ continue;
+
+ /* this mirror has all OSTs available */
+ picked = index;
+
+ /**
+ * a primary mirror with all OSTs available is the perfect
+ * 1st pick; stop scanning.
+ */
+ if (lo->ldo_mirrors[index].lme_primary)
break;
+ } /* for all mirrors */
+
+ /* failed to pick a sound mirror, lower our expectation */
+ if (picked < 0)
+ picked = second_pick;
+ if (picked < 0)
+ picked = third_pick;
+ if (picked < 0)
+ RETURN(-ENODATA);
- if (lod_comp_inited(lod_comp))
+ RETURN(picked);
+}
+
+/**
+ * Figure out the components that should be instantiated for resync.
+ *
+ * \param[in] env execution environment
+ * \param[in] lo object
+ * \param[in] extent extent covering the data to be re-synced
+ *
+ * \retval 0 if at least one stale component overlaps the extent
+ * \retval -EALREADY if nothing needs to be re-synced
+ */
+static int lod_prepare_resync(const struct lu_env *env, struct lod_object *lo,
+ struct lu_extent *extent)
+{
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lod_layout_component *lod_comp;
+ unsigned int need_sync = 0;
+ int i;
+
+ CDEBUG(D_LAYOUT,
+ DFID": instantiate all stale components in "DEXT"\n",
+ PFID(lod_object_fid(lo)), PEXT(extent));
+
+ /**
+ * walk the stale mirrors: count every component overlapping the
+ * extent, and mark the not-yet-instantiated ones for
+ * instantiation.
+ */
+ for (i = 0; i < lo->ldo_mirror_count; i++) {
+ if (!lo->ldo_mirrors[i].lme_stale)
continue;
- CDEBUG(D_LAYOUT, "instantiate: %u / %u\n",
- i, lod_comp_index(lo, lod_comp));
+ lod_foreach_mirror_comp(lod_comp, lo, i) {
+ if (!lu_extent_is_overlapped(extent,
+ &lod_comp->llc_extent))
break;
- info->lti_comp_idx[info->lti_count++] =
- lod_comp_index(lo, lod_comp);
+ need_sync++;
+
+ if (lod_comp_inited(lod_comp))
+ continue;
+
+ CDEBUG(D_LAYOUT, "resync instantiate %d / %d\n",
+ i, lod_comp_index(lo, lod_comp));
+ info->lti_comp_idx[info->lti_count++] =
+ lod_comp_index(lo, lod_comp);
+ }
}
- lo->ldo_flr_state = LCM_FL_WRITE_PENDING;
+ return need_sync ? 0 : -EALREADY;
+}
+
+/**
+ * Declare a layout update for a file in RDONLY FLR state.
+ *
+ * For MD_LAYOUT_WRITE: pick a primary mirror, stale the overlapping
+ * components of the other mirrors and move the file to WRITE_PENDING.
+ * For MD_LAYOUT_RESYNC: mark stale components for instantiation and
+ * move the file to SYNC_PENDING.
+ *
+ * \param[in] env execution environment
+ * \param[in] lo object
+ * \param[in] mlc layout change request (WRITE or RESYNC)
+ * \param[in] th transaction handle
+ *
+ * \retval 0 on success, negative error code otherwise
+ */
+static int lod_declare_update_rdonly(const struct lu_env *env,
+ struct lod_object *lo, struct md_layout_change *mlc,
+ struct thandle *th)
+{
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lu_attr *layout_attr = &info->lti_layout_attr;
+ struct lod_layout_component *lod_comp;
+ struct lu_extent extent = { 0 };
+ int rc;
+ ENTRY;
+
+ LASSERT(lo->ldo_flr_state == LCM_FL_RDONLY);
+ LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE ||
+ mlc->mlc_opc == MD_LAYOUT_RESYNC);
+ LASSERT(lo->ldo_mirror_count > 0);
+
+ if (mlc->mlc_opc == MD_LAYOUT_WRITE) {
+ struct layout_intent *layout = mlc->mlc_intent;
+ int picked;
+
+ extent = layout->li_extent;
+ CDEBUG(D_LAYOUT, DFID": trying to write :"DEXT"\n",
+ PFID(lod_object_fid(lo)), PEXT(&extent));
+
+ picked = lod_primary_pick(env, lo, &extent);
+ if (picked < 0)
+ RETURN(picked);
+
+ CDEBUG(D_LAYOUT, DFID": picked mirror id %u as primary\n",
+ PFID(lod_object_fid(lo)),
+ lo->ldo_mirrors[picked].lme_id);
+
+ if (layout->li_opc == LAYOUT_INTENT_TRUNC) {
+ /**
+ * trunc transfers [0, size) in the intent extent, we'd
+ * stale components overlapping [size, eof).
+ */
+ extent.e_start = extent.e_end;
+ extent.e_end = OBD_OBJECT_EOF;
+ }
+
+ /* stale overlapping components from other mirrors */
+ lod_stale_components(lo, picked, &extent);
+
+ /* restore truncate intent extent */
+ if (layout->li_opc == LAYOUT_INTENT_TRUNC)
+ extent.e_end = extent.e_start;
+
+ /* instantiate components for the picked mirror, start from 0 */
+ extent.e_start = 0;
+
+ lod_foreach_mirror_comp(lod_comp, lo, picked) {
+ if (!lu_extent_is_overlapped(&extent,
+ &lod_comp->llc_extent))
+ break;
+
+ if (lod_comp_inited(lod_comp))
+ continue;
+
+ info->lti_comp_idx[info->lti_count++] =
+ lod_comp_index(lo, lod_comp);
+ }
+
+ lo->ldo_flr_state = LCM_FL_WRITE_PENDING;
+ } else { /* MD_LAYOUT_RESYNC */
+ int i;
+
+ /**
+ * the file could contain multiple non-stale mirrors, so
+ * compute the union of their instantiated extents: any
+ * non-stale mirror could be picked as the primary mirror.
+ */
+ for (i = 0; i < lo->ldo_mirror_count; i++) {
+ if (lo->ldo_mirrors[i].lme_stale)
+ continue;
+
+ lod_foreach_mirror_comp(lod_comp, lo, i) {
+ if (!lod_comp_inited(lod_comp))
+ break;
+
+ if (extent.e_end < lod_comp->llc_extent.e_end)
+ extent.e_end =
+ lod_comp->llc_extent.e_end;
+ }
+ }
+
+ rc = lod_prepare_resync(env, lo, &extent);
+ if (rc)
+ GOTO(out, rc);
+ /* change the file state to SYNC_PENDING */
+ lo->ldo_flr_state = LCM_FL_SYNC_PENDING;
+ }
/* Reset the layout version once it's becoming too large.
* This way it can make sure that the layout version is
layout_attr->la_valid = LA_LAYOUT_VERSION;
layout_attr->la_layout_version = 0; /* set current version */
+ if (mlc->mlc_opc == MD_LAYOUT_RESYNC)
+ layout_attr->la_layout_version = LU_LAYOUT_RESYNC;
rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th);
if (rc)
GOTO(out, rc);
out:
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
CDEBUG(D_LAYOUT, DFID": intent to write: "DEXT"\n",
PFID(lod_object_fid(lo)), PEXT(&extent));
+ if (mlc->mlc_intent->li_opc == LAYOUT_INTENT_TRUNC) {
+ /**
+ * trunc transfers [0, size) in the intent extent, we'd
+ * stale components overlapping [size, eof).
+ */
+ extent.e_start = extent.e_end;
+ extent.e_end = OBD_OBJECT_EOF;
+ }
/* 1. stale overlapping components */
lod_stale_components(lo, primary, &extent);
/* 2. find out the components need instantiating.
* instantiate [0, mlc->mlc_intent->e_end) */
- if (mlc->mlc_intent->li_opc == LAYOUT_INTENT_TRUNC) {
- /**
- * trunc transfers [size, eof) in the intent extent,
- * we'd stale components overlapping [size, eof),
- * while we'd instantiated components covers [0, size).
- */
+
+ /* restore truncate intent extent */
+ if (mlc->mlc_intent->li_opc == LAYOUT_INTENT_TRUNC)
extent.e_end = extent.e_start;
- }
extent.e_start = 0;
lod_foreach_mirror_comp(lod_comp, lo, primary) {
lod_comp_index(lo, lod_comp);
}
} else { /* MD_LAYOUT_RESYNC */
- /* figure out the components that have been instantiated in
- * in primary to decide what components should be instantiated
- * in stale mirrors */
lod_foreach_mirror_comp(lod_comp, lo, primary) {
if (!lod_comp_inited(lod_comp))
break;
extent.e_end = lod_comp->llc_extent.e_end;
}
- CDEBUG(D_LAYOUT,
- DFID": instantiate all stale components in "DEXT"\n",
- PFID(lod_object_fid(lo)), PEXT(&extent));
-
- /* 1. instantiate all components within this extent, even
- * non-stale components so that it won't need to instantiate
- * those components for mirror truncate later. */
- for (i = 0; i < lo->ldo_mirror_count; i++) {
- if (primary == i)
- continue;
-
- LASSERTF(lo->ldo_mirrors[i].lme_stale,
- "both %d and %d are primary\n", i, primary);
-
- lod_foreach_mirror_comp(lod_comp, lo, i) {
- if (!lu_extent_is_overlapped(&extent,
- &lod_comp->llc_extent))
- break;
-
- if (lod_comp_inited(lod_comp))
- continue;
-
- CDEBUG(D_LAYOUT, "resync instantiate %d / %d\n",
- i, lod_comp_index(lo, lod_comp));
-
- info->lti_comp_idx[info->lti_count++] =
- lod_comp_index(lo, lod_comp);
- }
- }
-
+ rc = lod_prepare_resync(env, lo, &extent);
+ if (rc)
+ GOTO(out, rc);
/* change the file state to SYNC_PENDING */
lo->ldo_flr_state = LCM_FL_SYNC_PENDING;
}
lod_obj_inc_layout_gen(lo);
out:
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
GOTO(out, rc = -EINVAL);
}
- if (!sync_components || !resync_components) {
- CDEBUG(D_LAYOUT, DFID": no mirror in sync or resync\n",
+ if (!sync_components || (mlc->mlc_resync_count && !resync_components)) {
+ CDEBUG(D_LAYOUT, DFID": no mirror in sync\n",
PFID(lod_object_fid(lo)));
/* tend to return an error code here to prevent
out:
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
dt_object_remote(dt_object_child(dt)))
RETURN(-EINVAL);
- lod_write_lock(env, dt, 0);
- rc = lod_load_striping_locked(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
GOTO(out, rc);
GOTO(out, rc);
switch (lo->ldo_flr_state) {
- case LCM_FL_NOT_FLR:
+ case LCM_FL_NONE:
rc = lod_declare_update_plain(env, lo, mlc->mlc_intent,
&mlc->mlc_buf, th);
break;
break;
}
out:
- dt_write_unlock(env, dt);
RETURN(rc);
}
* \param[in] env execution environment
* \param[in] lo object
*/
-void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
+void lod_striping_free_nolock(const struct lu_env *env, struct lod_object *lo)
{
struct lod_layout_component *lod_comp;
int i, j;
sizeof(struct dt_object *) *
lod_comp->llc_stripes_allocated);
lod_comp->llc_stripe = NULL;
+ OBD_FREE(lod_comp->llc_ost_indices,
+ sizeof(__u32) *
+ lod_comp->llc_stripes_allocated);
+ lod_comp->llc_ost_indices = NULL;
lod_comp->llc_stripes_allocated = 0;
}
lod_free_comp_entries(lo);
}
}
+/**
+ * Free the object's in-memory striping information, taking
+ * ldo_layout_mutex around lod_striping_free_nolock().
+ *
+ * \param[in] env execution environment
+ * \param[in] lo object whose striping information is freed
+ */
+void lod_striping_free(const struct lu_env *env, struct lod_object *lo)
+{
+ mutex_lock(&lo->ldo_layout_mutex);
+ lod_striping_free_nolock(env, lo);
+ mutex_unlock(&lo->ldo_layout_mutex);
+}
+
/**
* Implementation of lu_object_operations::loo_object_free.
*
struct lod_object *lo = lu2lod_obj(o);
/* release all underlying object pinned */
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
lu_object_fini(o);
OBD_SLAB_FREE_PTR(lo, lod_object_kmem);
}