Whamcloud - gitweb
b=21434
authorjxiong <jxiong>
Thu, 3 Dec 2009 03:22:59 +0000 (03:22 +0000)
committerjxiong <jxiong>
Thu, 3 Dec 2009 03:22:59 +0000 (03:22 +0000)
r=wangdi,eric.mei

We can't use env and io in osc page operations because they are not correctly setup. To support lockless IO, I choose to set srvlock flag at page initialization time.

lustre/lov/lov_page.c
lustre/osc/osc_cl_internal.h
lustre/osc/osc_io.c
lustre/osc/osc_page.c

index 4609de2..2e293bb 100644 (file)
@@ -140,60 +140,73 @@ struct cl_page *lov_page_init_raid0(const struct lu_env *env,
                                     struct cl_object *obj, struct cl_page *page,
                                     cfs_page_t *vmpage)
 {
-        struct lov_page   *lpg;
         struct lov_object *loo = cl2lov(obj);
+        struct lov_io     *lio = lov_env_io(env);
         int result;
 
+        loff_t   offset;
+        int      stripe;
+        obd_off  suboff;
+        struct cl_page          *subpage;
+        struct cl_object        *subobj;
+        struct lov_layout_raid0 *r0 = lov_r0(loo);
+        struct lov_io_sub       *sub;
+
         ENTRY;
-        OBD_SLAB_ALLOC_PTR_GFP(lpg, lov_page_kmem, CFS_ALLOC_IO);
-        if (lpg != NULL) {
-                loff_t   offset;
-                int      stripe;
-                obd_off  suboff;
-                struct cl_page          *subpage;
-                struct cl_object        *subobj;
-                struct lov_layout_raid0 *r0 = lov_r0(loo);
-
-                offset = cl_offset(obj, page->cp_index);
-                stripe = lov_stripe_number(r0->lo_lsm, offset);
-                result = lov_stripe_offset(r0->lo_lsm, offset, stripe,
-                                           &suboff);
-                LASSERT(stripe < r0->lo_nr);
-                LASSERT(result == 0);
-
-                subobj  = lovsub2cl(r0->lo_sub[stripe]);
-                subpage = cl_page_find(env, subobj,
-                                       cl_index(subobj, suboff), vmpage,
-                                       page->cp_type);
-                if (!IS_ERR(subpage)) {
-                        if (subpage->cp_parent != NULL) {
-                                /*
-                                 * This is only possible when TRANSIENT page
-                                 * is being created, and CACHEABLE sub-page
-                                 * (attached to already existing top-page) has
-                                 * been found. Tell cl_page_find() to use
-                                 * existing page.
-                                 */
-                                LASSERT(subpage->cp_type == CPT_CACHEABLE);
-                                LASSERT(page->cp_type == CPT_TRANSIENT);
-                                lpg->lps_invalid = 1;
-                                cl_page_put(env, subpage);
-                                /*
-                                 * XXX This assumes that lov is in the topmost
-                                 * cl_page.
-                                 */
-                                result = PTR_ERR(cl_page_top(subpage));
-                        } else {
-                                lu_ref_add(&subpage->cp_reference, "lov", page);
-                                subpage->cp_parent = page;
-                                page->cp_child = subpage;
-                        }
-                        cl_page_slice_add(page, &lpg->lps_cl,
-                                          obj, &lov_page_ops);
-                } else
-                        result = PTR_ERR(subpage);
+
+        offset = cl_offset(obj, page->cp_index);
+        stripe = lov_stripe_number(r0->lo_lsm, offset);
+        LASSERT(stripe < r0->lo_nr);
+        result = lov_stripe_offset(r0->lo_lsm, offset, stripe,
+                                   &suboff);
+        LASSERT(result == 0);
+
+        subobj = lovsub2cl(r0->lo_sub[stripe]);
+        sub    = lov_sub_get(env, lio, stripe);
+        if (IS_ERR(sub))
+                GOTO(out, result = PTR_ERR(sub));
+
+        subpage = cl_page_find(sub->sub_env, subobj,
+                               cl_index(subobj, suboff), vmpage,
+                               page->cp_type);
+        lov_sub_put(sub);
+        if (!IS_ERR(subpage)) {
+                struct lov_page *lpg;
+
+                OBD_SLAB_ALLOC_PTR_GFP(lpg, lov_page_kmem, CFS_ALLOC_IO);
+                if (lpg == NULL) {
+                        cl_page_put(env, subpage);
+                        GOTO(out, result = -ENOMEM);
+                }
+
+                if (subpage->cp_parent != NULL) {
+                        /*
+                         * This is only possible when TRANSIENT page
+                         * is being created, and CACHEABLE sub-page
+                         * (attached to already existing top-page) has
+                         * been found. Tell cl_page_find() to use
+                         * existing page.
+                         */
+                        LASSERT(subpage->cp_type == CPT_CACHEABLE);
+                        LASSERT(page->cp_type == CPT_TRANSIENT);
+                        lpg->lps_invalid = 1;
+                        cl_page_put(env, subpage);
+                        /*
+                         * XXX This assumes that lov is in the topmost
+                         * cl_page.
+                         */
+                        result = PTR_ERR(cl_page_top(subpage));
+                } else {
+                        lu_ref_add(&subpage->cp_reference, "lov", page);
+                        subpage->cp_parent = page;
+                        page->cp_child = subpage;
+                }
+                cl_page_slice_add(page, &lpg->lps_cl,
+                                  obj, &lov_page_ops);
         } else
-                result = -ENOMEM;
+                result = PTR_ERR(subpage);
+
+out:
         RETURN(ERR_PTR(result));
 }
 
index 7e0dfe5..7f91fb1 100644 (file)
@@ -281,7 +281,11 @@ struct osc_page {
          * True for a `temporary page' created by read-ahead code, probably
          * outside of any DLM lock.
          */
-                              ops_temp:1;
+                              ops_temp:1,
+        /**
+         * Set if the page must be transferred with OBD_BRW_SRVLOCK.
+         */
+                              ops_srvlock:1;
         /**
          * Linkage into a per-osc_object list of pages in flight. For
          * debugging.
index 294f415..3d3fe9f 100644 (file)
@@ -653,6 +653,7 @@ static void osc_req_attr_set(const struct lu_env *env,
 
                 olck = osc_lock_at(lock);
                 LASSERT(olck != NULL);
+                LASSERT(ergo(opg->ops_srvlock, olck->ols_lock == NULL));
                 /* check for lockless io. */
                 if (olck->ols_lock != NULL) {
                         oa->o_handle = olck->ols_lock->l_remote_handle;
index 2d5b63f..38581f5 100644 (file)
@@ -212,7 +212,6 @@ static int osc_page_cache_add(const struct lu_env *env,
 {
         struct osc_page   *opg = cl2osc_page(slice);
         struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
-        struct osc_io     *oio = osc_env_io(env);
         int result;
         int brw_flags;
         int noquota = 0;
@@ -221,7 +220,7 @@ static int osc_page_cache_add(const struct lu_env *env,
         ENTRY;
 
         /* Set the OBD_BRW_SRVLOCK before the page is queued. */
-        brw_flags = osc_io_srvlock(oio) ? OBD_BRW_SRVLOCK : 0;
+        brw_flags = opg->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
         if (!client_is_remote(osc_export(obj)) &&
             cfs_capable(CFS_CAP_SYS_RESOURCE)) {
                 brw_flags |= OBD_BRW_NOQUOTA;
@@ -303,7 +302,7 @@ static int osc_page_print(const struct lu_env *env,
         return (*printer)(env, cookie, LUSTRE_OSC_NAME"-page@%p: "
                           "1< %#x %d %u %s %s %s > "
                           "2< %llu %u %#x %#x | %p %p %p %p %p > "
-                          "3< %s %p %d %lu > "
+                          "3< %s %p %d %lu %d > "
                           "4< %d %d %d %lu %s | %s %s %s %s > "
                           "5< %s %s %s %s | %d %s %s | %d %s %s>\n",
                           opg,
@@ -322,7 +321,7 @@ static int osc_page_print(const struct lu_env *env,
                           /* 3 */
                           osc_list(&opg->ops_inflight),
                           opg->ops_submitter, opg->ops_transfer_pinned,
-                          osc_submit_duration(opg),
+                          osc_submit_duration(opg), opg->ops_srvlock,
                           /* 4 */
                           cli->cl_r_in_flight, cli->cl_w_in_flight,
                           cli->cl_max_rpcs_in_flight,
@@ -571,9 +570,12 @@ struct cl_page *osc_page_init(const struct lu_env *env,
                                              cl_offset(obj, page->cp_index),
                                              &osc_async_page_ops,
                                              opg, (void **)&oap, 1, NULL);
-                if (result == 0)
+                if (result == 0) {
+                        struct osc_io *oio = osc_env_io(env);
+                        opg->ops_srvlock = osc_io_srvlock(oio);
                         cl_page_slice_add(page, &opg->ops_cl, obj,
                                           &osc_page_ops);
+                }
                 /*
                  * Cannot assert osc_page_protected() here as read-ahead
                  * creates temporary pages outside of a lock.