* Internal interfaces of LOV layer.
*
* Author: Nikita Danilov <nikita.danilov@sun.com>
+ * Author: Jinshan Xiong <jinshan.xiong@intel.com>
*/
#ifndef LOV_CL_INTERNAL_H
* \see lov_object::lo_type
*/
struct rw_semaphore lo_type_guard;
- /**
- * Type of an object. Protected by lov_object::lo_type_guard.
- */
- enum lov_layout_type lo_type;
/**
- * True if layout is valid. This bit is cleared when layout lock
+ * Type of an object. Protected by lov_object::lo_type_guard.
+ */
+ enum lov_layout_type lo_type;
+ /**
+ * True if layout is invalid. This bit is cleared when layout lock
* is lost.
*/
- unsigned lo_lsm_invalid:1;
+ bool lo_layout_invalid;
/**
- * Layout metadata.
+ * How many IOs are on going on this object. Layout can be changed
+ * only if there is no active IO.
*/
- struct lov_stripe_md *lo_lsm;
+ cfs_atomic_t lo_active_ios;
/**
* Waitq - wait for no one else is using lo_lsm
*/
cfs_waitq_t lo_waitq;
+ /**
+ * Layout metadata. NULL if empty layout.
+ */
+ struct lov_stripe_md *lo_lsm;
union lov_layout_state {
struct lov_layout_raid0 {
* lov_io::lis_cl::cis_object.
*/
struct lov_object *lis_object;
- /**
- * Lov stripe - this determines how this io fans out.
- * Hold a refcount to the lsm so it can't go away during IO.
- */
- struct lov_stripe_md *lis_lsm;
/**
* Original end-of-io position for this IO, set by the upper layer as
* cl_io::u::ci_rw::pos + cl_io::u::ci_rw::count. lov remembers this,
static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
int stripe, loff_t start, loff_t end)
{
- struct lov_stripe_md *lsm = lio->lis_lsm;
- struct cl_io *parent = lio->lis_cl.cis_io;
+ struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
+ struct cl_io *parent = lio->lis_cl.cis_io;
switch(io->ci_type) {
case CIT_SETATTR: {
struct lov_io_sub *lov_page_subio(const struct lu_env *env, struct lov_io *lio,
const struct cl_page_slice *slice)
{
- struct lov_stripe_md *lsm = lio->lis_lsm;
- struct cl_page *page = slice->cpl_page;
- int stripe;
+ struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
+ struct cl_page *page = slice->cpl_page;
+ int stripe;
LASSERT(lio->lis_cl.cis_io != NULL);
LASSERT(cl2lov(slice->cpl_obj) == lio->lis_object);
static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
struct cl_io *io)
{
- struct lov_stripe_md *lsm = lio->lis_lsm;
- int result;
+ struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
+ int result;
LASSERT(lio->lis_object != NULL);
ENTRY;
lio->lis_object = obj;
LASSERT(obj->lo_lsm != NULL);
- lio->lis_lsm = lsm_addref(obj->lo_lsm);
- lio->lis_stripe_count = lio->lis_lsm->lsm_stripe_count;
+ lio->lis_stripe_count = obj->lo_lsm->lsm_stripe_count;
switch (io->ci_type) {
case CIT_READ:
static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
{
- struct lov_io *lio = cl2lov_io(env, ios);
- int i;
+ struct lov_io *lio = cl2lov_io(env, ios);
+ struct lov_object *lov = cl2lov(ios->cis_obj);
+ int i;
ENTRY;
if (lio->lis_subs != NULL) {
lio->lis_nr_subios * sizeof lio->lis_subs[0]);
lio->lis_nr_subios = 0;
}
- lov_lsm_decref(lio->lis_object, lio->lis_lsm);
- lio->lis_lsm = NULL;
+
+ LASSERT(cfs_atomic_read(&lov->lo_active_ios) > 0);
+ if (cfs_atomic_dec_and_test(&lov->lo_active_ios))
+ cfs_waitq_broadcast(&lov->lo_waitq);
EXIT;
}
const struct cl_io_slice *ios)
{
struct lov_io *lio = cl2lov_io(env, ios);
- struct lov_stripe_md *lsm = lio->lis_lsm;
+ struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
struct lov_io_sub *sub;
obd_off endpos;
obd_off start;
{
struct lov_io *lio = cl2lov_io(env, ios);
struct cl_io *io = ios->cis_io;
- struct lov_stripe_md *lsm = lio->lis_lsm;
+ struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
loff_t start = io->u.ci_rw.crw_pos;
loff_t next;
unsigned long ssize = lsm->lsm_stripe_size;
static void lov_empty_io_fini(const struct lu_env *env,
const struct cl_io_slice *ios)
{
+ struct lov_object *lov = cl2lov(ios->cis_obj);
ENTRY;
+
+ if (cfs_atomic_dec_and_test(&lov->lo_active_ios))
+ cfs_waitq_broadcast(&lov->lo_waitq);
EXIT;
}
static const struct cl_io_operations lov_empty_io_ops = {
.op = {
[CIT_READ] = {
-#if 0
.cio_fini = lov_empty_io_fini,
+#if 0
.cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
.cio_lock = LOV_EMPTY_IMPOSSIBLE,
.cio_start = LOV_EMPTY_IMPOSSIBLE,
lov_io_slice_init(lio, lov, io);
if (io->ci_result == 0) {
io->ci_result = lov_io_subio_init(env, lio, io);
- if (io->ci_result == 0)
+ if (io->ci_result == 0) {
cl_io_slice_add(io, &lio->lis_cl, obj, &lov_io_ops);
+ cfs_atomic_inc(&lov->lo_active_ios);
+ }
}
RETURN(io->ci_result);
}
int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
struct cl_io *io)
{
+ struct lov_object *lov = cl2lov(obj);
struct lov_io *lio = lov_env_io(env);
int result;
ENTRY;
- lio->lis_lsm = NULL;
+ lio->lis_object = lov;
switch (io->ci_type) {
default:
LBUG();
PFID(lu_object_fid(&obj->co_lu)));
break;
}
- if (result == 0)
+ if (result == 0) {
cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
+ cfs_atomic_inc(&lov->lo_active_ios);
+ }
+
io->ci_result = result < 0 ? result : 0;
RETURN(result != 0);
}
union lov_layout_state *state)
{
LASSERT(lov->lo_type == LLT_EMPTY);
+ if (cfs_atomic_read(&lov->lo_active_ios) > 0)
+ RETURN(-EBUSY);
+
cl_object_prune(env, &lov->lo_cl);
return 0;
}
ENTRY;
dump_lsm(D_INODE, lsm);
- if (lov->lo_lsm_invalid && cfs_atomic_read(&lsm->lsm_refc) > 1)
+ if (cfs_atomic_read(&lov->lo_active_ios) > 0)
RETURN(-EBUSY);
if (r0->lo_sub != NULL) {
}
}
}
+ cl_object_prune(env, &lov->lo_cl);
RETURN(0);
}
static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov)
{
struct l_wait_info lwi = { 0 };
- struct lov_stripe_md *lsm = lov->lo_lsm;
ENTRY;
- if (!lov->lo_lsm_invalid || lsm == NULL)
+ if (!lov->lo_layout_invalid)
RETURN(0);
- LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 0);
- while (cfs_atomic_read(&lsm->lsm_refc) > 1 && lov->lo_lsm_invalid) {
+ while (cfs_atomic_read(&lov->lo_active_ios) > 0) {
lov_conf_unlock(lov);
CDEBUG(D_INODE, "file:"DFID" wait for active IO, now: %d.\n",
PFID(lu_object_fid(lov2lu(lov))),
- cfs_atomic_read(&lsm->lsm_refc));
+ cfs_atomic_read(&lov->lo_active_ios));
l_wait_event(lov->lo_waitq,
- cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi);
+ cfs_atomic_read(&lov->lo_active_ios) == 0, &lwi);
lov_conf_lock(lov);
}
RETURN(0);
result = old_ops->llo_delete(env, lov, &lov->u);
if (result == 0) {
old_ops->llo_fini(env, lov, &lov->u);
+
+ LASSERT(cfs_atomic_read(&lov->lo_active_ios) == 0);
LASSERT(cfs_list_empty(&hdr->coh_locks));
LASSERT(hdr->coh_tree.rnode == NULL);
LASSERT(hdr->coh_pages == 0);
ENTRY;
init_rwsem(&lov->lo_type_guard);
+ cfs_atomic_set(&lov->lo_active_ios, 0);
cfs_waitq_init(&lov->lo_waitq);
cl_object_page_init(lu2cl(obj), sizeof(struct lov_page));
lov_conf_lock(lov);
if (conf->coc_opc == OBJECT_CONF_INVALIDATE) {
- lov->lo_lsm_invalid = 1;
+ lov->lo_layout_invalid = true;
GOTO(out, result = 0);
}
(lsm != NULL && lov->lo_lsm != NULL &&
lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen)) {
/* same version of layout */
- lov->lo_lsm_invalid = 0;
+ lov->lo_layout_invalid = false;
GOTO(out, result = 0);
}
/* will change layout - check if there still exists active IO. */
- if (lov->lo_lsm != NULL &&
- cfs_atomic_read(&lov->lo_lsm->lsm_refc) > 1) {
- lov->lo_lsm_invalid = 1;
+ if (cfs_atomic_read(&lov->lo_active_ios) > 1) {
+ lov->lo_layout_invalid = true;
GOTO(out, result = -EBUSY);
}
default:
LBUG();
}
- lov->lo_lsm_invalid = result != 0;
+ lov->lo_layout_invalid = result != 0;
EXIT;
out:
lsm = lsm_addref(lov->lo_lsm);
CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n",
lsm, cfs_atomic_read(&lsm->lsm_refc),
- lov->lo_lsm_invalid, cfs_current());
+ lov->lo_layout_invalid, cfs_current());
}
lov_conf_thaw(lov);
return lsm;
CDEBUG(D_INODE, "lsm %p decref %d by %p.\n",
lsm, cfs_atomic_read(&lsm->lsm_refc), cfs_current());
- if (lov_free_memmd(&lsm) <= 1 && lov->lo_lsm_invalid)
- cfs_waitq_signal(&lov->lo_waitq);
+ lov_free_memmd(&lsm);
}
struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj)
}
EXPORT_SYMBOL(lov_lsm_get);
-void lov_lsm_put(struct cl_object *clobj, struct lov_stripe_md *lsm)
+void lov_lsm_put(struct cl_object *unused, struct lov_stripe_md *lsm)
{
- struct lu_object *luobj;
-
- if (clobj == NULL || lsm == NULL)
- return;
-
- luobj = lu_object_locate(&cl_object_header(clobj)->coh_lu,
- &lov_device_type);
- LASSERT(luobj != NULL);
-
- lov_lsm_decref(lu2lov(luobj), lsm);
+ if (lsm != NULL)
+ lov_free_memmd(&lsm);
}
EXPORT_SYMBOL(lov_lsm_put);