Whamcloud - gitweb
LU-3321 clio: remove stackable cl_page completely
[fs/lustre-release.git] / lustre / lov / lov_object.c
index 1d10ef5..a33848b 100644 (file)
@@ -68,7 +68,7 @@ struct lov_layout_operations {
         int  (*llo_print)(const struct lu_env *env, void *cookie,
                           lu_printer_t p, const struct lu_object *o);
         int  (*llo_page_init)(const struct lu_env *env, struct cl_object *obj,
-                               struct cl_page *page, struct page *vmpage);
+                             struct cl_page *page, pgoff_t index);
         int  (*llo_lock_init)(const struct lu_env *env,
                               struct cl_object *obj, struct cl_lock *lock,
                               const struct cl_io *io);
@@ -123,14 +123,14 @@ static struct cl_object *lov_sub_find(const struct lu_env *env,
 }
 
 static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
-                        struct cl_object *stripe,
-                        struct lov_layout_raid0 *r0, int idx)
+                       struct cl_object *stripe, struct lov_layout_raid0 *r0,
+                       int idx)
 {
-        struct cl_object_header *hdr;
-        struct cl_object_header *subhdr;
-        struct cl_object_header *parent;
-        struct lov_oinfo        *oinfo;
-        int result;
+       struct cl_object_header *hdr;
+       struct cl_object_header *subhdr;
+       struct cl_object_header *parent;
+       struct lov_oinfo        *oinfo;
+       int result;
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) {
                /* For sanity:test_206.
@@ -143,9 +143,8 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
                return -EIO;
        }
 
-        hdr    = cl_object_header(lov2cl(lov));
-        subhdr = cl_object_header(stripe);
-        parent = subhdr->coh_parent;
+       hdr    = cl_object_header(lov2cl(lov));
+       subhdr = cl_object_header(stripe);
 
        oinfo = lov->lo_lsm->lsm_oinfo[idx];
        CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: ostid: "DOSTID
@@ -154,19 +153,24 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
               PFID(&hdr->coh_lu.loh_fid), hdr, POSTID(&oinfo->loi_oi),
               oinfo->loi_ost_idx, oinfo->loi_ost_gen);
 
-        if (parent == NULL) {
-                subhdr->coh_parent = hdr;
-                subhdr->coh_nesting = hdr->coh_nesting + 1;
-                lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
-                r0->lo_sub[idx] = cl2lovsub(stripe);
-                r0->lo_sub[idx]->lso_super = lov;
-                r0->lo_sub[idx]->lso_index = idx;
-                result = 0;
-        } else {
+       /* reuse ->coh_attr_guard to protect coh_parent change */
+       spin_lock(&subhdr->coh_attr_guard);
+       parent = subhdr->coh_parent;
+       if (parent == NULL) {
+               subhdr->coh_parent = hdr;
+               spin_unlock(&subhdr->coh_attr_guard);
+               subhdr->coh_nesting = hdr->coh_nesting + 1;
+               lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
+               r0->lo_sub[idx] = cl2lovsub(stripe);
+               r0->lo_sub[idx]->lso_super = lov;
+               r0->lo_sub[idx]->lso_index = idx;
+               result = 0;
+       } else {
                struct lu_object  *old_obj;
                struct lov_object *old_lov;
                unsigned int mask = D_INODE;
 
+               spin_unlock(&subhdr->coh_attr_guard);
                old_obj = lu_object_locate(&parent->coh_lu, &lov_device_type);
                LASSERT(old_obj != NULL);
                old_lov = cl2lov(lu2cl(old_obj));
@@ -185,8 +189,20 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
                LU_OBJECT_DEBUG(mask, env, old_obj, "owned.\n");
                LU_OBJECT_HEADER(mask, env, lov2lu(lov), "try to own.\n");
                cl_object_put(env, stripe);
-        }
-        return result;
+       }
+       return result;
+}
+
+static int lov_page_slice_fixup(struct lov_object *lov,
+                               struct cl_object *stripe)
+{
+       struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
+       struct cl_object *o;
+
+       cl_object_for_each(o, stripe)
+               o->co_slice_off += hdr->coh_page_bufsize;
+
+       return cl_object_header(stripe)->coh_page_bufsize;
 }
 
 static int lov_init_raid0(const struct lu_env *env,
@@ -215,12 +231,14 @@ static int lov_init_raid0(const struct lu_env *env,
        LASSERT(lov->lo_lsm == NULL);
        lov->lo_lsm = lsm_addref(lsm);
        r0->lo_nr  = lsm->lsm_stripe_count;
-        LASSERT(r0->lo_nr <= lov_targets_nr(dev));
+       LASSERT(r0->lo_nr <= lov_targets_nr(dev));
 
-        OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
-        if (r0->lo_sub != NULL) {
-                result = 0;
-                subconf->coc_inode = conf->coc_inode;
+       OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
+       if (r0->lo_sub != NULL) {
+               int psz = 0;
+
+               result = 0;
+               subconf->coc_inode = conf->coc_inode;
                spin_lock_init(&r0->lo_sub_lock);
                 /*
                  * Create stripe cl_objects.
@@ -230,32 +248,41 @@ static int lov_init_raid0(const struct lu_env *env,
                         struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
                         int ost_idx = oinfo->loi_ost_idx;
 
-                        result = ostid_to_fid(ofid, &oinfo->loi_oi,
+                       result = ostid_to_fid(ofid, &oinfo->loi_oi,
                                              oinfo->loi_ost_idx);
                        if (result != 0)
                                GOTO(out, result);
 
-                        subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
-                        subconf->u.coc_oinfo = oinfo;
-                        LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
+                       subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
+                       subconf->u.coc_oinfo = oinfo;
+                       LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
                        /* In the function below, .hs_keycmp resolves to
                         * lu_obj_hop_keycmp() */
                        /* coverity[overrun-buffer-val] */
-                        stripe = lov_sub_find(env, subdev, ofid, subconf);
-                        if (!IS_ERR(stripe)) {
-                                result = lov_init_sub(env, lov, stripe, r0, i);
+                       stripe = lov_sub_find(env, subdev, ofid, subconf);
+                       if (!IS_ERR(stripe)) {
+                               result = lov_init_sub(env, lov, stripe, r0, i);
                                if (result == -EAGAIN) { /* try again */
                                        --i;
                                        result = 0;
+                                       continue;
                                }
-                        } else {
-                                result = PTR_ERR(stripe);
+                       } else {
+                               result = PTR_ERR(stripe);
+                       }
+
+                       if (result == 0) {
+                               int sz = lov_page_slice_fixup(lov, stripe);
+                               LASSERT(ergo(psz > 0, psz == sz));
+                               psz = sz;
                        }
                 }
-        } else
-                result = -ENOMEM;
+               if (result == 0)
+                       cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
+       } else
+               result = -ENOMEM;
 out:
-        RETURN(result);
+       RETURN(result);
 }
 
 static int lov_init_released(const struct lu_env *env,
@@ -280,18 +307,18 @@ static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
 
        lov_layout_wait(env, lov);
 
-       cl_object_prune(env, &lov->lo_cl);
+       cl_locks_prune(env, &lov->lo_cl, 0);
        return 0;
 }
 
 static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
-                               struct lovsub_object *los, int idx)
+                              struct lovsub_object *los, int idx)
 {
-        struct cl_object        *sub;
-        struct lov_layout_raid0 *r0;
-        struct lu_site          *site;
-        struct lu_site_bkt_data *bkt;
-        cfs_waitlink_t          *waiter;
+       struct cl_object        *sub;
+       struct lov_layout_raid0 *r0;
+       struct lu_site          *site;
+       struct lu_site_bkt_data *bkt;
+       wait_queue_t          *waiter;
 
         r0  = &lov->u.raid0;
         LASSERT(r0->lo_sub[idx] == los);
@@ -307,28 +334,28 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
 
         /* ... wait until it is actually destroyed---sub-object clears its
          * ->lo_sub[] slot in lovsub_object_fini() */
-        if (r0->lo_sub[idx] == los) {
-                waiter = &lov_env_info(env)->lti_waiter;
-                cfs_waitlink_init(waiter);
-                cfs_waitq_add(&bkt->lsb_marche_funebre, waiter);
-                cfs_set_current_state(CFS_TASK_UNINT);
-                while (1) {
-                        /* this wait-queue is signaled at the end of
-                         * lu_object_free(). */
-                        cfs_set_current_state(CFS_TASK_UNINT);
+       if (r0->lo_sub[idx] == los) {
+               waiter = &lov_env_info(env)->lti_waiter;
+               init_waitqueue_entry_current(waiter);
+               add_wait_queue(&bkt->lsb_marche_funebre, waiter);
+               set_current_state(TASK_UNINTERRUPTIBLE);
+               while (1) {
+                       /* this wait-queue is signaled at the end of
+                        * lu_object_free(). */
+                       set_current_state(TASK_UNINTERRUPTIBLE);
                        spin_lock(&r0->lo_sub_lock);
                        if (r0->lo_sub[idx] == los) {
                                spin_unlock(&r0->lo_sub_lock);
-                               cfs_waitq_wait(waiter, CFS_TASK_UNINT);
+                               waitq_wait(waiter, TASK_UNINTERRUPTIBLE);
                        } else {
                                spin_unlock(&r0->lo_sub_lock);
-                                cfs_set_current_state(CFS_TASK_RUNNING);
-                                break;
-                        }
-                }
-                cfs_waitq_del(&bkt->lsb_marche_funebre, waiter);
-        }
-        LASSERT(r0->lo_sub[idx] == NULL);
+                               set_current_state(TASK_RUNNING);
+                               break;
+                       }
+               }
+               remove_wait_queue(&bkt->lsb_marche_funebre, waiter);
+       }
+       LASSERT(r0->lo_sub[idx] == NULL);
 }
 
 static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
@@ -355,9 +382,9 @@ static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
                                  */
                                 lov_subobject_kill(env, lov, los, i);
                        }
-                }
-        }
-       cl_object_prune(env, &lov->lo_cl);
+               }
+       }
+       cl_locks_prune(env, &lov->lo_cl, 0);
        RETURN(0);
 }
 
@@ -583,13 +610,13 @@ enum lov_layout_type lov_type(struct lov_stripe_md *lsm)
 
 static inline void lov_conf_freeze(struct lov_object *lov)
 {
-       if (lov->lo_owner != cfs_current())
+       if (lov->lo_owner != current)
                down_read(&lov->lo_type_guard);
 }
 
 static inline void lov_conf_thaw(struct lov_object *lov)
 {
-       if (lov->lo_owner != cfs_current())
+       if (lov->lo_owner != current)
                up_read(&lov->lo_type_guard);
 }
 
@@ -627,10 +654,10 @@ do {                                                                    \
 
 static void lov_conf_lock(struct lov_object *lov)
 {
-       LASSERT(lov->lo_owner != cfs_current());
+       LASSERT(lov->lo_owner != current);
        down_write(&lov->lo_type_guard);
        LASSERT(lov->lo_owner == NULL);
-       lov->lo_owner = cfs_current();
+       lov->lo_owner = current;
 }
 
 static void lov_conf_unlock(struct lov_object *lov)
@@ -665,7 +692,6 @@ static int lov_layout_change(const struct lu_env *unused,
        const struct lov_layout_operations *old_ops;
        const struct lov_layout_operations *new_ops;
 
-       struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
        void *cookie;
        struct lu_env *env;
        int refcheck;
@@ -691,13 +717,13 @@ static int lov_layout_change(const struct lu_env *unused,
        old_ops = &lov_dispatch[lov->lo_type];
        new_ops = &lov_dispatch[llt];
 
+       cl_object_prune(env, &lov->lo_cl);
+
        result = old_ops->llo_delete(env, lov, &lov->u);
        if (result == 0) {
                old_ops->llo_fini(env, lov, &lov->u);
 
                LASSERT(cfs_atomic_read(&lov->lo_active_ios) == 0);
-               LASSERT(hdr->coh_tree.rnode == NULL);
-               LASSERT(hdr->coh_pages == 0);
 
                lov->lo_type = LLT_EMPTY;
                result = new_ops->llo_init(env,
@@ -736,7 +762,7 @@ int lov_object_init(const struct lu_env *env, struct lu_object *obj,
         ENTRY;
        init_rwsem(&lov->lo_type_guard);
        cfs_atomic_set(&lov->lo_active_ios, 0);
-       cfs_waitq_init(&lov->lo_waitq);
+       init_waitqueue_head(&lov->lo_waitq);
 
        cl_object_page_init(lu2cl(obj), sizeof(struct lov_page));
 
@@ -829,10 +855,10 @@ static int lov_object_print(const struct lu_env *env, void *cookie,
 }
 
 int lov_page_init(const struct lu_env *env, struct cl_object *obj,
-                 struct cl_page *page, struct page *vmpage)
+                 struct cl_page *page, pgoff_t index)
 {
-        return LOV_2DISPATCH_NOLOCK(cl2lov(obj),
-                                   llo_page_init, env, obj, page, vmpage);
+       return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_page_init, env, obj, page,
+                                   index);
 }
 
 /**
@@ -929,7 +955,7 @@ struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
                lsm = lsm_addref(lov->lo_lsm);
                CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n",
                        lsm, cfs_atomic_read(&lsm->lsm_refc),
-                       lov->lo_layout_invalid, cfs_current());
+                       lov->lo_layout_invalid, current);
        }
        lov_conf_thaw(lov);
        return lsm;
@@ -941,7 +967,7 @@ void lov_lsm_decref(struct lov_object *lov, struct lov_stripe_md *lsm)
                return;
 
        CDEBUG(D_INODE, "lsm %p decref %d by %p.\n",
-               lsm, cfs_atomic_read(&lsm->lsm_refc), cfs_current());
+               lsm, cfs_atomic_read(&lsm->lsm_refc), current);
 
        lov_free_memmd(&lsm);
 }