Whamcloud - gitweb
LU-4682 llite: a few fixes for migration.
[fs/lustre-release.git] / lustre / lov / lov_object.c
index c5eeb32..13ed111 100644 (file)
@@ -42,7 +42,6 @@
 #define DEBUG_SUBSYSTEM S_LOV
 
 #include "lov_cl_internal.h"
-#include <lustre_debug.h>
 
 /** \addtogroup lov
  *  @{
@@ -68,7 +67,7 @@ struct lov_layout_operations {
         int  (*llo_print)(const struct lu_env *env, void *cookie,
                           lu_printer_t p, const struct lu_object *o);
         int  (*llo_page_init)(const struct lu_env *env, struct cl_object *obj,
-                               struct cl_page *page, struct page *vmpage);
+                             struct cl_page *page, pgoff_t index);
         int  (*llo_lock_init)(const struct lu_env *env,
                               struct cl_object *obj, struct cl_lock *lock,
                               const struct cl_io *io);
@@ -123,14 +122,14 @@ static struct cl_object *lov_sub_find(const struct lu_env *env,
 }
 
 static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
-                        struct cl_object *stripe,
-                        struct lov_layout_raid0 *r0, int idx)
+                       struct cl_object *stripe, struct lov_layout_raid0 *r0,
+                       int idx)
 {
-        struct cl_object_header *hdr;
-        struct cl_object_header *subhdr;
-        struct cl_object_header *parent;
-        struct lov_oinfo        *oinfo;
-        int result;
+       struct cl_object_header *hdr;
+       struct cl_object_header *subhdr;
+       struct cl_object_header *parent;
+       struct lov_oinfo        *oinfo;
+       int result;
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) {
                /* For sanity:test_206.
@@ -143,9 +142,8 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
                return -EIO;
        }
 
-        hdr    = cl_object_header(lov2cl(lov));
-        subhdr = cl_object_header(stripe);
-        parent = subhdr->coh_parent;
+       hdr    = cl_object_header(lov2cl(lov));
+       subhdr = cl_object_header(stripe);
 
        oinfo = lov->lo_lsm->lsm_oinfo[idx];
        CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: ostid: "DOSTID
@@ -154,19 +152,24 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
               PFID(&hdr->coh_lu.loh_fid), hdr, POSTID(&oinfo->loi_oi),
               oinfo->loi_ost_idx, oinfo->loi_ost_gen);
 
-        if (parent == NULL) {
-                subhdr->coh_parent = hdr;
-                subhdr->coh_nesting = hdr->coh_nesting + 1;
-                lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
-                r0->lo_sub[idx] = cl2lovsub(stripe);
-                r0->lo_sub[idx]->lso_super = lov;
-                r0->lo_sub[idx]->lso_index = idx;
-                result = 0;
-        } else {
+       /* reuse ->coh_attr_guard to protect coh_parent change */
+       spin_lock(&subhdr->coh_attr_guard);
+       parent = subhdr->coh_parent;
+       if (parent == NULL) {
+               subhdr->coh_parent = hdr;
+               spin_unlock(&subhdr->coh_attr_guard);
+               subhdr->coh_nesting = hdr->coh_nesting + 1;
+               lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
+               r0->lo_sub[idx] = cl2lovsub(stripe);
+               r0->lo_sub[idx]->lso_super = lov;
+               r0->lo_sub[idx]->lso_index = idx;
+               result = 0;
+       } else {
                struct lu_object  *old_obj;
                struct lov_object *old_lov;
                unsigned int mask = D_INODE;
 
+               spin_unlock(&subhdr->coh_attr_guard);
                old_obj = lu_object_locate(&parent->coh_lu, &lov_device_type);
                LASSERT(old_obj != NULL);
                old_lov = cl2lov(lu2cl(old_obj));
@@ -185,8 +188,20 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
                LU_OBJECT_DEBUG(mask, env, old_obj, "owned.\n");
                LU_OBJECT_HEADER(mask, env, lov2lu(lov), "try to own.\n");
                cl_object_put(env, stripe);
-        }
-        return result;
+       }
+       return result;
+}
+
+static int lov_page_slice_fixup(struct lov_object *lov,
+                               struct cl_object *stripe)
+{
+       struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
+       struct cl_object *o;
+
+       cl_object_for_each(o, stripe)
+               o->co_slice_off += hdr->coh_page_bufsize;
+
+       return cl_object_header(stripe)->coh_page_bufsize;
 }
 
 static int lov_init_raid0(const struct lu_env *env,
@@ -215,12 +230,16 @@ static int lov_init_raid0(const struct lu_env *env,
        LASSERT(lov->lo_lsm == NULL);
        lov->lo_lsm = lsm_addref(lsm);
        r0->lo_nr  = lsm->lsm_stripe_count;
-        LASSERT(r0->lo_nr <= lov_targets_nr(dev));
+       LASSERT(r0->lo_nr <= lov_targets_nr(dev));
 
-        OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
-        if (r0->lo_sub != NULL) {
-                result = 0;
-                subconf->coc_inode = conf->coc_inode;
+       lov->lo_layout_invalid = true;
+
+       OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
+       if (r0->lo_sub != NULL) {
+               int psz = 0;
+
+               result = 0;
+               subconf->coc_inode = conf->coc_inode;
                spin_lock_init(&r0->lo_sub_lock);
                 /*
                  * Create stripe cl_objects.
@@ -230,32 +249,41 @@ static int lov_init_raid0(const struct lu_env *env,
                         struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
                         int ost_idx = oinfo->loi_ost_idx;
 
-                        result = ostid_to_fid(ofid, &oinfo->loi_oi,
+                       result = ostid_to_fid(ofid, &oinfo->loi_oi,
                                              oinfo->loi_ost_idx);
                        if (result != 0)
                                GOTO(out, result);
 
-                        subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
-                        subconf->u.coc_oinfo = oinfo;
-                        LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
+                       subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
+                       subconf->u.coc_oinfo = oinfo;
+                       LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
                        /* In the function below, .hs_keycmp resolves to
                         * lu_obj_hop_keycmp() */
                        /* coverity[overrun-buffer-val] */
-                        stripe = lov_sub_find(env, subdev, ofid, subconf);
-                        if (!IS_ERR(stripe)) {
-                                result = lov_init_sub(env, lov, stripe, r0, i);
+                       stripe = lov_sub_find(env, subdev, ofid, subconf);
+                       if (!IS_ERR(stripe)) {
+                               result = lov_init_sub(env, lov, stripe, r0, i);
                                if (result == -EAGAIN) { /* try again */
                                        --i;
                                        result = 0;
+                                       continue;
                                }
-                        } else {
-                                result = PTR_ERR(stripe);
+                       } else {
+                               result = PTR_ERR(stripe);
+                       }
+
+                       if (result == 0) {
+                               int sz = lov_page_slice_fixup(lov, stripe);
+                               LASSERT(ergo(psz > 0, psz == sz));
+                               psz = sz;
                        }
                 }
-        } else
-                result = -ENOMEM;
+               if (result == 0)
+                       cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
+       } else
+               result = -ENOMEM;
 out:
-        RETURN(result);
+       RETURN(result);
 }
 
 static int lov_init_released(const struct lu_env *env,
@@ -280,7 +308,7 @@ static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
 
        lov_layout_wait(env, lov);
 
-       cl_object_prune(env, &lov->lo_cl);
+       cl_locks_prune(env, &lov->lo_cl, 0);
        return 0;
 }
 
@@ -355,9 +383,9 @@ static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
                                  */
                                 lov_subobject_kill(env, lov, los, i);
                        }
-                }
-        }
-       cl_object_prune(env, &lov->lo_cl);
+               }
+       }
+       cl_locks_prune(env, &lov->lo_cl, 0);
        RETURN(0);
 }
 
@@ -583,13 +611,13 @@ enum lov_layout_type lov_type(struct lov_stripe_md *lsm)
 
 static inline void lov_conf_freeze(struct lov_object *lov)
 {
-       if (lov->lo_owner != cfs_current())
+       if (lov->lo_owner != current)
                down_read(&lov->lo_type_guard);
 }
 
 static inline void lov_conf_thaw(struct lov_object *lov)
 {
-       if (lov->lo_owner != cfs_current())
+       if (lov->lo_owner != current)
                up_read(&lov->lo_type_guard);
 }
 
@@ -627,10 +655,10 @@ do {                                                                    \
 
 static void lov_conf_lock(struct lov_object *lov)
 {
-       LASSERT(lov->lo_owner != cfs_current());
+       LASSERT(lov->lo_owner != current);
        down_write(&lov->lo_type_guard);
        LASSERT(lov->lo_owner == NULL);
-       lov->lo_owner = cfs_current();
+       lov->lo_owner = current;
 }
 
 static void lov_conf_unlock(struct lov_object *lov)
@@ -665,7 +693,6 @@ static int lov_layout_change(const struct lu_env *unused,
        const struct lov_layout_operations *old_ops;
        const struct lov_layout_operations *new_ops;
 
-       struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
        void *cookie;
        struct lu_env *env;
        int refcheck;
@@ -691,13 +718,13 @@ static int lov_layout_change(const struct lu_env *unused,
        old_ops = &lov_dispatch[lov->lo_type];
        new_ops = &lov_dispatch[llt];
 
+       cl_object_prune(env, &lov->lo_cl);
+
        result = old_ops->llo_delete(env, lov, &lov->u);
        if (result == 0) {
                old_ops->llo_fini(env, lov, &lov->u);
 
                LASSERT(cfs_atomic_read(&lov->lo_active_ios) == 0);
-               LASSERT(hdr->coh_tree.rnode == NULL);
-               LASSERT(hdr->coh_pages == 0);
 
                lov->lo_type = LLT_EMPTY;
                result = new_ops->llo_init(env,
@@ -829,10 +856,10 @@ static int lov_object_print(const struct lu_env *env, void *cookie,
 }
 
 int lov_page_init(const struct lu_env *env, struct cl_object *obj,
-                 struct cl_page *page, struct page *vmpage)
+                 struct cl_page *page, pgoff_t index)
 {
-        return LOV_2DISPATCH_NOLOCK(cl2lov(obj),
-                                   llo_page_init, env, obj, page, vmpage);
+       return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_page_init, env, obj, page,
+                                   index);
 }
 
 /**
@@ -896,28 +923,28 @@ static const struct lu_object_operations lov_lu_obj_ops = {
 };
 
 struct lu_object *lov_object_alloc(const struct lu_env *env,
-                                   const struct lu_object_header *unused,
-                                   struct lu_device *dev)
+                                  const struct lu_object_header *unused,
+                                  struct lu_device *dev)
 {
-        struct lov_object *lov;
-        struct lu_object  *obj;
+       struct lov_object *lov;
+       struct lu_object  *obj;
 
-        ENTRY;
-       OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, __GFP_IO);
-        if (lov != NULL) {
-                obj = lov2lu(lov);
-                lu_object_init(obj, NULL, dev);
-                lov->lo_cl.co_ops = &lov_ops;
-                lov->lo_type = -1; /* invalid, to catch uninitialized type */
-                /*
-                 * object io operation vector (cl_object::co_iop) is installed
-                 * later in lov_object_init(), as different vectors are used
-                 * for object with different layouts.
-                 */
-                obj->lo_ops = &lov_lu_obj_ops;
-        } else
-                obj = NULL;
-        RETURN(obj);
+       ENTRY;
+       OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, GFP_NOFS);
+       if (lov != NULL) {
+               obj = lov2lu(lov);
+               lu_object_init(obj, NULL, dev);
+               lov->lo_cl.co_ops = &lov_ops;
+               lov->lo_type = -1; /* invalid, to catch uninitialized type */
+               /*
+                * object io operation vector (cl_object::co_iop) is installed
+                * later in lov_object_init(), as different vectors are used
+                * for object with different layouts.
+                */
+               obj->lo_ops = &lov_lu_obj_ops;
+       } else
+               obj = NULL;
+       RETURN(obj);
 }
 
 struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
@@ -929,23 +956,12 @@ struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
                lsm = lsm_addref(lov->lo_lsm);
                CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n",
                        lsm, cfs_atomic_read(&lsm->lsm_refc),
-                       lov->lo_layout_invalid, cfs_current());
+                       lov->lo_layout_invalid, current);
        }
        lov_conf_thaw(lov);
        return lsm;
 }
 
-void lov_lsm_decref(struct lov_object *lov, struct lov_stripe_md *lsm)
-{
-       if (lsm == NULL)
-               return;
-
-       CDEBUG(D_INODE, "lsm %p decref %d by %p.\n",
-               lsm, cfs_atomic_read(&lsm->lsm_refc), cfs_current());
-
-       lov_free_memmd(&lsm);
-}
-
 struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj)
 {
        struct lu_object *luobj;