Whamcloud - gitweb
LU-2744 build: fix 'data race condition' issues
[fs/lustre-release.git] / lustre / lov / lov_object.c
index eecfbcb..8e1cd43 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Intel Corporation.
+ * Copyright (c) 2011, 2013, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -68,7 +68,7 @@ struct lov_layout_operations {
         int  (*llo_print)(const struct lu_env *env, void *cookie,
                           lu_printer_t p, const struct lu_object *o);
         int  (*llo_page_init)(const struct lu_env *env, struct cl_object *obj,
-                               struct cl_page *page, cfs_page_t *vmpage);
+                               struct cl_page *page, struct page *vmpage);
         int  (*llo_lock_init)(const struct lu_env *env,
                               struct cl_object *obj, struct cl_lock *lock,
                               const struct cl_io *io);
@@ -123,14 +123,14 @@ static struct cl_object *lov_sub_find(const struct lu_env *env,
 }
 
 static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
-                        struct cl_object *stripe,
-                        struct lov_layout_raid0 *r0, int idx)
+                       struct cl_object *stripe, struct lov_layout_raid0 *r0,
+                       int idx)
 {
-        struct cl_object_header *hdr;
-        struct cl_object_header *subhdr;
-        struct cl_object_header *parent;
-        struct lov_oinfo        *oinfo;
-        int result;
+       struct cl_object_header *hdr;
+       struct cl_object_header *subhdr;
+       struct cl_object_header *parent;
+       struct lov_oinfo        *oinfo;
+       int result;
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) {
                /* For sanity:test_206.
@@ -143,31 +143,34 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
                return -EIO;
        }
 
-        hdr    = cl_object_header(lov2cl(lov));
-        subhdr = cl_object_header(stripe);
-        parent = subhdr->coh_parent;
+       hdr    = cl_object_header(lov2cl(lov));
+       subhdr = cl_object_header(stripe);
 
        oinfo = lov->lo_lsm->lsm_oinfo[idx];
-        CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: id: "LPU64" seq: "LPU64
-               " idx: %d gen: %d\n",
-               PFID(&subhdr->coh_lu.loh_fid), subhdr, idx,
-               PFID(&hdr->coh_lu.loh_fid), hdr,
-               oinfo->loi_id, oinfo->loi_seq,
-               oinfo->loi_ost_idx, oinfo->loi_ost_gen);
-
-        if (parent == NULL) {
-                subhdr->coh_parent = hdr;
-                subhdr->coh_nesting = hdr->coh_nesting + 1;
-                lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
-                r0->lo_sub[idx] = cl2lovsub(stripe);
-                r0->lo_sub[idx]->lso_super = lov;
-                r0->lo_sub[idx]->lso_index = idx;
-                result = 0;
-        } else {
+       CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: ostid: "DOSTID
+              " idx: %d gen: %d\n",
+              PFID(&subhdr->coh_lu.loh_fid), subhdr, idx,
+              PFID(&hdr->coh_lu.loh_fid), hdr, POSTID(&oinfo->loi_oi),
+              oinfo->loi_ost_idx, oinfo->loi_ost_gen);
+
+       /* reuse ->coh_attr_guard to protect coh_parent change */
+       spin_lock(&subhdr->coh_attr_guard);
+       parent = subhdr->coh_parent;
+       if (parent == NULL) {
+               subhdr->coh_parent = hdr;
+               spin_unlock(&subhdr->coh_attr_guard);
+               subhdr->coh_nesting = hdr->coh_nesting + 1;
+               lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
+               r0->lo_sub[idx] = cl2lovsub(stripe);
+               r0->lo_sub[idx]->lso_super = lov;
+               r0->lo_sub[idx]->lso_index = idx;
+               result = 0;
+       } else {
                struct lu_object  *old_obj;
                struct lov_object *old_lov;
                unsigned int mask = D_INODE;
 
+               spin_unlock(&subhdr->coh_attr_guard);
                old_obj = lu_object_locate(&parent->coh_lu, &lov_device_type);
                LASSERT(old_obj != NULL);
                old_lov = cl2lov(lu2cl(old_obj));
@@ -186,8 +189,8 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
                LU_OBJECT_DEBUG(mask, env, old_obj, "owned.\n");
                LU_OBJECT_HEADER(mask, env, lov2lu(lov), "try to own.\n");
                cl_object_put(env, stripe);
-        }
-        return result;
+       }
+       return result;
 }
 
 static int lov_init_raid0(const struct lu_env *env,
@@ -231,8 +234,11 @@ static int lov_init_raid0(const struct lu_env *env,
                         struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
                         int ost_idx = oinfo->loi_ost_idx;
 
-                        fid_ostid_unpack(ofid, &oinfo->loi_oi,
-                                         oinfo->loi_ost_idx);
+                        result = ostid_to_fid(ofid, &oinfo->loi_oi,
+                                             oinfo->loi_ost_idx);
+                       if (result != 0)
+                               GOTO(out, result);
+
                         subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
                         subconf->u.coc_oinfo = oinfo;
                         LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
@@ -252,28 +258,44 @@ static int lov_init_raid0(const struct lu_env *env,
                 }
         } else
                 result = -ENOMEM;
+out:
         RETURN(result);
 }
 
+static int lov_init_released(const struct lu_env *env,
+                       struct lov_device *dev, struct lov_object *lov,
+                       const struct cl_object_conf *conf,
+                       union  lov_layout_state *state)
+{
+       struct lov_stripe_md *lsm = conf->u.coc_md->lsm;
+
+       LASSERT(lsm != NULL);
+       LASSERT(lsm_is_released(lsm));
+       LASSERT(lov->lo_lsm == NULL);
+
+       lov->lo_lsm = lsm_addref(lsm);
+       return 0;
+}
+
 static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
                            union lov_layout_state *state)
 {
-       LASSERT(lov->lo_type == LLT_EMPTY);
+       LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
 
        lov_layout_wait(env, lov);
 
-       cl_object_prune(env, &lov->lo_cl);
+       cl_locks_prune(env, &lov->lo_cl, 0);
        return 0;
 }
 
 static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
-                               struct lovsub_object *los, int idx)
+                              struct lovsub_object *los, int idx)
 {
-        struct cl_object        *sub;
-        struct lov_layout_raid0 *r0;
-        struct lu_site          *site;
-        struct lu_site_bkt_data *bkt;
-        cfs_waitlink_t          *waiter;
+       struct cl_object        *sub;
+       struct lov_layout_raid0 *r0;
+       struct lu_site          *site;
+       struct lu_site_bkt_data *bkt;
+       wait_queue_t          *waiter;
 
         r0  = &lov->u.raid0;
         LASSERT(r0->lo_sub[idx] == los);
@@ -289,28 +311,28 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
 
         /* ... wait until it is actually destroyed---sub-object clears its
          * ->lo_sub[] slot in lovsub_object_fini() */
-        if (r0->lo_sub[idx] == los) {
-                waiter = &lov_env_info(env)->lti_waiter;
-                cfs_waitlink_init(waiter);
-                cfs_waitq_add(&bkt->lsb_marche_funebre, waiter);
-                cfs_set_current_state(CFS_TASK_UNINT);
-                while (1) {
-                        /* this wait-queue is signaled at the end of
-                         * lu_object_free(). */
-                        cfs_set_current_state(CFS_TASK_UNINT);
+       if (r0->lo_sub[idx] == los) {
+               waiter = &lov_env_info(env)->lti_waiter;
+               init_waitqueue_entry_current(waiter);
+               add_wait_queue(&bkt->lsb_marche_funebre, waiter);
+               set_current_state(TASK_UNINTERRUPTIBLE);
+               while (1) {
+                       /* this wait-queue is signaled at the end of
+                        * lu_object_free(). */
+                       set_current_state(TASK_UNINTERRUPTIBLE);
                        spin_lock(&r0->lo_sub_lock);
                        if (r0->lo_sub[idx] == los) {
                                spin_unlock(&r0->lo_sub_lock);
-                               cfs_waitq_wait(waiter, CFS_TASK_UNINT);
+                               waitq_wait(waiter, TASK_UNINTERRUPTIBLE);
                        } else {
                                spin_unlock(&r0->lo_sub_lock);
-                                cfs_set_current_state(CFS_TASK_RUNNING);
-                                break;
-                        }
-                }
-                cfs_waitq_del(&bkt->lsb_marche_funebre, waiter);
-        }
-        LASSERT(r0->lo_sub[idx] == NULL);
+                               set_current_state(TASK_RUNNING);
+                               break;
+                       }
+               }
+               remove_wait_queue(&bkt->lsb_marche_funebre, waiter);
+       }
+       LASSERT(r0->lo_sub[idx] == NULL);
 }
 
 static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
@@ -337,16 +359,16 @@ static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
                                  */
                                 lov_subobject_kill(env, lov, los, i);
                        }
-                }
-        }
-       cl_object_prune(env, &lov->lo_cl);
+               }
+       }
+       cl_locks_prune(env, &lov->lo_cl, 0);
        RETURN(0);
 }
 
 static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov,
                            union lov_layout_state *state)
 {
-        LASSERT(lov->lo_type == LLT_EMPTY);
+       LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
 }
 
 static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov,
@@ -366,6 +388,15 @@ static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov,
        EXIT;
 }
 
+static void lov_fini_released(const struct lu_env *env, struct lov_object *lov,
+                               union lov_layout_state *state)
+{
+       ENTRY;
+       dump_lsm(D_INODE, lov->lo_lsm);
+       lov_free_memmd(&lov->lo_lsm);
+       EXIT;
+}
+
 static int lov_print_empty(const struct lu_env *env, void *cookie,
                            lu_printer_t p, const struct lu_object *o)
 {
@@ -374,27 +405,42 @@ static int lov_print_empty(const struct lu_env *env, void *cookie,
 }
 
 static int lov_print_raid0(const struct lu_env *env, void *cookie,
-                           lu_printer_t p, const struct lu_object *o)
+                          lu_printer_t p, const struct lu_object *o)
 {
-        struct lov_object       *lov = lu2lov(o);
-        struct lov_layout_raid0 *r0  = lov_r0(lov);
-       struct lov_stripe_md    *lsm = lov->lo_lsm;
-        int i;
+       struct lov_object       *lov = lu2lov(o);
+       struct lov_layout_raid0 *r0  = lov_r0(lov);
+       struct lov_stripe_md    *lsm = lov->lo_lsm;
+       int                      i;
 
-        (*p)(env, cookie, "stripes: %d, %svalid, lsm{%p 0x%08X %d %u %u}: \n",
-               r0->lo_nr, lov->lo_layout_invalid ? "in" : "", lsm,
+       (*p)(env, cookie, "stripes: %d, %s, lsm{%p 0x%08X %d %u %u}:\n",
+               r0->lo_nr, lov->lo_layout_invalid ? "invalid" : "valid", lsm,
                lsm->lsm_magic, cfs_atomic_read(&lsm->lsm_refc),
                lsm->lsm_stripe_count, lsm->lsm_layout_gen);
-        for (i = 0; i < r0->lo_nr; ++i) {
-                struct lu_object *sub;
-
-                if (r0->lo_sub[i] != NULL) {
-                        sub = lovsub2lu(r0->lo_sub[i]);
-                        lu_object_print(env, cookie, p, sub);
-                } else
-                        (*p)(env, cookie, "sub %d absent\n", i);
-        }
-        return 0;
+       for (i = 0; i < r0->lo_nr; ++i) {
+               struct lu_object *sub;
+
+               if (r0->lo_sub[i] != NULL) {
+                       sub = lovsub2lu(r0->lo_sub[i]);
+                       lu_object_print(env, cookie, p, sub);
+               } else {
+                       (*p)(env, cookie, "sub %d absent\n", i);
+               }
+       }
+       return 0;
+}
+
+static int lov_print_released(const struct lu_env *env, void *cookie,
+                               lu_printer_t p, const struct lu_object *o)
+{
+       struct lov_object       *lov = lu2lov(o);
+       struct lov_stripe_md    *lsm = lov->lo_lsm;
+
+       (*p)(env, cookie,
+               "released: %s, lsm{%p 0x%08X %d %u %u}:\n",
+               lov->lo_layout_invalid ? "invalid" : "valid", lsm,
+               lsm->lsm_magic, cfs_atomic_read(&lsm->lsm_refc),
+               lsm->lsm_stripe_count, lsm->lsm_layout_gen);
+       return 0;
 }
 
 /**
@@ -500,10 +546,20 @@ const static struct lov_layout_operations lov_dispatch[] = {
                 .llo_lock_init = lov_lock_init_raid0,
                 .llo_io_init   = lov_io_init_raid0,
                 .llo_getattr   = lov_attr_get_raid0
+       },
+        [LLT_RELEASED] = {
+                .llo_init      = lov_init_released,
+                .llo_delete    = lov_delete_empty,
+                .llo_fini      = lov_fini_released,
+                .llo_install   = lov_install_empty,
+                .llo_print     = lov_print_released,
+                .llo_page_init = lov_page_init_empty,
+                .llo_lock_init = lov_lock_init_empty,
+                .llo_io_init   = lov_io_init_released,
+                .llo_getattr   = lov_attr_get_empty
         }
 };
 
-
 /**
  * Performs a double-dispatch based on the layout type of an object.
  */
@@ -517,15 +573,27 @@ const static struct lov_layout_operations lov_dispatch[] = {
         lov_dispatch[__llt].op(__VA_ARGS__);                            \
 })
 
+/**
+ * Return lov_layout_type associated with a given lsm
+ */
+enum lov_layout_type lov_type(struct lov_stripe_md *lsm)
+{
+       if (lsm == NULL)
+               return LLT_EMPTY;
+       if (lsm_is_released(lsm))
+               return LLT_RELEASED;
+       return LLT_RAID0;
+}
+
 static inline void lov_conf_freeze(struct lov_object *lov)
 {
-       if (lov->lo_owner != cfs_current())
+       if (lov->lo_owner != current)
                down_read(&lov->lo_type_guard);
 }
 
 static inline void lov_conf_thaw(struct lov_object *lov)
 {
-       if (lov->lo_owner != cfs_current())
+       if (lov->lo_owner != current)
                up_read(&lov->lo_type_guard);
 }
 
@@ -563,10 +631,10 @@ do {                                                                    \
 
 static void lov_conf_lock(struct lov_object *lov)
 {
-       LASSERT(lov->lo_owner != cfs_current());
+       LASSERT(lov->lo_owner != current);
        down_write(&lov->lo_type_guard);
        LASSERT(lov->lo_owner == NULL);
-       lov->lo_owner = cfs_current();
+       lov->lo_owner = current;
 }
 
 static void lov_conf_unlock(struct lov_object *lov)
@@ -592,8 +660,8 @@ static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov)
 }
 
 static int lov_layout_change(const struct lu_env *unused,
-                             struct lov_object *lov,
-                             const struct cl_object_conf *conf)
+                            struct lov_object *lov,
+                            const struct cl_object_conf *conf)
 {
        int result;
        enum lov_layout_type llt = LLT_EMPTY;
@@ -601,7 +669,6 @@ static int lov_layout_change(const struct lu_env *unused,
        const struct lov_layout_operations *old_ops;
        const struct lov_layout_operations *new_ops;
 
-       struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
        void *cookie;
        struct lu_env *env;
        int refcheck;
@@ -609,8 +676,8 @@ static int lov_layout_change(const struct lu_env *unused,
 
        LASSERT(0 <= lov->lo_type && lov->lo_type < ARRAY_SIZE(lov_dispatch));
 
-       if (conf->u.coc_md != NULL && conf->u.coc_md->lsm != NULL)
-               llt = LLT_RAID0; /* only raid0 is supported. */
+       if (conf->u.coc_md != NULL)
+               llt = lov_type(conf->u.coc_md->lsm);
        LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch));
 
        cookie = cl_env_reenter();
@@ -620,16 +687,20 @@ static int lov_layout_change(const struct lu_env *unused,
                RETURN(PTR_ERR(env));
        }
 
+       CDEBUG(D_INODE, DFID" from %s to %s\n",
+              PFID(lu_object_fid(lov2lu(lov))),
+              llt2str(lov->lo_type), llt2str(llt));
+
        old_ops = &lov_dispatch[lov->lo_type];
        new_ops = &lov_dispatch[llt];
 
+       cl_object_prune(env, &lov->lo_cl);
+
        result = old_ops->llo_delete(env, lov, &lov->u);
        if (result == 0) {
                old_ops->llo_fini(env, lov, &lov->u);
 
                LASSERT(cfs_atomic_read(&lov->lo_active_ios) == 0);
-               LASSERT(hdr->coh_tree.rnode == NULL);
-               LASSERT(hdr->coh_pages == 0);
 
                lov->lo_type = LLT_EMPTY;
                result = new_ops->llo_init(env,
@@ -655,7 +726,6 @@ static int lov_layout_change(const struct lu_env *unused,
  * Lov object operations.
  *
  */
-
 int lov_object_init(const struct lu_env *env, struct lu_object *obj,
                     const struct lu_object_conf *conf)
 {
@@ -669,12 +739,12 @@ int lov_object_init(const struct lu_env *env, struct lu_object *obj,
         ENTRY;
        init_rwsem(&lov->lo_type_guard);
        cfs_atomic_set(&lov->lo_active_ios, 0);
-       cfs_waitq_init(&lov->lo_waitq);
+       init_waitqueue_head(&lov->lo_waitq);
 
        cl_object_page_init(lu2cl(obj), sizeof(struct lov_page));
 
         /* no locking is necessary, as object is being created */
-        lov->lo_type = cconf->u.coc_md->lsm != NULL ? LLT_RAID0 : LLT_EMPTY;
+       lov->lo_type = lov_type(cconf->u.coc_md->lsm);
         ops = &lov_dispatch[lov->lo_type];
         result = ops->llo_init(env, dev, lov, cconf, set);
         if (result == 0)
@@ -685,9 +755,9 @@ int lov_object_init(const struct lu_env *env, struct lu_object *obj,
 static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
                         const struct cl_object_conf *conf)
 {
-       struct lov_stripe_md *lsm = NULL;
-       struct lov_object *lov = cl2lov(obj);
-       int result = 0;
+       struct lov_stripe_md    *lsm = NULL;
+       struct lov_object       *lov = cl2lov(obj);
+       int                      result = 0;
        ENTRY;
 
        lov_conf_lock(lov);
@@ -711,8 +781,9 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
        if (conf->u.coc_md != NULL)
                lsm = conf->u.coc_md->lsm;
        if ((lsm == NULL && lov->lo_lsm == NULL) ||
-           (lsm != NULL && lov->lo_lsm != NULL &&
-            lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen)) {
+           ((lsm != NULL && lov->lo_lsm != NULL) &&
+            (lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen) &&
+            (lov->lo_lsm->lsm_pattern == lsm->lsm_pattern))) {
                /* same version of layout */
                lov->lo_layout_invalid = false;
                GOTO(out, result = 0);
@@ -729,6 +800,8 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
 
 out:
        lov_conf_unlock(lov);
+       CDEBUG(D_INODE, DFID" lo_layout_invalid=%d\n",
+              PFID(lu_object_fid(lov2lu(lov))), lov->lo_layout_invalid);
        RETURN(result);
 }
 
@@ -759,7 +832,7 @@ static int lov_object_print(const struct lu_env *env, void *cookie,
 }
 
 int lov_page_init(const struct lu_env *env, struct cl_object *obj,
-               struct cl_page *page, cfs_page_t *vmpage)
+                 struct cl_page *page, struct page *vmpage)
 {
         return LOV_2DISPATCH_NOLOCK(cl2lov(obj),
                                    llo_page_init, env, obj, page, vmpage);
@@ -833,7 +906,7 @@ struct lu_object *lov_object_alloc(const struct lu_env *env,
         struct lu_object  *obj;
 
         ENTRY;
-        OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, CFS_ALLOC_IO);
+       OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, __GFP_IO);
         if (lov != NULL) {
                 obj = lov2lu(lov);
                 lu_object_init(obj, NULL, dev);
@@ -859,7 +932,7 @@ struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
                lsm = lsm_addref(lov->lo_lsm);
                CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n",
                        lsm, cfs_atomic_read(&lsm->lsm_refc),
-                       lov->lo_layout_invalid, cfs_current());
+                       lov->lo_layout_invalid, current);
        }
        lov_conf_thaw(lov);
        return lsm;
@@ -871,7 +944,7 @@ void lov_lsm_decref(struct lov_object *lov, struct lov_stripe_md *lsm)
                return;
 
        CDEBUG(D_INODE, "lsm %p decref %d by %p.\n",
-               lsm, cfs_atomic_read(&lsm->lsm_refc), cfs_current());
+               lsm, cfs_atomic_read(&lsm->lsm_refc), current);
 
        lov_free_memmd(&lsm);
 }
@@ -925,6 +998,7 @@ int lov_read_and_clear_async_rc(struct cl_object *clob)
                                loi->loi_ar.ar_rc = 0;
                        }
                }
+               case LLT_RELEASED:
                case LLT_EMPTY:
                        break;
                default: