Whamcloud - gitweb
b=22040 use df POSIX output format
[fs/lustre-release.git] / lustre / lov / lov_object.c
index a38d22b..658b34d 100644 (file)
@@ -26,7 +26,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
+ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  */
 /*
  *   Author: Nikita Danilov <nikita.danilov@sun.com>
  */
 
-/** \addtogroup lov lov @{ */
-
 #define DEBUG_SUBSYSTEM S_LOV
 
 #include "lov_cl_internal.h"
 
+/** \addtogroup lov
+ *  @{
+ */
+
 /*****************************************************************************
  *
  * Layout operations.
@@ -106,28 +108,6 @@ static void lov_install_raid0(const struct lu_env *env,
         lov->u = *state;
 }
 
-static void oinfo_get_fid(const struct lov_oinfo *oinfo, struct lu_fid *fid)
-{
-        __u64 idx = oinfo->loi_id;
-
-        /* See idif definition in wiki:CMD3_interoperability_architecture */
-
-        LASSERT(oinfo->loi_gr < 1ULL << 16);
-        LASSERT(oinfo->loi_id < 1ULL << 49);
-        ENTRY;
-
-        /*
-         * Now that the fid of stripe is not unique now, ost_idx have to
-         * be used to make it unique. This is ok because the stripe fids
-         * are just used in client side(to locate the objects). -jay
-         */
-        fid->f_seq = ((__u64)oinfo->loi_ost_idx) << 32 |
-                     oinfo->loi_gr << 16 | idx >> 32;
-        fid->f_oid = idx; /* truncated to 32 bits by assignment */
-        fid->f_ver = 0;
-        EXIT;
-}
-
 static struct cl_object *lov_sub_find(const struct lu_env *env,
                                       struct cl_device *dev,
                                       const struct lu_fid *fid,
@@ -156,11 +136,11 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
         parent = subhdr->coh_parent;
 
         oinfo = r0->lo_lsm->lsm_oinfo[idx];
-        CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: id: "LPU64" gr: "LPU64
+        CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: id: "LPU64" seq: "LPU64
                " idx: %d gen: %d\n",
                PFID(&subhdr->coh_lu.loh_fid), subhdr, idx,
                PFID(&hdr->coh_lu.loh_fid), hdr,
-               oinfo->loi_id, oinfo->loi_gr,
+               oinfo->loi_id, oinfo->loi_seq,
                oinfo->loi_ost_idx, oinfo->loi_ost_gen);
 
         if (parent == NULL) {
@@ -207,6 +187,7 @@ static int lov_init_raid0(const struct lu_env *env,
         if (r0->lo_sub != NULL) {
                 result = 0;
                 subconf->coc_inode = conf->coc_inode;
+                cfs_spin_lock_init(&r0->lo_sub_lock);
                 /*
                  * Create stripe cl_objects.
                  */
@@ -215,9 +196,11 @@ static int lov_init_raid0(const struct lu_env *env,
                         struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
                         int ost_idx = oinfo->loi_ost_idx;
 
-                        oinfo_get_fid(oinfo, ofid);
+                        fid_ostid_unpack(ofid, &oinfo->loi_oi,
+                                         oinfo->loi_ost_idx);
                         subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
                         subconf->u.coc_oinfo = oinfo;
+                        LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
                         stripe = lov_sub_find(env, subdev, ofid, subconf);
                         if (!IS_ERR(stripe))
                                 result = lov_init_sub(env, lov, stripe, r0, i);
@@ -235,6 +218,51 @@ static void lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
         LASSERT(lov->lo_type == LLT_EMPTY);
 }
 
+static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
+                               struct lovsub_object *los, int idx)
+{
+        struct cl_object        *sub;
+        struct lov_layout_raid0 *r0;
+        struct lu_site          *site;
+        cfs_waitlink_t          *waiter;
+
+        r0  = &lov->u.raid0;
+        LASSERT(r0->lo_sub[idx] == los);
+
+        sub  = lovsub2cl(los);
+        site = sub->co_lu.lo_dev->ld_site;
+
+        cl_object_kill(env, sub);
+        /* release a reference to the sub-object and ... */
+        lu_object_ref_del(&sub->co_lu, "lov-parent", lov);
+        cl_object_put(env, sub);
+
+        /* ... wait until it is actually destroyed---sub-object clears its
+         * ->lo_sub[] slot in lovsub_object_fini() */
+        if (r0->lo_sub[idx] == los) {
+                waiter = &lov_env_info(env)->lti_waiter;
+                cfs_waitlink_init(waiter);
+                cfs_waitq_add(&site->ls_marche_funebre, waiter);
+                cfs_set_current_state(CFS_TASK_UNINT);
+                while (1) {
+                        /* this wait-queue is signaled at the end of
+                         * lu_object_free(). */
+                        cfs_set_current_state(CFS_TASK_UNINT);
+                        cfs_spin_lock(&r0->lo_sub_lock);
+                        if (r0->lo_sub[idx] == los) {
+                                cfs_spin_unlock(&r0->lo_sub_lock);
+                                cfs_waitq_wait(waiter, CFS_TASK_UNINT);
+                        } else {
+                                cfs_spin_unlock(&r0->lo_sub_lock);
+                                cfs_set_current_state(CFS_TASK_RUNNING);
+                                break;
+                        }
+                }
+                cfs_waitq_del(&site->ls_marche_funebre, waiter);
+        }
+        LASSERT(r0->lo_sub[idx] == NULL);
+}
+
 static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
                              union lov_layout_state *state)
 {
@@ -242,17 +270,16 @@ static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
         int                      i;
 
         ENTRY;
-        if (r0->lo_sub != NULL &&
-            lu_object_is_dying(lov->lo_cl.co_lu.lo_header)) {
+        if (r0->lo_sub != NULL) {
                 for (i = 0; i < r0->lo_nr; ++i) {
-                        struct lovsub_object *sub = r0->lo_sub[i];
+                        struct lovsub_object *los = r0->lo_sub[i];
 
-                        if (sub != NULL)
+                        if (los != NULL)
                                 /*
                                  * If top-level object is to be evicted from
                                  * the cache, so are its sub-objects.
                                  */
-                                cl_object_kill(env, lovsub2cl(sub));
+                                lov_subobject_kill(env, lov, los, i);
                 }
         }
         EXIT;
@@ -271,19 +298,6 @@ static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov,
 
         ENTRY;
         if (r0->lo_sub != NULL) {
-                int i;
-
-                for (i = 0; i < r0->lo_nr; ++i) {
-                        struct cl_object *sub;
-
-                        if (r0->lo_sub[i] == NULL)
-                                continue;
-                        sub = lovsub2cl(r0->lo_sub[i]);
-                        cl_object_header(sub)->coh_parent = NULL;
-                        lu_object_ref_del(&sub->co_lu, "lov-parent", lov);
-                        cl_object_put(env, sub);
-                        r0->lo_sub[i] = NULL;
-                }
                 OBD_FREE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
                 r0->lo_sub = NULL;
         }
@@ -423,10 +437,10 @@ const static struct lov_layout_operations lov_dispatch[] = {
                                                                         \
         __lock &= __obj->lo_owner != cfs_current();                     \
         if (__lock)                                                     \
-                down_read(&__obj->lo_type_guard);                       \
+                cfs_down_read(&__obj->lo_type_guard);                   \
         __result = LOV_2DISPATCH_NOLOCK(obj, op, __VA_ARGS__);          \
         if (__lock)                                                     \
-                up_read(&__obj->lo_type_guard);                         \
+                cfs_up_read(&__obj->lo_type_guard);                     \
         __result;                                                       \
 })
 
@@ -442,12 +456,12 @@ do {                                                                    \
         enum lov_layout_type                    __llt;                  \
                                                                         \
         if (__obj->lo_owner != cfs_current())                           \
-                down_read(&__obj->lo_type_guard);                       \
+                cfs_down_read(&__obj->lo_type_guard);                   \
         __llt = __obj->lo_type;                                         \
         LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
         lov_dispatch[__llt].op(__VA_ARGS__);                            \
         if (__obj->lo_owner != cfs_current())                           \
-                up_read(&__obj->lo_type_guard);                         \
+                cfs_up_read(&__obj->lo_type_guard);                     \
 } while (0)
 
 static int lov_layout_change(const struct lu_env *env,
@@ -484,7 +498,7 @@ static int lov_layout_change(const struct lu_env *env,
                 cl_env_reexit(cookie);
 
                 old_ops->llo_fini(env, obj, &obj->u);
-                LASSERT(list_empty(&hdr->coh_locks));
+                LASSERT(cfs_list_empty(&hdr->coh_locks));
                 LASSERT(hdr->coh_tree.rnode == NULL);
                 LASSERT(hdr->coh_pages == 0);
 
@@ -512,7 +526,7 @@ int lov_object_init(const struct lu_env *env, struct lu_object *obj,
         int result;
 
         ENTRY;
-        init_rwsem(&lov->lo_type_guard);
+        cfs_init_rwsem(&lov->lo_type_guard);
 
         /* no locking is necessary, as object is being created */
         lov->lo_type = cconf->u.coc_md->lsm != NULL ? LLT_RAID0 : LLT_EMPTY;
@@ -536,7 +550,7 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
          * Currently only LLT_EMPTY -> LLT_RAID0 transition is supported.
          */
         LASSERT(lov->lo_owner != cfs_current());
-        down_write(&lov->lo_type_guard);
+        cfs_down_write(&lov->lo_type_guard);
         LASSERT(lov->lo_owner == NULL);
         lov->lo_owner = cfs_current();
         if (lov->lo_type == LLT_EMPTY && conf->u.coc_md->lsm != NULL)
@@ -544,7 +558,7 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
         else
                 result = -EOPNOTSUPP;
         lov->lo_owner = NULL;
-        up_write(&lov->lo_type_guard);
+        cfs_up_write(&lov->lo_type_guard);
         RETURN(result);
 }
 
@@ -652,14 +666,14 @@ static const struct lu_object_operations lov_lu_obj_ops = {
 };
 
 struct lu_object *lov_object_alloc(const struct lu_env *env,
-                                   const struct lu_object_header *_,
+                                   const struct lu_object_header *unused,
                                    struct lu_device *dev)
 {
         struct lov_object *lov;
         struct lu_object  *obj;
 
         ENTRY;
-        OBD_SLAB_ALLOC_PTR(lov, lov_object_kmem);
+        OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, CFS_ALLOC_IO);
         if (lov != NULL) {
                 obj = lov2lu(lov);
                 lu_object_init(obj, NULL, dev);