LASSERT(buf);
LASSERT(buf->lb_buf);
LASSERT(buf->lb_len);
+ LASSERT(mutex_is_locked(&lo->ldo_layout_mutex));
lmm = (struct lov_mds_md_v1 *)buf->lb_buf;
magic = le32_to_cpu(lmm->lmm_magic);
out:
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free_nolock(env, lo);
RETURN(rc);
}
return true;
if (S_ISDIR(lod2lu_obj(lo)->lo_header->loh_attr)) {
- if (lo->ldo_stripe != NULL || lo->ldo_dir_stripe_loaded)
+ if (lo->ldo_dir_stripe_loaded)
return true;
/* Never load LMV stripe for slaves of striped dir */
}
/**
- * Initialize the object representing the stripes.
+ * A generic function to initialize the stripe objects.
*
- * Unless the stripes are initialized already, fetch LOV (for regular
- * objects) or LMV (for directory objects) EA and call lod_parse_striping()
- * to instantiate the objects representing the stripes. Caller should
- * hold the dt_write_lock(next).
+ * A protected version of lod_striping_load_locked() - load the striping
+ * information from storage, parse that and instantiate LU objects to
+ * represent the stripes. The LOD object \a lo supplies a pointer to the
+ * next sub-object in the LU stack so we can lock it. Also use \a lo to
+ * return an array of references to the newly instantiated objects.
*
* \param[in] env execution environment for this thread
- * \param[in,out] lo LOD object
+ * \param[in,out] lo LOD object, where striping is stored and
+ * which gets an array of references
*
* \retval 0 if parsing and object creation succeed
* \retval negative error number on failure
- */
-int lod_load_striping_locked(const struct lu_env *env, struct lod_object *lo)
+ **/
+int lod_striping_load(const struct lu_env *env, struct lod_object *lo)
{
- struct lod_thread_info *info = lod_env_info(env);
- struct lu_buf *buf = &info->lti_buf;
- struct dt_object *next = dt_object_child(&lo->ldo_obj);
- int rc = 0;
+ struct lod_thread_info *info = lod_env_info(env);
+ struct dt_object *next = dt_object_child(&lo->ldo_obj);
+ struct lu_buf *buf = &info->lti_buf;
+ int rc = 0;
+
ENTRY;
if (!dt_object_exists(next))
- GOTO(out, rc = 0);
+ RETURN(0);
if (lod_striping_loaded(lo))
- GOTO(out, rc = 0);
+ RETURN(0);
+
+ mutex_lock(&lo->ldo_layout_mutex);
+ if (lod_striping_loaded(lo))
+ GOTO(unlock, rc = 0);
if (S_ISREG(lod2lu_obj(lo)->lo_header->loh_attr)) {
rc = lod_get_lov_ea(env, lo);
if (rc <= 0)
- GOTO(out, rc);
+ GOTO(unlock, rc);
+
/*
* there is LOV EA (striping information) in this object
* let's parse it and create in-core objects for the stripes
*/
if (rc == 0)
lo->ldo_dir_stripe_loaded = 1;
- GOTO(out, rc = rc > 0 ? -EINVAL : rc);
+ GOTO(unlock, rc = rc > 0 ? -EINVAL : rc);
}
buf->lb_buf = info->lti_ea_store;
buf->lb_len = info->lti_ea_store_size;
}
if (rc < 0)
- GOTO(out, rc);
+ GOTO(unlock, rc);
}
/*
if (rc == 0)
lo->ldo_dir_stripe_loaded = 1;
}
-out:
- RETURN(rc);
+ EXIT;
+unlock:
+ mutex_unlock(&lo->ldo_layout_mutex);
+
+ return rc;
}
-/**
- * A generic function to initialize the stripe objects.
- *
- * A protected version of lod_load_striping_locked() - load the striping
- * information from storage, parse that and instantiate LU objects to
- * represent the stripes. The LOD object \a lo supplies a pointer to the
- * next sub-object in the LU stack so we can lock it. Also use \a lo to
- * return an array of references to the newly instantiated objects.
- *
- * \param[in] env execution environment for this thread
- * \param[in,out] lo LOD object, where striping is stored and
- * which gets an array of references
- *
- * \retval 0 if parsing and object creation succeed
- * \retval negative error number on failure
- **/
-int lod_load_striping(const struct lu_env *env, struct lod_object *lo)
+int lod_striping_reload(const struct lu_env *env, struct lod_object *lo,
+ const struct lu_buf *buf)
{
- struct dt_object *next = dt_object_child(&lo->ldo_obj);
- int rc;
+ int rc;
- if (!dt_object_exists(next))
- return 0;
+ ENTRY;
- /* Check without locking first */
- if (lod_striping_loaded(lo))
- return 0;
+ mutex_lock(&lo->ldo_layout_mutex);
+ lod_striping_free_nolock(env, lo);
+ rc = lod_parse_striping(env, lo, buf);
+ mutex_unlock(&lo->ldo_layout_mutex);
- /* currently this code is supposed to be called from declaration
- * phase only, thus the object is not expected to be locked by caller */
- dt_write_lock(env, next, 0);
- rc = lod_load_striping_locked(env, lo);
- dt_write_unlock(env, next);
- return rc;
+ RETURN(rc);
}
/**
LASSERT(next->do_ops);
LASSERT(next->do_ops->do_index_try);
- rc = lod_load_striping_locked(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc != 0)
RETURN(rc);
* is being initialized as we don't need this information till
* few specific cases like destroy, chown
*/
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
RETURN(rc);
* the in-memory striping information has been freed in lod_xattr_set()
* due to layout change. It has to load stripe here again. It only
* changes flags of layout so declare_attr_set() is still accurate */
- rc = lod_load_striping_locked(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
RETURN(rc);
int rc = 0;
ENTRY;
+ LASSERT(mutex_is_locked(&lo->ldo_layout_mutex));
+
if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
RETURN(0);
lo->ldo_dir_stripe_count = le32_to_cpu(lmv1->lmv_stripe_count);
lo->ldo_dir_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
if (rc != 0)
- lod_object_free_striping(env, lo);
+ lod_striping_free_nolock(env, lo);
RETURN(rc);
}
stripe[i] = dto;
}
- lo->ldo_dir_stripe_loaded = 1;
lo->ldo_dir_striped = 1;
lo->ldo_stripe = stripe;
lo->ldo_dir_stripe_count = i;
lo->ldo_dir_stripes_allocated = stripe_count;
+ smp_mb();
+ lo->ldo_dir_stripe_loaded = 1;
if (lo->ldo_dir_stripe_count == 0)
GOTO(out_put, rc = -ENOSPC);
if (rc != 0) {
/* failed to create striping, let's reset
* config so that others don't get confused */
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
GOTO(out, rc);
}
out:
RETURN(0);
/* set xattr to each stripes, if needed */
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc != 0)
RETURN(rc);
LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr));
/* set xattr to each stripes, if needed */
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc != 0)
RETURN(rc);
if (flags & LCME_FL_INIT) {
if (changed)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(-EINVAL);
}
{
struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
struct lod_object *lo = lod_dt_obj(dt);
- struct dt_object *next = dt_object_child(&lo->ldo_obj);
char *op;
int rc, len = strlen(XATTR_LUSTRE_LOV);
ENTRY;
}
len++;
- dt_write_lock(env, next, 0);
- rc = lod_load_striping_locked(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
GOTO(unlock, rc);
}
unlock:
if (rc)
- lod_object_free_striping(env, lo);
- dt_write_unlock(env, next);
+ lod_striping_free(env, lo);
RETURN(rc);
}
if ((le16_to_cpu(lcm->lcm_flags) & LCM_FL_FLR_MASK) == LCM_FL_NONE)
lcm->lcm_flags = cpu_to_le32(LCM_FL_RDONLY);
- LASSERT(dt_write_locked(env, dt_object_child(dt)));
- lod_object_free_striping(env, lo);
- rc = lod_parse_striping(env, lo, buf);
+ rc = lod_striping_reload(env, lo, buf);
if (rc)
GOTO(out, rc);
lod_obj_inc_layout_gen(lo);
lcm->lcm_layout_gen = cpu_to_le32(lo->ldo_layout_gen);
- lod_object_free_striping(env, lo);
- rc = lod_parse_striping(env, lo, mbuf);
+ rc = lod_striping_reload(env, lo, mbuf);
if (rc)
RETURN(rc);
LASSERT(lo);
if (lo->ldo_comp_cnt == 0) {
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
rc = lod_sub_xattr_del(env, next, XATTR_NAME_LOV, th);
RETURN(rc);
}
EXIT;
out:
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
return rc;
}
* defines striping, then create() does the work */
if (fl & LU_XATTR_REPLACE) {
/* free stripes, then update disk */
- lod_object_free_striping(env, lod_dt_obj(dt));
+ lod_striping_free(env, lod_dt_obj(dt));
rc = lod_sub_xattr_set(env, next, buf, name, fl, th);
} else if (dt_object_remote(dt)) {
RETURN(0);
/* set xattr to each stripes, if needed */
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc != 0)
RETURN(rc);
ENTRY;
if (!strcmp(name, XATTR_NAME_LOV))
- lod_object_free_striping(env, lod_dt_obj(dt));
+ lod_striping_free(env, lod_dt_obj(dt));
rc = lod_sub_xattr_del(env, next, name, th);
if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
/* failed to create striping or to set initial size, let's reset
* config so that others don't get confused */
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
/* failed to create striping or to set initial size, let's reset
* config so that others don't get confused */
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
RETURN(0);
out:
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
* is being initialized as we don't need this information till
* few specific cases like destroy, chown
*/
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
RETURN(rc);
if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
RETURN(-ENOTDIR);
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc != 0)
RETURN(rc);
GOTO(out, rc = -EINVAL);
}
- lod_object_free_striping(env, lo);
rc = lod_use_defined_striping(env, lo, buf);
if (rc)
GOTO(out, rc);
replay = true;
} else {
/* non replay path */
- rc = lod_load_striping_locked(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
GOTO(out, rc);
}
rc = lod_declare_instantiate_components(env, lo, th);
out:
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
out:
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
lod_obj_inc_layout_gen(lo);
out:
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
out:
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
dt_object_remote(dt_object_child(dt)))
RETURN(-EINVAL);
- lod_write_lock(env, dt, 0);
- rc = lod_load_striping_locked(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
GOTO(out, rc);
break;
}
out:
- dt_write_unlock(env, dt);
RETURN(rc);
}
* \param[in] env execution environment
* \param[in] lo object
*/
-void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
+void lod_striping_free_nolock(const struct lu_env *env, struct lod_object *lo)
{
struct lod_layout_component *lod_comp;
int i, j;
}
}
+void lod_striping_free(const struct lu_env *env, struct lod_object *lo)
+{
+ mutex_lock(&lo->ldo_layout_mutex);
+ lod_striping_free_nolock(env, lo);
+ mutex_unlock(&lo->ldo_layout_mutex);
+}
+
/**
* Implementation of lu_object_operations::loo_object_free.
*
struct lod_object *lo = lu2lod_obj(o);
/* release all underlying object pinned */
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
lu_object_fini(o);
OBD_SLAB_FREE_PTR(lo, lod_object_kmem);
}