Whamcloud - gitweb
LU-1876 hsm: account for active IOs correctly
authorJinshan Xiong <jinshan.xiong@intel.com>
Wed, 23 Jan 2013 07:32:24 +0000 (23:32 -0800)
committerOleg Drokin <green@whamcloud.com>
Tue, 29 Jan 2013 23:34:47 +0000 (18:34 -0500)
We used to use refcount of lsm to account for active IOs but this
turns out wrong because empty layout files don't have a lsm.

In this patch, lov_object::lo_active_ios is invented to account for
active IOs for both raid0 and empty layout;

Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Change-Id: I1b7aa16367af1d0a54f6c70bb4155b20071af225
Reviewed-on: http://review.whamcloud.com/5157
Tested-by: Hudson
Reviewed-by: Johann Lombardi <johann.lombardi@intel.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/lov/lov_cl_internal.h
lustre/lov/lov_io.c
lustre/lov/lov_object.c

index 6d4360c..d0bb9ae 100644 (file)
@@ -40,6 +40,7 @@
  * Internal interfaces of LOV layer.
  *
  *   Author: Nikita Danilov <nikita.danilov@sun.com>
+ *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
  */
 
 #ifndef LOV_CL_INTERNAL_H
@@ -197,23 +198,28 @@ struct lov_object {
          * \see lov_object::lo_type
          */
        struct rw_semaphore     lo_type_guard;
-        /**
-         * Type of an object. Protected by lov_object::lo_type_guard.
-         */
-        enum lov_layout_type   lo_type;
        /**
-        * True if layout is valid. This bit is cleared when layout lock
+        * Type of an object. Protected by lov_object::lo_type_guard.
+        */
+       enum lov_layout_type    lo_type;
+       /**
+        * True if layout is invalid. This bit is cleared when layout lock
         * is lost.
         */
-       unsigned               lo_lsm_invalid:1;
+       bool                    lo_layout_invalid;
        /**
-        * Layout metadata.
+        * How many IOs are on going on this object. Layout can be changed
+        * only if there is no active IO.
         */
-       struct lov_stripe_md  *lo_lsm;
+       cfs_atomic_t           lo_active_ios;
        /**
         * Waitq - wait for no one else is using lo_lsm
         */
        cfs_waitq_t            lo_waitq;
+       /**
+        * Layout metadata. NULL if empty layout.
+        */
+       struct lov_stripe_md  *lo_lsm;
 
         union lov_layout_state {
                 struct lov_layout_raid0 {
@@ -482,11 +488,6 @@ struct lov_io {
          * lov_io::lis_cl::cis_object.
          */
         struct lov_object *lis_object;
-       /**
-        * Lov stripe - this determines how this io fans out.
-        * Hold a refcount to the lsm so it can't go away during IO.
-        */
-       struct lov_stripe_md *lis_lsm;
         /**
          * Original end-of-io position for this IO, set by the upper layer as
          * cl_io::u::ci_rw::pos + cl_io::u::ci_rw::count. lov remembers this,
index 440f357..39c254f 100644 (file)
@@ -85,8 +85,8 @@ static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio,
 static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
                                int stripe, loff_t start, loff_t end)
 {
-       struct lov_stripe_md *lsm    = lio->lis_lsm;
-        struct cl_io         *parent = lio->lis_cl.cis_io;
+       struct lov_stripe_md *lsm    = lio->lis_object->lo_lsm;
+       struct cl_io         *parent = lio->lis_cl.cis_io;
 
         switch(io->ci_type) {
         case CIT_SETATTR: {
@@ -260,9 +260,9 @@ static int lov_page_stripe(const struct cl_page *page)
 struct lov_io_sub *lov_page_subio(const struct lu_env *env, struct lov_io *lio,
                                   const struct cl_page_slice *slice)
 {
-       struct lov_stripe_md *lsm  = lio->lis_lsm;
-        struct cl_page       *page = slice->cpl_page;
-        int stripe;
+       struct lov_stripe_md *lsm  = lio->lis_object->lo_lsm;
+       struct cl_page       *page = slice->cpl_page;
+       int stripe;
 
         LASSERT(lio->lis_cl.cis_io != NULL);
         LASSERT(cl2lov(slice->cpl_obj) == lio->lis_object);
@@ -278,8 +278,8 @@ struct lov_io_sub *lov_page_subio(const struct lu_env *env, struct lov_io *lio,
 static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
                              struct cl_io *io)
 {
-       struct lov_stripe_md *lsm = lio->lis_lsm;
-        int result;
+       struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
+       int result;
 
         LASSERT(lio->lis_object != NULL);
         ENTRY;
@@ -309,8 +309,7 @@ static void lov_io_slice_init(struct lov_io *lio,
        lio->lis_object = obj;
 
        LASSERT(obj->lo_lsm != NULL);
-       lio->lis_lsm = lsm_addref(obj->lo_lsm);
-        lio->lis_stripe_count = lio->lis_lsm->lsm_stripe_count;
+       lio->lis_stripe_count = obj->lo_lsm->lsm_stripe_count;
 
         switch (io->ci_type) {
         case CIT_READ:
@@ -360,8 +359,9 @@ static void lov_io_slice_init(struct lov_io *lio,
 
 static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
 {
-        struct lov_io *lio = cl2lov_io(env, ios);
-        int i;
+       struct lov_io *lio = cl2lov_io(env, ios);
+       struct lov_object *lov = cl2lov(ios->cis_obj);
+       int i;
 
         ENTRY;
         if (lio->lis_subs != NULL) {
@@ -371,8 +371,10 @@ static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
                          lio->lis_nr_subios * sizeof lio->lis_subs[0]);
                 lio->lis_nr_subios = 0;
         }
-       lov_lsm_decref(lio->lis_object, lio->lis_lsm);
-       lio->lis_lsm = NULL;
+
+       LASSERT(cfs_atomic_read(&lov->lo_active_ios) > 0);
+       if (cfs_atomic_dec_and_test(&lov->lo_active_ios))
+               cfs_waitq_broadcast(&lov->lo_waitq);
        EXIT;
 }
 
@@ -387,7 +389,7 @@ static int lov_io_iter_init(const struct lu_env *env,
                             const struct cl_io_slice *ios)
 {
        struct lov_io        *lio = cl2lov_io(env, ios);
-       struct lov_stripe_md *lsm = lio->lis_lsm;
+       struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
         struct lov_io_sub    *sub;
         obd_off endpos;
         obd_off start;
@@ -427,7 +429,7 @@ static int lov_io_rw_iter_init(const struct lu_env *env,
 {
        struct lov_io        *lio = cl2lov_io(env, ios);
        struct cl_io         *io  = ios->cis_io;
-       struct lov_stripe_md *lsm = lio->lis_lsm;
+       struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
         loff_t start = io->u.ci_rw.crw_pos;
         loff_t next;
         unsigned long ssize = lsm->lsm_stripe_size;
@@ -840,7 +842,11 @@ static const struct cl_io_operations lov_io_ops = {
 static void lov_empty_io_fini(const struct lu_env *env,
                               const struct cl_io_slice *ios)
 {
+       struct lov_object *lov = cl2lov(ios->cis_obj);
         ENTRY;
+
+       if (cfs_atomic_dec_and_test(&lov->lo_active_ios))
+               cfs_waitq_broadcast(&lov->lo_waitq);
         EXIT;
 }
 
@@ -858,8 +864,8 @@ static void lov_empty_impossible(const struct lu_env *env,
 static const struct cl_io_operations lov_empty_io_ops = {
         .op = {
                 [CIT_READ] = {
-#if 0
                         .cio_fini       = lov_empty_io_fini,
+#if 0
                         .cio_iter_init  = LOV_EMPTY_IMPOSSIBLE,
                         .cio_lock       = LOV_EMPTY_IMPOSSIBLE,
                         .cio_start      = LOV_EMPTY_IMPOSSIBLE,
@@ -916,8 +922,10 @@ int lov_io_init_raid0(const struct lu_env *env, struct cl_object *obj,
         lov_io_slice_init(lio, lov, io);
         if (io->ci_result == 0) {
                 io->ci_result = lov_io_subio_init(env, lio, io);
-                if (io->ci_result == 0)
+                if (io->ci_result == 0) {
                         cl_io_slice_add(io, &lio->lis_cl, obj, &lov_io_ops);
+                       cfs_atomic_inc(&lov->lo_active_ios);
+               }
         }
         RETURN(io->ci_result);
 }
@@ -925,11 +933,12 @@ int lov_io_init_raid0(const struct lu_env *env, struct cl_object *obj,
 int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
                       struct cl_io *io)
 {
+       struct lov_object *lov = cl2lov(obj);
        struct lov_io *lio = lov_env_io(env);
        int result;
        ENTRY;
 
-       lio->lis_lsm = NULL;
+       lio->lis_object = lov;
        switch (io->ci_type) {
        default:
                LBUG();
@@ -950,8 +959,11 @@ int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
                        PFID(lu_object_fid(&obj->co_lu)));
                 break;
         }
-        if (result == 0)
+        if (result == 0) {
                 cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
+               cfs_atomic_inc(&lov->lo_active_ios);
+       }
+
        io->ci_result = result < 0 ? result : 0;
        RETURN(result != 0);
 }
index 4b1d3af..c1cca70 100644 (file)
@@ -237,6 +237,9 @@ static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
                            union lov_layout_state *state)
 {
        LASSERT(lov->lo_type == LLT_EMPTY);
+       if (cfs_atomic_read(&lov->lo_active_ios) > 0)
+               RETURN(-EBUSY);
+
        cl_object_prune(env, &lov->lo_cl);
        return 0;
 }
@@ -298,7 +301,7 @@ static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
        ENTRY;
 
        dump_lsm(D_INODE, lsm);
-       if (lov->lo_lsm_invalid && cfs_atomic_read(&lsm->lsm_refc) > 1)
+       if (cfs_atomic_read(&lov->lo_active_ios) > 0)
                RETURN(-EBUSY);
 
         if (r0->lo_sub != NULL) {
@@ -315,6 +318,7 @@ static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
                        }
                 }
         }
+       cl_object_prune(env, &lov->lo_cl);
        RETURN(0);
 }
 
@@ -538,22 +542,20 @@ static void lov_conf_unlock(struct lov_object *lov)
 static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov)
 {
        struct l_wait_info lwi = { 0 };
-       struct lov_stripe_md *lsm = lov->lo_lsm;
        ENTRY;
 
-       if (!lov->lo_lsm_invalid || lsm == NULL)
+       if (!lov->lo_layout_invalid)
                RETURN(0);
 
-       LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 0);
-       while (cfs_atomic_read(&lsm->lsm_refc) > 1 && lov->lo_lsm_invalid) {
+       while (cfs_atomic_read(&lov->lo_active_ios) > 0) {
                lov_conf_unlock(lov);
 
                CDEBUG(D_INODE, "file:"DFID" wait for active IO, now: %d.\n",
                        PFID(lu_object_fid(lov2lu(lov))),
-                       cfs_atomic_read(&lsm->lsm_refc));
+                       cfs_atomic_read(&lov->lo_active_ios));
 
                l_wait_event(lov->lo_waitq,
-                            cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi);
+                            cfs_atomic_read(&lov->lo_active_ios) == 0, &lwi);
                lov_conf_lock(lov);
        }
        RETURN(0);
@@ -590,6 +592,8 @@ static int lov_layout_change(const struct lu_env *unused,
        result = old_ops->llo_delete(env, lov, &lov->u);
        if (result == 0) {
                old_ops->llo_fini(env, lov, &lov->u);
+
+               LASSERT(cfs_atomic_read(&lov->lo_active_ios) == 0);
                LASSERT(cfs_list_empty(&hdr->coh_locks));
                LASSERT(hdr->coh_tree.rnode == NULL);
                LASSERT(hdr->coh_pages == 0);
@@ -631,6 +635,7 @@ int lov_object_init(const struct lu_env *env, struct lu_object *obj,
 
         ENTRY;
        init_rwsem(&lov->lo_type_guard);
+       cfs_atomic_set(&lov->lo_active_ios, 0);
        cfs_waitq_init(&lov->lo_waitq);
 
        cl_object_page_init(lu2cl(obj), sizeof(struct lov_page));
@@ -654,7 +659,7 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
 
        lov_conf_lock(lov);
        if (conf->coc_opc == OBJECT_CONF_INVALIDATE) {
-               lov->lo_lsm_invalid = 1;
+               lov->lo_layout_invalid = true;
                GOTO(out, result = 0);
        }
 
@@ -671,14 +676,13 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
            (lsm != NULL && lov->lo_lsm != NULL &&
             lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen)) {
                /* same version of layout */
-               lov->lo_lsm_invalid = 0;
+               lov->lo_layout_invalid = false;
                GOTO(out, result = 0);
        }
 
        /* will change layout - check if there still exists active IO. */
-       if (lov->lo_lsm != NULL &&
-           cfs_atomic_read(&lov->lo_lsm->lsm_refc) > 1) {
-               lov->lo_lsm_invalid = 1;
+       if (cfs_atomic_read(&lov->lo_active_ios) > 1) {
+               lov->lo_layout_invalid = true;
                GOTO(out, result = -EBUSY);
        }
 
@@ -699,7 +703,7 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
        default:
                LBUG();
        }
-       lov->lo_lsm_invalid = result != 0;
+       lov->lo_layout_invalid = result != 0;
        EXIT;
 
 out:
@@ -834,7 +838,7 @@ struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
                lsm = lsm_addref(lov->lo_lsm);
                CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n",
                        lsm, cfs_atomic_read(&lsm->lsm_refc),
-                       lov->lo_lsm_invalid, cfs_current());
+                       lov->lo_layout_invalid, cfs_current());
        }
        lov_conf_thaw(lov);
        return lsm;
@@ -848,8 +852,7 @@ void lov_lsm_decref(struct lov_object *lov, struct lov_stripe_md *lsm)
        CDEBUG(D_INODE, "lsm %p decref %d by %p.\n",
                lsm, cfs_atomic_read(&lsm->lsm_refc), cfs_current());
 
-       if (lov_free_memmd(&lsm) <= 1 && lov->lo_lsm_invalid)
-               cfs_waitq_signal(&lov->lo_waitq);
+       lov_free_memmd(&lsm);
 }
 
 struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj)
@@ -868,18 +871,10 @@ struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj)
 }
 EXPORT_SYMBOL(lov_lsm_get);
 
-void lov_lsm_put(struct cl_object *clobj, struct lov_stripe_md *lsm)
+void lov_lsm_put(struct cl_object *unused, struct lov_stripe_md *lsm)
 {
-       struct lu_object *luobj;
-
-       if (clobj == NULL || lsm == NULL)
-               return;
-
-       luobj = lu_object_locate(&cl_object_header(clobj)->coh_lu,
-                                &lov_device_type);
-       LASSERT(luobj != NULL);
-
-       lov_lsm_decref(lu2lov(luobj), lsm);
+       if (lsm != NULL)
+               lov_free_memmd(&lsm);
 }
 EXPORT_SYMBOL(lov_lsm_put);