Whamcloud - gitweb
LU-169 lov: add lsm refcounting
[fs/lustre-release.git] / lustre / lov / lov_object.c
index 40ca89b..33a47ed 100644 (file)
  * Implementation of cl_object for LOV layer.
  *
  *   Author: Nikita Danilov <nikita.danilov@sun.com>
+ *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
  */
 
 #define DEBUG_SUBSYSTEM S_LOV
 
 #include "lov_cl_internal.h"
+#include <lustre_debug.h>
 
 /** \addtogroup lov
  *  @{
@@ -179,8 +181,15 @@ static int lov_init_raid0(const struct lu_env *env,
         struct lov_layout_raid0 *r0      = &state->raid0;
 
         ENTRY;
-        r0->lo_nr  = conf->u.coc_md->lsm->lsm_stripe_count;
-        r0->lo_lsm = conf->u.coc_md->lsm;
+
+       if (lsm->lsm_magic != LOV_MAGIC_V1 && lsm->lsm_magic != LOV_MAGIC_V3) {
+               dump_lsm(D_ERROR, lsm);
+               LASSERTF(0, "magic mismatch, expected %d/%d, actual %d.\n",
+                        LOV_MAGIC_V1, LOV_MAGIC_V3, lsm->lsm_magic);
+       }
+
+       r0->lo_lsm = lsm_addref(lsm);
+       r0->lo_nr  = lsm->lsm_stripe_count;
         LASSERT(r0->lo_nr <= lov_targets_nr(dev));
 
         OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
@@ -268,10 +277,17 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
 static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
                              union lov_layout_state *state)
 {
-        struct lov_layout_raid0 *r0 = &state->raid0;
-        int                      i;
+       struct lov_layout_raid0 *r0 = &state->raid0;
+       struct lov_stripe_md    *lsm = r0->lo_lsm;
+       struct l_wait_info       lwi = { 0 };
+       int                      i;
+
+       ENTRY;
+
+       /* wait until there is no extra users. */
+       dump_lsm(D_INODE, lsm);
+       l_wait_event(lov->lo_waitq, cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi);
 
-        ENTRY;
         if (r0->lo_sub != NULL) {
                 for (i = 0; i < r0->lo_nr; ++i) {
                         struct lovsub_object *los = r0->lo_sub[i];
@@ -299,11 +315,16 @@ static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov,
         struct lov_layout_raid0 *r0 = &state->raid0;
 
         ENTRY;
+
         if (r0->lo_sub != NULL) {
                 OBD_FREE_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
                 r0->lo_sub = NULL;
         }
-        EXIT;
+
+       LASSERT(cfs_atomic_read(&r0->lo_lsm->lsm_refc) == 1);
+       lov_free_memmd(&r0->lo_lsm);
+
+       EXIT;
 }
 
 static int lov_print_empty(const struct lu_env *env, void *cookie,
@@ -499,6 +520,7 @@ static int lov_layout_change(const struct lu_env *env,
                 cl_env_put(nested, &refcheck);
                 cl_env_reexit(cookie);
 
+               old_ops->llo_delete(env, obj, &obj->u);
                 old_ops->llo_fini(env, obj, &obj->u);
                 LASSERT(cfs_list_empty(&hdr->coh_locks));
                 LASSERT(hdr->coh_tree.rnode == NULL);
@@ -529,6 +551,7 @@ int lov_object_init(const struct lu_env *env, struct lu_object *obj,
 
         ENTRY;
         cfs_init_rwsem(&lov->lo_type_guard);
+       cfs_waitq_init(&lov->lo_waitq);
 
         /* no locking is necessary, as object is being created */
         lov->lo_type = cconf->u.coc_md->lsm != NULL ? LLT_RAID0 : LLT_EMPTY;
@@ -544,8 +567,9 @@ int lov_object_init(const struct lu_env *env, struct lu_object *obj,
 static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
                         const struct cl_object_conf *conf)
 {
-        struct lov_object *lov = cl2lov(obj);
-        int result;
+       struct lov_stripe_md *lsm = conf->u.coc_md->lsm;
+       struct lov_object *lov = cl2lov(obj);
+       int result = 0;
 
         ENTRY;
         /*
@@ -555,13 +579,21 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
         cfs_down_write(&lov->lo_type_guard);
         LASSERT(lov->lo_owner == NULL);
         lov->lo_owner = cfs_current();
-        if (lov->lo_type == LLT_EMPTY && conf->u.coc_md->lsm != NULL)
-                result = lov_layout_change(env, lov, LLT_RAID0, conf);
-        else
-                result = -EOPNOTSUPP;
-        lov->lo_owner = NULL;
-        cfs_up_write(&lov->lo_type_guard);
-        RETURN(result);
+       switch (lov->lo_type) {
+       case LLT_EMPTY:
+               if (lsm != NULL)
+                       result = lov_layout_change(env, lov, LLT_RAID0, conf);
+               break;
+       case LLT_RAID0:
+               if (lsm == NULL || lov_stripe_md_cmp(lov->u.raid0.lo_lsm, lsm))
+                       result = -EOPNOTSUPP;
+               break;
+       default:
+               LBUG();
+       }
+       lov->lo_owner = NULL;
+       cfs_up_write(&lov->lo_type_guard);
+       RETURN(result);
 }
 
 static void lov_object_delete(const struct lu_env *env, struct lu_object *obj)
@@ -692,4 +724,99 @@ struct lu_object *lov_object_alloc(const struct lu_env *env,
         RETURN(obj);
 }
 
+struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
+{
+       struct lov_stripe_md *lsm = NULL;
+
+       cfs_down_read(&lov->lo_type_guard);
+       switch (lov->lo_type) {
+       case LLT_RAID0:
+               lsm = lsm_addref(lov->u.raid0.lo_lsm);
+       case LLT_EMPTY:
+               break;
+       default:
+               LBUG();
+       }
+       cfs_up_read(&lov->lo_type_guard);
+       return lsm;
+}
+
+void lov_lsm_decref(struct lov_object *lov, struct lov_stripe_md *lsm)
+{
+       if (lsm == NULL)
+               return;
+
+       lov_free_memmd(&lsm);
+       if (lov->lo_owner != NULL)
+               cfs_waitq_signal(&lov->lo_waitq);
+}
+
+struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj)
+{
+       struct lu_object *luobj;
+       struct lov_stripe_md *lsm = NULL;
+
+       if (clobj == NULL)
+               return NULL;
+
+       luobj = lu_object_locate(&cl_object_header(clobj)->coh_lu,
+                                &lov_device_type);
+       if (luobj != NULL)
+               lsm = lov_lsm_addref(lu2lov(luobj));
+       return lsm;
+}
+EXPORT_SYMBOL(lov_lsm_get);
+
+void lov_lsm_put(struct cl_object *clobj, struct lov_stripe_md *lsm)
+{
+       struct lu_object *luobj;
+
+       if (clobj == NULL || lsm == NULL)
+               return;
+
+       luobj = lu_object_locate(&cl_object_header(clobj)->coh_lu,
+                                &lov_device_type);
+       LASSERT(luobj != NULL);
+
+       lov_lsm_decref(lu2lov(luobj), lsm);
+}
+EXPORT_SYMBOL(lov_lsm_put);
+
+int lov_read_and_clear_async_rc(struct cl_object *clob)
+{
+       struct lu_object *luobj;
+       int rc = 0;
+       ENTRY;
+
+       luobj = lu_object_locate(&cl_object_header(clob)->coh_lu,
+                                &lov_device_type);
+       if (luobj != NULL) {
+               struct lov_object *lov = lu2lov(luobj);
+
+               cfs_down_read(&lov->lo_type_guard);
+               switch (lov->lo_type) {
+               case LLT_RAID0: {
+                       struct lov_stripe_md *lsm;
+                       int i;
+
+                       lsm = lov->u.raid0.lo_lsm;
+                       LASSERT(lsm != NULL);
+                       for (i = 0; i < lsm->lsm_stripe_count; i++) {
+                               struct lov_oinfo *loi = lsm->lsm_oinfo[i];
+                               if (loi->loi_ar.ar_rc && !rc)
+                                       rc = loi->loi_ar.ar_rc;
+                               loi->loi_ar.ar_rc = 0;
+                       }
+               }
+               case LLT_EMPTY:
+                       break;
+               default:
+                       LBUG();
+               }
+               cfs_up_read(&lov->lo_type_guard);
+       }
+       RETURN(rc);
+}
+EXPORT_SYMBOL(lov_read_and_clear_async_rc);
+
 /** @} lov */