Whamcloud - gitweb
LU-1030 osc: new IO engine implementation
[fs/lustre-release.git] / lustre / osc / osc_page.c
index 32fb5f4..bc234e2 100644 (file)
@@ -206,9 +206,10 @@ static void osc_page_transfer_add(const struct lu_env *env,
 }
 
 static int osc_page_cache_add(const struct lu_env *env,
-                              const struct cl_page_slice *slice,
-                              struct cl_io *unused)
+                             const struct cl_page_slice *slice,
+                             struct cl_io *io)
 {
+       struct osc_io   *oio = osc_env_io(env);
        struct osc_page *opg = cl2osc_page(slice);
        int result;
        ENTRY;
@@ -216,11 +217,22 @@ static int osc_page_cache_add(const struct lu_env *env,
        LINVRNT(osc_page_protected(env, opg, CLM_WRITE, 0));
 
        osc_page_transfer_get(opg, "transfer\0cache");
-       result = osc_queue_async_io(env, opg);
+       result = osc_queue_async_io(env, io, opg);
        if (result != 0)
                osc_page_transfer_put(env, opg);
        else
                osc_page_transfer_add(env, opg, CRT_WRITE);
+
+       /* for sync write, kernel will wait for this page to be flushed before
+        * osc_io_end() is called, so release it earlier.
+        * for mkwrite(), it's known there is no further pages. */
+       if (cl_io_is_sync_write(io) || cl_io_is_mkwrite(io)) {
+               if (oio->oi_active != NULL) {
+                       osc_extent_release(env, oio->oi_active);
+                       oio->oi_active = NULL;
+               }
+       }
+
        RETURN(result);
 }
 
@@ -341,17 +353,16 @@ static int osc_page_print(const struct lu_env *env,
         struct client_obd     *cli = &osc_export(obj)->exp_obd->u.cli;
 
         return (*printer)(env, cookie, LUSTRE_OSC_NAME"-page@%p: "
-                          "1< %#x %d %u %s %s %s > "
+                         "1< %#x %d %u %s %s > "
                          "2< "LPU64" %u %u %#x %#x | %p %p %p > "
-                          "3< %s %p %d %lu %d > "
-                          "4< %d %d %d %lu %s | %s %s %s %s > "
-                          "5< %s %s %s %s | %d %s %s | %d %s %s>\n",
+                         "3< %s %p %d %lu %d > "
+                         "4< %d %d %d %lu %s | %s %s %s %s > "
+                         "5< %s %s %s %s | %d %s | %d %s %s>\n",
                           opg,
                           /* 1 */
                           oap->oap_magic, oap->oap_cmd,
                           oap->oap_interrupted,
                           osc_list(&oap->oap_pending_item),
-                          osc_list(&oap->oap_urgent_item),
                           osc_list(&oap->oap_rpc_item),
                           /* 2 */
                           oap->oap_obj_off, oap->oap_page_off, oap->oap_count,
@@ -375,12 +386,11 @@ static int osc_page_print(const struct lu_env *env,
                          osc_list(&obj->oo_hp_ready_item),
                          osc_list(&obj->oo_write_item),
                          osc_list(&obj->oo_read_item),
-                         obj->oo_read_pages.oop_num_pending,
-                         osc_list(&obj->oo_read_pages.oop_pending),
-                         osc_list(&obj->oo_read_pages.oop_urgent),
-                         obj->oo_write_pages.oop_num_pending,
-                         osc_list(&obj->oo_write_pages.oop_pending),
-                         osc_list(&obj->oo_write_pages.oop_urgent));
+                         cfs_atomic_read(&obj->oo_nr_reads),
+                         osc_list(&obj->oo_reading_exts),
+                         cfs_atomic_read(&obj->oo_nr_writes),
+                         osc_list(&obj->oo_hp_exts),
+                         osc_list(&obj->oo_urgent_exts));
 }
 
 static void osc_page_delete(const struct lu_env *env,
@@ -395,7 +405,7 @@ static void osc_page_delete(const struct lu_env *env,
         ENTRY;
         CDEBUG(D_TRACE, "%p\n", opg);
         osc_page_transfer_put(env, opg);
-       rc = osc_teardown_async_page(obj, opg);
+       rc = osc_teardown_async_page(env, obj, opg);
         if (rc) {
                 CL_PAGE_DEBUG(D_ERROR, env, cl_page_top(slice->cpl_page),
                               "Trying to teardown failed: %d\n", rc);
@@ -425,23 +435,31 @@ void osc_page_clip(const struct lu_env *env, const struct cl_page_slice *slice,
 static int osc_page_cancel(const struct lu_env *env,
                            const struct cl_page_slice *slice)
 {
-        struct osc_page *opg       = cl2osc_page(slice);
-        struct osc_async_page *oap = &opg->ops_oap;
+       struct osc_page *opg = cl2osc_page(slice);
         int rc = 0;
 
         LINVRNT(osc_page_protected(env, opg, CLM_READ, 0));
 
-        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
         /* Check if the transferring against this page
          * is completed, or not even queued. */
         if (opg->ops_transfer_pinned)
                 /* FIXME: may not be interrupted.. */
                rc = osc_cancel_async_page(env, opg);
         LASSERT(ergo(rc == 0, opg->ops_transfer_pinned == 0));
-        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
         return rc;
 }
 
+static int osc_page_flush(const struct lu_env *env,
+                         const struct cl_page_slice *slice,
+                         struct cl_io *io)
+{
+       struct osc_page *opg = cl2osc_page(slice);
+       int rc = 0;
+       ENTRY;
+       rc = osc_flush_async_page(env, io, opg);
+       RETURN(rc);
+}
+
 static const struct cl_page_operations osc_page_ops = {
         .cpo_fini          = osc_page_fini,
         .cpo_print         = osc_page_print,
@@ -458,7 +476,8 @@ static const struct cl_page_operations osc_page_ops = {
                 }
         },
         .cpo_clip           = osc_page_clip,
-        .cpo_cancel         = osc_page_cancel
+       .cpo_cancel         = osc_page_cancel,
+       .cpo_flush          = osc_page_flush
 };
 
 struct cl_page *osc_page_init(const struct lu_env *env,
@@ -499,19 +518,34 @@ struct cl_page *osc_page_init(const struct lu_env *env,
  * Helper function called by osc_io_submit() for every page in an immediate
  * transfer (i.e., transferred synchronously).
  */
-void osc_io_submit_page(const struct lu_env *env,
-                        struct osc_io *oio, struct osc_page *opg,
-                        enum cl_req_type crt)
+void osc_page_submit(const struct lu_env *env, struct osc_page *opg,
+                    enum cl_req_type crt, int brw_flags)
 {
-        LINVRNT(osc_page_protected(env, opg,
-                                   crt == CRT_WRITE ? CLM_WRITE : CLM_READ, 1));
-
-       osc_queue_sync_page(env, opg,
-                           crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
-                           osc_io_srvlock(oio) ? OBD_BRW_SRVLOCK : 0);
-
-        osc_page_transfer_get(opg, "transfer\0imm");
-        osc_page_transfer_add(env, opg, crt);
+       struct osc_async_page *oap = &opg->ops_oap;
+       struct osc_object     *obj = oap->oap_obj;
+
+       LINVRNT(osc_page_protected(env, opg,
+                                  crt == CRT_WRITE ? CLM_WRITE : CLM_READ, 1));
+
+       LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
+                "magic 0x%x\n", oap, oap->oap_magic);
+       LASSERT(oap->oap_async_flags & ASYNC_READY);
+       LASSERT(oap->oap_async_flags & ASYNC_COUNT_STABLE);
+
+       oap->oap_cmd       = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;
+       oap->oap_page_off  = opg->ops_from;
+       oap->oap_count     = opg->ops_to - opg->ops_from;
+       oap->oap_brw_flags = OBD_BRW_SYNC | brw_flags;
+
+       if (!client_is_remote(osc_export(obj)) &&
+                       cfs_capable(CFS_CAP_SYS_RESOURCE)) {
+               oap->oap_brw_flags |= OBD_BRW_NOQUOTA;
+               oap->oap_cmd |= OBD_BRW_NOQUOTA;
+       }
+
+       opg->ops_submit_time = cfs_time_current();
+       osc_page_transfer_get(opg, "transfer\0imm");
+       osc_page_transfer_add(env, opg, crt);
 }
 
 /** @} osc */