* Implementation of cl_object for LOV layer.
*
* Author: Nikita Danilov <nikita.danilov@sun.com>
+ * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
*/
#define DEBUG_SUBSYSTEM S_LOV
#include "lov_cl_internal.h"
+#include <lustre_debug.h>
/** \addtogroup lov
* @{
struct lov_layout_raid0 *r0 = &state->raid0;
ENTRY;
- r0->lo_nr = conf->u.coc_md->lsm->lsm_stripe_count;
- r0->lo_lsm = conf->u.coc_md->lsm;
+
+ if (lsm->lsm_magic != LOV_MAGIC_V1 && lsm->lsm_magic != LOV_MAGIC_V3) {
+ dump_lsm(D_ERROR, lsm);
+ LASSERTF(0, "magic mismatch, expected %d/%d, actual %d.\n",
+ LOV_MAGIC_V1, LOV_MAGIC_V3, lsm->lsm_magic);
+ }
+
+ r0->lo_lsm = lsm_addref(lsm);
+ r0->lo_nr = lsm->lsm_stripe_count;
LASSERT(r0->lo_nr <= lov_targets_nr(dev));
OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
union lov_layout_state *state)
{
- struct lov_layout_raid0 *r0 = &state->raid0;
- int i;
+ struct lov_layout_raid0 *r0 = &state->raid0;
+ struct lov_stripe_md *lsm = r0->lo_lsm;
+ struct l_wait_info lwi = { 0 };
+ int i;
+
+ ENTRY;
+
+ /* wait until there is no extra users. */
+ dump_lsm(D_INODE, lsm);
+ l_wait_event(lov->lo_waitq, cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi);
- ENTRY;
if (r0->lo_sub != NULL) {
for (i = 0; i < r0->lo_nr; ++i) {
struct lovsub_object *los = r0->lo_sub[i];
struct lov_layout_raid0 *r0 = &state->raid0;
ENTRY;
+
if (r0->lo_sub != NULL) {
OBD_FREE_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
r0->lo_sub = NULL;
}
- EXIT;
+
+ LASSERT(cfs_atomic_read(&r0->lo_lsm->lsm_refc) == 1);
+ lov_free_memmd(&r0->lo_lsm);
+
+ EXIT;
}
static int lov_print_empty(const struct lu_env *env, void *cookie,
cl_env_put(nested, &refcheck);
cl_env_reexit(cookie);
+ old_ops->llo_delete(env, obj, &obj->u);
old_ops->llo_fini(env, obj, &obj->u);
LASSERT(cfs_list_empty(&hdr->coh_locks));
LASSERT(hdr->coh_tree.rnode == NULL);
ENTRY;
cfs_init_rwsem(&lov->lo_type_guard);
+ cfs_waitq_init(&lov->lo_waitq);
/* no locking is necessary, as object is being created */
lov->lo_type = cconf->u.coc_md->lsm != NULL ? LLT_RAID0 : LLT_EMPTY;
static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
const struct cl_object_conf *conf)
{
- struct lov_object *lov = cl2lov(obj);
- int result;
+ struct lov_stripe_md *lsm = conf->u.coc_md->lsm;
+ struct lov_object *lov = cl2lov(obj);
+ int result = 0;
ENTRY;
/*
cfs_down_write(&lov->lo_type_guard);
LASSERT(lov->lo_owner == NULL);
lov->lo_owner = cfs_current();
- if (lov->lo_type == LLT_EMPTY && conf->u.coc_md->lsm != NULL)
- result = lov_layout_change(env, lov, LLT_RAID0, conf);
- else
- result = -EOPNOTSUPP;
- lov->lo_owner = NULL;
- cfs_up_write(&lov->lo_type_guard);
- RETURN(result);
+ /* Only the EMPTY -> RAID0 transition is supported.  An object that
+ * already has a RAID0 layout accepts the "new" configuration only
+ * when it is identical to the current one (a no-op returning 0);
+ * any real change to an existing layout is refused. */
+ switch (lov->lo_type) {
+ case LLT_EMPTY:
+ if (lsm != NULL)
+ result = lov_layout_change(env, lov, LLT_RAID0, conf);
+ break;
+ case LLT_RAID0:
+ if (lsm == NULL || lov_stripe_md_cmp(lov->u.raid0.lo_lsm, lsm))
+ result = -EOPNOTSUPP;
+ break;
+ default:
+ LBUG();
+ }
+ lov->lo_owner = NULL;
+ cfs_up_write(&lov->lo_type_guard);
+ RETURN(result);
}
static void lov_object_delete(const struct lu_env *env, struct lu_object *obj)
RETURN(obj);
}
+/*
+ * Return the object's stripe metadata with an extra reference taken, or
+ * NULL when the object has no layout yet (LLT_EMPTY).  The lo_type_guard
+ * read lock makes the type check and the addref atomic with respect to
+ * layout changes.  The caller releases the reference with
+ * lov_lsm_decref().
+ */
+struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
+{
+ struct lov_stripe_md *lsm = NULL;
+
+ cfs_down_read(&lov->lo_type_guard);
+ switch (lov->lo_type) {
+ case LLT_RAID0:
+ lsm = lsm_addref(lov->u.raid0.lo_lsm);
+ /* fallthrough */
+ case LLT_EMPTY:
+ break;
+ default:
+ LBUG();
+ }
+ cfs_up_read(&lov->lo_type_guard);
+ return lsm;
+}
+
+/*
+ * Drop a reference obtained through lov_lsm_addref()/lov_lsm_get().
+ * If a layout change is in flight (lo_owner is set by lov_conf_set()),
+ * wake the thread blocked in lov_delete_raid0() waiting for the lsm
+ * refcount to drain to 1.
+ */
+void lov_lsm_decref(struct lov_object *lov, struct lov_stripe_md *lsm)
+{
+ if (lsm == NULL)
+ return;
+
+ lov_free_memmd(&lsm);
+ /* NOTE(review): lo_owner is read here without lo_type_guard --
+ * presumably l_wait_event()'s re-check of the condition prevents a
+ * missed wakeup; confirm against its semantics. */
+ if (lov->lo_owner != NULL)
+ cfs_waitq_signal(&lov->lo_waitq);
+}
+
+/*
+ * Public accessor: locate the lov layer inside @clobj's lu_object stack
+ * and return its stripe metadata with an extra reference, or NULL when
+ * @clobj is NULL, has no lov layer, or the object has no layout.
+ * Paired with lov_lsm_put().
+ */
+struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj)
+{
+ struct lu_object *luobj;
+ struct lov_stripe_md *lsm = NULL;
+
+ if (clobj == NULL)
+ return NULL;
+
+ luobj = lu_object_locate(&cl_object_header(clobj)->coh_lu,
+ &lov_device_type);
+ if (luobj != NULL)
+ lsm = lov_lsm_addref(lu2lov(luobj));
+ return lsm;
+}
+EXPORT_SYMBOL(lov_lsm_get);
+
+/*
+ * Release a stripe-metadata reference returned by lov_lsm_get().
+ * A NULL @clobj or @lsm is a no-op; otherwise the lov layer must be
+ * present (asserted), since lov_lsm_get() found it earlier.
+ */
+void lov_lsm_put(struct cl_object *clobj, struct lov_stripe_md *lsm)
+{
+ struct lu_object *luobj;
+
+ if (clobj == NULL || lsm == NULL)
+ return;
+
+ luobj = lu_object_locate(&cl_object_header(clobj)->coh_lu,
+ &lov_device_type);
+ LASSERT(luobj != NULL);
+
+ lov_lsm_decref(lu2lov(luobj), lsm);
+}
+EXPORT_SYMBOL(lov_lsm_put);
+
+/*
+ * Collect and reset the per-stripe asynchronous I/O error state of
+ * @clob: returns the first non-zero ar_rc found across the stripes
+ * (0 if none, or if the object has no lov layer / no layout), and
+ * clears ar_rc on every stripe as a side effect.  The lo_type_guard
+ * read lock pins the layout while the stripes are walked.
+ */
+int lov_read_and_clear_async_rc(struct cl_object *clob)
+{
+ struct lu_object *luobj;
+ int rc = 0;
+ ENTRY;
+
+ luobj = lu_object_locate(&cl_object_header(clob)->coh_lu,
+ &lov_device_type);
+ if (luobj != NULL) {
+ struct lov_object *lov = lu2lov(luobj);
+
+ cfs_down_read(&lov->lo_type_guard);
+ switch (lov->lo_type) {
+ case LLT_RAID0: {
+ struct lov_stripe_md *lsm;
+ int i;
+
+ lsm = lov->u.raid0.lo_lsm;
+ LASSERT(lsm != NULL);
+ for (i = 0; i < lsm->lsm_stripe_count; i++) {
+ struct lov_oinfo *loi = lsm->lsm_oinfo[i];
+ /* remember only the first error, but clear all */
+ if (loi->loi_ar.ar_rc && !rc)
+ rc = loi->loi_ar.ar_rc;
+ loi->loi_ar.ar_rc = 0;
+ }
+ }
+ /* fallthrough */
+ case LLT_EMPTY:
+ break;
+ default:
+ LBUG();
+ }
+ cfs_up_read(&lov->lo_type_guard);
+ }
+ RETURN(rc);
+}
+EXPORT_SYMBOL(lov_read_and_clear_async_rc);
+
/** @} lov */