Whamcloud - gitweb
LU-5261 osc: use wait_for_completion_killable() instead
[fs/lustre-release.git] / lustre / osc / osc_io.c
index a94ac61..0070308 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, Whamcloud, Inc.
+ * Copyright (c) 2011, 2013, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -67,14 +67,18 @@ static struct osc_io *cl2osc_io(const struct lu_env *env,
         return oio;
 }
 
-static struct osc_page *osc_cl_page_osc(struct cl_page *page)
+static struct osc_page *osc_cl_page_osc(struct cl_page *page,
+                                       struct osc_object *osc)
 {
-        const struct cl_page_slice *slice;
+       const struct cl_page_slice *slice;
 
-        slice = cl_page_at(page, &osc_device_type);
-        LASSERT(slice != NULL);
+       if (osc != NULL)
+               slice = cl_object_page_slice(&osc->oo_cl, page);
+       else
+               slice = cl_page_at(page, &osc_device_type);
+       LASSERT(slice != NULL);
 
-        return cl2osc_page(slice);
+       return cl2osc_page(slice);
 }
 
 
@@ -99,14 +103,13 @@ static int osc_io_submit(const struct lu_env *env,
                          const struct cl_io_slice *ios,
                         enum cl_req_type crt, struct cl_2queue *queue)
 {
-       struct obd_export *exp;
-        struct cl_page    *page;
-        struct cl_page    *tmp;
-        struct client_obd *cli  = NULL;
-        struct osc_object *osc  = NULL; /* to keep gcc happy */
-        struct osc_page   *opg;
-        struct cl_io      *io;
-       CFS_LIST_HEAD     (list);
+       struct cl_page    *page;
+       struct cl_page    *tmp;
+       struct client_obd *cli  = NULL;
+       struct osc_object *osc  = NULL; /* to keep gcc happy */
+       struct osc_page   *opg;
+       struct cl_io      *io;
+       struct list_head  list = LIST_HEAD_INIT(list);
 
        struct cl_page_list *qin      = &queue->c2_qin;
        struct cl_page_list *qout     = &queue->c2_qout;
@@ -121,7 +124,6 @@ static int osc_io_submit(const struct lu_env *env,
        CDEBUG(D_CACHE, "%d %d\n", qin->pl_nr, crt);
 
        osc = cl2osc(ios->cis_obj);
-       exp = osc_export(osc);
        cli = osc_cli(osc);
        max_pages = cli->cl_max_pages_per_rpc;
 
@@ -139,12 +141,12 @@ static int osc_io_submit(const struct lu_env *env,
                 io = page->cp_owner;
                 LASSERT(io != NULL);
 
-                opg = osc_cl_page_osc(page);
-                oap = &opg->ops_oap;
+               opg = osc_cl_page_osc(page, osc);
+               oap = &opg->ops_oap;
                LASSERT(osc == oap->oap_obj);
 
-               if (!cfs_list_empty(&oap->oap_pending_item) ||
-                   !cfs_list_empty(&oap->oap_rpc_item)) {
+               if (!list_empty(&oap->oap_pending_item) ||
+                   !list_empty(&oap->oap_rpc_item)) {
                        CDEBUG(D_CACHE, "Busy oap %p page %p for submit.\n",
                               oap, opg);
                         result = -EBUSY;
@@ -165,12 +167,19 @@ static int osc_io_submit(const struct lu_env *env,
                        continue;
                 }
 
-               cl_page_list_move(qout, qin, page);
+               spin_lock(&oap->oap_lock);
                oap->oap_async_flags = ASYNC_URGENT|ASYNC_READY;
                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
+               spin_unlock(&oap->oap_lock);
 
                osc_page_submit(env, opg, crt, brw_flags);
-               cfs_list_add_tail(&oap->oap_pending_item, &list);
+               list_add_tail(&oap->oap_pending_item, &list);
+
+               if (page->cp_sync_io != NULL)
+                       cl_page_list_move(qout, qin, page);
+               else /* async IO */
+                       cl_page_list_del(env, qin, page);
+
                if (++queued == max_pages) {
                        queued = 0;
                        result = osc_queue_sync_pages(env, osc, &list, cmd,
@@ -187,6 +196,13 @@ static int osc_io_submit(const struct lu_env *env,
        return qout->pl_nr > 0 ? 0 : result;
 }
 
+/**
+ * This is called when a page is accessed within file in a way that creates
+ * new page, if one were missing (i.e., if there were a hole at that place in
+ * the file, or accessed page is beyond the current file size).
+ *
+ * Expand stripe KMS if necessary.
+ */
 static void osc_page_touch_at(const struct lu_env *env,
                               struct cl_object *obj, pgoff_t idx, unsigned to)
 {
@@ -210,7 +226,8 @@ static void osc_page_touch_at(const struct lu_env *env,
                kms > loi->loi_kms ? "" : "not ", loi->loi_kms, kms,
                loi->loi_lvb.lvb_size);
 
-        valid = 0;
+       attr->cat_mtime = attr->cat_ctime = LTIME_S(CFS_CURRENT_TIME);
+       valid = CAT_MTIME | CAT_CTIME;
         if (kms > loi->loi_kms) {
                 attr->cat_kms = kms;
                 valid |= CAT_KMS;
@@ -223,92 +240,127 @@ static void osc_page_touch_at(const struct lu_env *env,
         cl_object_attr_unlock(obj);
 }
 
-/**
- * This is called when a page is accessed within file in a way that creates
- * new page, if one were missing (i.e., if there were a hole at that place in
- * the file, or accessed page is beyond the current file size). Examples:
- * ->commit_write() and ->nopage() methods.
- *
- * Expand stripe KMS if necessary.
- */
-static void osc_page_touch(const struct lu_env *env,
-                           struct osc_page *opage, unsigned to)
+static int osc_io_commit_async(const struct lu_env *env,
+                               const struct cl_io_slice *ios,
+                               struct cl_page_list *qin, int from, int to,
+                               cl_commit_cbt cb)
 {
-        struct cl_page    *page = opage->ops_cl.cpl_page;
-        struct cl_object  *obj  = opage->ops_cl.cpl_obj;
+       struct cl_io    *io = ios->cis_io;
+       struct osc_io   *oio = cl2osc_io(env, ios);
+       struct osc_object *osc = cl2osc(ios->cis_obj);
+       struct cl_page  *page;
+       struct cl_page  *last_page;
+       struct osc_page *opg;
+       int result = 0;
+       ENTRY;
 
-        osc_page_touch_at(env, obj, page->cp_index, to);
-}
+       LASSERT(qin->pl_nr > 0);
 
-/**
- * Implements cl_io_operations::cio_prepare_write() method for osc layer.
- *
- * \retval -EIO transfer initiated against this osc will most likely fail
- * \retval 0    transfer initiated against this osc will most likely succeed.
- *
- * The reason for this check is to immediately return an error to the caller
- * in the case of a deactivated import. Note, that import can be deactivated
- * later, while pages, dirtied by this IO, are still in the cache, but this is
- * irrelevant, because that would still return an error to the application (if
- * it does fsync), but many applications don't do fsync because of performance
- * issues, and we wanted to return an -EIO at write time to notify the
- * application.
- */
-static int osc_io_prepare_write(const struct lu_env *env,
-                                const struct cl_io_slice *ios,
-                                const struct cl_page_slice *slice,
-                                unsigned from, unsigned to)
-{
-        struct osc_device *dev = lu2osc_dev(slice->cpl_obj->co_lu.lo_dev);
-        struct obd_import *imp = class_exp2cliimp(dev->od_exp);
-        struct osc_io     *oio = cl2osc_io(env, ios);
-        int result = 0;
-        ENTRY;
+       /* Handle partial page cases */
+       last_page = cl_page_list_last(qin);
+       if (oio->oi_lockless) {
+               page = cl_page_list_first(qin);
+               if (page == last_page) {
+                       cl_page_clip(env, page, from, to);
+               } else {
+                       if (from != 0)
+                               cl_page_clip(env, page, from, PAGE_SIZE);
+                       if (to != PAGE_SIZE)
+                               cl_page_clip(env, last_page, 0, to);
+               }
+       }
 
-        /*
-         * This implements OBD_BRW_CHECK logic from old client.
-         */
+       while (qin->pl_nr > 0) {
+               struct osc_async_page *oap;
+
+               page = cl_page_list_first(qin);
+               opg = osc_cl_page_osc(page, osc);
+               oap = &opg->ops_oap;
+
+               if (!list_empty(&oap->oap_rpc_item)) {
+                       CDEBUG(D_CACHE, "Busy oap %p page %p for submit.\n",
+                              oap, opg);
+                       result = -EBUSY;
+                       break;
+               }
 
-        if (imp == NULL || imp->imp_invalid)
-                result = -EIO;
-        if (result == 0 && oio->oi_lockless)
-                /* this page contains `invalid' data, but who cares?
-                 * nobody can access the invalid data.
-                 * in osc_io_commit_write(), we're going to write exact
-                 * [from, to) bytes of this page to OST. -jay */
-                cl_page_export(env, slice->cpl_page, 1);
+               /* The page may be already in dirty cache. */
+               if (list_empty(&oap->oap_pending_item)) {
+                       result = osc_page_cache_add(env, &opg->ops_cl, io);
+                       if (result != 0)
+                               break;
+               }
 
-        RETURN(result);
+               osc_page_touch_at(env, osc2cl(osc), osc_index(opg),
+                                 page == last_page ? to : PAGE_SIZE);
+
+               cl_page_list_del(env, qin, page);
+
+               (*cb)(env, io, page);
+               /* Can't access page any more. Page can be in transfer and
+                * complete at any time. */
+       }
+
+       /* for sync write, kernel will wait for this page to be flushed before
+        * osc_io_end() is called, so release it earlier.
+        * for mkwrite(), it's known there is no further pages. */
+       if (cl_io_is_sync_write(io) && oio->oi_active != NULL) {
+               osc_extent_release(env, oio->oi_active);
+               oio->oi_active = NULL;
+       }
+
+       CDEBUG(D_INFO, "%d %d\n", qin->pl_nr, result);
+       RETURN(result);
 }
 
-static int osc_io_commit_write(const struct lu_env *env,
-                               const struct cl_io_slice *ios,
-                               const struct cl_page_slice *slice,
-                               unsigned from, unsigned to)
+static int osc_io_rw_iter_init(const struct lu_env *env,
+                               const struct cl_io_slice *ios)
 {
-        struct osc_io         *oio = cl2osc_io(env, ios);
-        struct osc_page       *opg = cl2osc_page(slice);
-        struct osc_object     *obj = cl2osc(opg->ops_cl.cpl_obj);
-        struct osc_async_page *oap = &opg->ops_oap;
-        ENTRY;
+       struct cl_io *io = ios->cis_io;
+       struct osc_io *oio = osc_env_io(env);
+       struct osc_object *osc = cl2osc(ios->cis_obj);
+       struct client_obd *cli = osc_cli(osc);
+       unsigned long c;
+       unsigned int npages;
+       unsigned int max_pages;
+       ENTRY;
 
-        LASSERT(to > 0);
-        /*
-         * XXX instead of calling osc_page_touch() here and in
-         * osc_io_fault_start() it might be more logical to introduce
-         * cl_page_touch() method, that generic cl_io_commit_write() and page
-         * fault code calls.
-         */
-        osc_page_touch(env, cl2osc_page(slice), to);
-        if (!client_is_remote(osc_export(obj)) &&
-            cfs_capable(CFS_CAP_SYS_RESOURCE))
-                oap->oap_brw_flags |= OBD_BRW_NOQUOTA;
+       if (cl_io_is_append(io))
+               RETURN(0);
 
-        if (oio->oi_lockless)
-                /* see osc_io_prepare_write() for lockless io handling. */
-                cl_page_clip(env, slice->cpl_page, from, to);
+       npages = io->u.ci_rw.crw_count >> PAGE_CACHE_SHIFT;
+       if (io->u.ci_rw.crw_pos & ~CFS_PAGE_MASK)
+               ++npages;
 
-        RETURN(0);
+       max_pages = cli->cl_max_pages_per_rpc * cli->cl_max_rpcs_in_flight;
+       if (npages > max_pages)
+               npages = max_pages;
+
+       c = atomic_read(cli->cl_lru_left);
+       if (c < npages && osc_lru_reclaim(cli) > 0)
+               c = atomic_read(cli->cl_lru_left);
+       while (c >= npages) {
+               if (c == atomic_cmpxchg(cli->cl_lru_left, c, c - npages)) {
+                       oio->oi_lru_reserved = npages;
+                       break;
+               }
+               c = atomic_read(cli->cl_lru_left);
+       }
+
+       RETURN(0);
+}
+
+static void osc_io_rw_iter_fini(const struct lu_env *env,
+                               const struct cl_io_slice *ios)
+{
+       struct osc_io *oio = osc_env_io(env);
+       struct osc_object *osc = cl2osc(ios->cis_obj);
+       struct client_obd *cli = osc_cli(osc);
+
+       if (oio->oi_lru_reserved > 0) {
+               atomic_add(oio->oi_lru_reserved, cli->cl_lru_left);
+               oio->oi_lru_reserved = 0;
+       }
 }
 
 static int osc_io_fault_start(const struct lu_env *env,
@@ -339,7 +391,7 @@ static int osc_async_upcall(void *a, int rc)
        struct osc_async_cbargs *args = a;
 
         args->opc_rc = rc;
-        cfs_complete(&args->opc_sync);
+       complete(&args->opc_sync);
         return 0;
 }
 
@@ -348,31 +400,22 @@ static int osc_async_upcall(void *a, int rc)
  * Checks that there are no pages being written in the extent being truncated.
  */
 static int trunc_check_cb(const struct lu_env *env, struct cl_io *io,
-                         struct cl_page *page, void *cbdata)
+                         struct osc_page *ops , void *cbdata)
 {
-       const struct cl_page_slice *slice;
-       struct osc_page *ops;
+       struct cl_page *page = ops->ops_cl.cpl_page;
        struct osc_async_page *oap;
        __u64 start = *(__u64 *)cbdata;
 
-       slice = cl_page_at(page, &osc_device_type);
-       LASSERT(slice != NULL);
-       ops = cl2osc_page(slice);
        oap = &ops->ops_oap;
-
        if (oap->oap_cmd & OBD_BRW_WRITE &&
-           !cfs_list_empty(&oap->oap_pending_item))
+           !list_empty(&oap->oap_pending_item))
                CL_PAGE_DEBUG(D_ERROR, env, page, "exists " LPU64 "/%s.\n",
                                start, current->comm);
 
 #ifdef __linux__
-       {
-               cfs_page_t *vmpage = cl_page_vmpage(env, page);
-               if (PageLocked(vmpage))
-                       CDEBUG(D_CACHE, "page %p index %lu locked for %d.\n",
-                              ops, page->cp_index,
-                              (oap->oap_cmd & OBD_BRW_RWMASK));
-       }
+       if (PageLocked(page->cp_vmpage))
+               CDEBUG(D_CACHE, "page %p index %lu locked for %d.\n",
+                      ops, osc_index(ops), oap->oap_cmd & OBD_BRW_RWMASK);
 #endif
 
        return CLP_GANG_OKAY;
@@ -392,8 +435,9 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
         /*
          * Complain if there are pages in the truncated region.
          */
-       cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF,
-                           trunc_check_cb, (void *)&size);
+       osc_page_gang_lookup(env, io, cl2osc(clob),
+                               start + partial, CL_PAGE_EOF,
+                               trunc_check_cb, (void *)&size);
 }
 #else /* __KERNEL__ */
 static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
@@ -451,13 +495,12 @@ static int osc_io_setattr_start(const struct lu_env *env,
         }
         memset(oa, 0, sizeof(*oa));
         if (result == 0) {
-                oa->o_id = loi->loi_id;
-                oa->o_seq = loi->loi_seq;
-                oa->o_mtime = attr->cat_mtime;
-                oa->o_atime = attr->cat_atime;
-                oa->o_ctime = attr->cat_ctime;
-                oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLATIME |
-                        OBD_MD_FLCTIME | OBD_MD_FLMTIME;
+               oa->o_oi = loi->loi_oi;
+               oa->o_mtime = attr->cat_mtime;
+               oa->o_atime = attr->cat_atime;
+               oa->o_ctime = attr->cat_ctime;
+               oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLATIME |
+                       OBD_MD_FLCTIME | OBD_MD_FLMTIME;
                 if (ia_valid & ATTR_SIZE) {
                         oa->o_size = size;
                         oa->o_blocks = OBD_OBJECT_EOF;
@@ -473,7 +516,7 @@ static int osc_io_setattr_start(const struct lu_env *env,
 
                 oinfo.oi_oa = oa;
                 oinfo.oi_capa = io->u.ci_setattr.sa_capa;
-                cfs_init_completion(&cbargs->opc_sync);
+               init_completion(&cbargs->opc_sync);
 
                 if (ia_valid & ATTR_SIZE)
                         result = osc_punch_base(osc_export(cl2osc(obj)),
@@ -484,6 +527,7 @@ static int osc_io_setattr_start(const struct lu_env *env,
                                                         &oinfo, NULL,
                                                        osc_async_upcall,
                                                         cbargs, PTLRPCD_SET);
+               cbargs->opc_rpc_sent = result == 0;
         }
         return result;
 }
@@ -495,11 +539,14 @@ static void osc_io_setattr_end(const struct lu_env *env,
        struct osc_io    *oio = cl2osc_io(env, slice);
        struct cl_object *obj = slice->cis_obj;
        struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
-        int result;
-
-        cfs_wait_for_completion(&cbargs->opc_sync);
+        int result = 0;
 
-        result = io->ci_result = cbargs->opc_rc;
+       if (cbargs->opc_rpc_sent) {
+               result = wait_for_completion_killable(&cbargs->opc_sync);
+               if (result == 0)
+                       result = cbargs->opc_rc;
+               io->ci_result = result;
+       }
         if (result == 0) {
                 if (oio->oi_lockless) {
                         /* lockless truncate */
@@ -524,46 +571,36 @@ static void osc_io_setattr_end(const struct lu_env *env,
 static int osc_io_read_start(const struct lu_env *env,
                              const struct cl_io_slice *slice)
 {
-        struct osc_io    *oio   = cl2osc_io(env, slice);
-        struct cl_object *obj   = slice->cis_obj;
-        struct cl_attr   *attr  = &osc_env_info(env)->oti_attr;
-        int              result = 0;
-        ENTRY;
+       struct cl_object *obj  = slice->cis_obj;
+       struct cl_attr   *attr = &osc_env_info(env)->oti_attr;
+       int rc = 0;
+       ENTRY;
 
-        if (oio->oi_lockless == 0) {
-                cl_object_attr_lock(obj);
-                result = cl_object_attr_get(env, obj, attr);
-                if (result == 0) {
-                        attr->cat_atime = LTIME_S(CFS_CURRENT_TIME);
-                        result = cl_object_attr_set(env, obj, attr,
-                                                    CAT_ATIME);
-                }
-                cl_object_attr_unlock(obj);
-        }
-        RETURN(result);
+       if (!slice->cis_io->ci_noatime) {
+               cl_object_attr_lock(obj);
+               attr->cat_atime = LTIME_S(CFS_CURRENT_TIME);
+               rc = cl_object_attr_set(env, obj, attr, CAT_ATIME);
+               cl_object_attr_unlock(obj);
+       }
+
+       RETURN(rc);
 }
 
 static int osc_io_write_start(const struct lu_env *env,
                               const struct cl_io_slice *slice)
 {
-        struct osc_io    *oio   = cl2osc_io(env, slice);
-        struct cl_object *obj   = slice->cis_obj;
-        struct cl_attr   *attr  = &osc_env_info(env)->oti_attr;
-        int              result = 0;
-        ENTRY;
+       struct cl_object *obj   = slice->cis_obj;
+       struct cl_attr   *attr  = &osc_env_info(env)->oti_attr;
+       int rc = 0;
+       ENTRY;
 
-        if (oio->oi_lockless == 0) {
-                cl_object_attr_lock(obj);
-                result = cl_object_attr_get(env, obj, attr);
-                if (result == 0) {
-                        attr->cat_mtime = attr->cat_ctime =
-                                LTIME_S(CFS_CURRENT_TIME);
-                        result = cl_object_attr_set(env, obj, attr,
-                                                    CAT_MTIME | CAT_CTIME);
-                }
-                cl_object_attr_unlock(obj);
-        }
-        RETURN(result);
+       OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_SETTIME, 1);
+       cl_object_attr_lock(obj);
+       attr->cat_mtime = attr->cat_ctime = LTIME_S(CFS_CURRENT_TIME);
+       rc = cl_object_attr_set(env, obj, attr, CAT_MTIME | CAT_CTIME);
+       cl_object_attr_unlock(obj);
+
+       RETURN(rc);
 }
 
 static int osc_fsync_ost(const struct lu_env *env, struct osc_object *obj,
@@ -578,8 +615,7 @@ static int osc_fsync_ost(const struct lu_env *env, struct osc_object *obj,
        ENTRY;
 
        memset(oa, 0, sizeof(*oa));
-       oa->o_id = loi->loi_id;
-       oa->o_seq = loi->loi_seq;
+       oa->o_oi = loi->loi_oi;
        oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
 
        /* reload size abd blocks for start and end of sync range */
@@ -592,7 +628,7 @@ static int osc_fsync_ost(const struct lu_env *env, struct osc_object *obj,
        memset(oinfo, 0, sizeof(*oinfo));
        oinfo->oi_oa = oa;
        oinfo->oi_capa = fio->fi_capa;
-       cfs_init_completion(&cbargs->opc_sync);
+       init_completion(&cbargs->opc_sync);
 
        rc = osc_sync_base(osc_export(obj), oinfo, osc_async_upcall, cbargs,
                           PTLRPCD_SET);
@@ -654,7 +690,7 @@ static void osc_io_fsync_end(const struct lu_env *env,
                struct osc_io           *oio    = cl2osc_io(env, slice);
                struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
 
-               cfs_wait_for_completion(&cbargs->opc_sync);
+               result = wait_for_completion_killable(&cbargs->opc_sync);
                if (result == 0)
                        result = cbargs->opc_rc;
        }
@@ -673,21 +709,23 @@ static void osc_io_end(const struct lu_env *env,
 }
 
 static const struct cl_io_operations osc_io_ops = {
-        .op = {
-                [CIT_READ] = {
-                        .cio_start  = osc_io_read_start,
-                        .cio_fini   = osc_io_fini
-                },
-                [CIT_WRITE] = {
-                        .cio_start  = osc_io_write_start,
+       .op = {
+               [CIT_READ] = {
+                       .cio_start  = osc_io_read_start,
+                       .cio_fini   = osc_io_fini
+               },
+               [CIT_WRITE] = {
+                       .cio_iter_init = osc_io_rw_iter_init,
+                       .cio_iter_fini = osc_io_rw_iter_fini,
+                       .cio_start  = osc_io_write_start,
                        .cio_end    = osc_io_end,
-                        .cio_fini   = osc_io_fini
-                },
-                [CIT_SETATTR] = {
-                        .cio_start  = osc_io_setattr_start,
-                        .cio_end    = osc_io_setattr_end
-                },
-                [CIT_FAULT] = {
+                       .cio_fini   = osc_io_fini
+               },
+               [CIT_SETATTR] = {
+                       .cio_start  = osc_io_setattr_start,
+                       .cio_end    = osc_io_setattr_end
+               },
+               [CIT_FAULT] = {
                        .cio_start  = osc_io_fault_start,
                        .cio_end    = osc_io_end,
                        .cio_fini   = osc_io_fini
@@ -697,20 +735,12 @@ static const struct cl_io_operations osc_io_ops = {
                        .cio_end    = osc_io_fsync_end,
                        .cio_fini   = osc_io_fini
                },
-                [CIT_MISC] = {
-                        .cio_fini   = osc_io_fini
-                }
-        },
-        .req_op = {
-                 [CRT_READ] = {
-                         .cio_submit    = osc_io_submit
-                 },
-                 [CRT_WRITE] = {
-                         .cio_submit    = osc_io_submit
-                 }
-         },
-        .cio_prepare_write = osc_io_prepare_write,
-        .cio_commit_write  = osc_io_commit_write
+               [CIT_MISC] = {
+                       .cio_fini   = osc_io_fini
+               }
+       },
+       .cio_submit                 = osc_io_submit,
+       .cio_commit_async           = osc_io_commit_async
 };
 
 /*****************************************************************************
@@ -740,43 +770,61 @@ static void osc_req_completion(const struct lu_env *env,
  * fields.
  */
 static void osc_req_attr_set(const struct lu_env *env,
-                             const struct cl_req_slice *slice,
-                             const struct cl_object *obj,
-                             struct cl_req_attr *attr, obd_valid flags)
+                            const struct cl_req_slice *slice,
+                            const struct cl_object *obj,
+                            struct cl_req_attr *attr, obd_valid flags)
 {
-        struct lov_oinfo *oinfo;
-        struct cl_req    *clerq;
-        struct cl_page   *apage; /* _some_ page in @clerq */
-        struct cl_lock   *lock;  /* _some_ lock protecting @apage */
-        struct osc_lock  *olck;
-        struct osc_page  *opg;
-        struct obdo      *oa;
-
-        oa = attr->cra_oa;
-        oinfo = cl2osc(obj)->oo_oinfo;
-        if (flags & OBD_MD_FLID) {
-                oa->o_id = oinfo->loi_id;
-                oa->o_valid |= OBD_MD_FLID;
-        }
-        if (flags & OBD_MD_FLGROUP) {
-                oa->o_seq = oinfo->loi_seq;
-                oa->o_valid |= OBD_MD_FLGROUP;
-        }
-        if (flags & OBD_MD_FLHANDLE) {
-                clerq = slice->crs_req;
-                LASSERT(!cfs_list_empty(&clerq->crq_pages));
-                apage = container_of(clerq->crq_pages.next,
-                                     struct cl_page, cp_flight);
-                opg = osc_cl_page_osc(apage);
-                apage = opg->ops_cl.cpl_page; /* now apage is a sub-page */
-                lock = cl_lock_at_page(env, apage->cp_obj, apage, NULL, 1, 1);
-                if (lock == NULL) {
-                        struct cl_object_header *head;
-                        struct cl_lock          *scan;
-
-                        head = cl_object_header(apage->cp_obj);
-                        cfs_list_for_each_entry(scan, &head->coh_locks,
-                                                cll_linkage)
+       struct lov_oinfo *oinfo;
+       struct cl_req    *clerq;
+       struct cl_page   *apage; /* _some_ page in @clerq */
+       struct cl_lock   *lock;  /* _some_ lock protecting @apage */
+       struct osc_lock  *olck;
+       struct osc_page  *opg;
+       struct obdo      *oa;
+       struct ost_lvb   *lvb;
+
+       oinfo   = cl2osc(obj)->oo_oinfo;
+       lvb     = &oinfo->loi_lvb;
+       oa      = attr->cra_oa;
+
+       if ((flags & OBD_MD_FLMTIME) != 0) {
+               oa->o_mtime = lvb->lvb_mtime;
+               oa->o_valid |= OBD_MD_FLMTIME;
+       }
+       if ((flags & OBD_MD_FLATIME) != 0) {
+               oa->o_atime = lvb->lvb_atime;
+               oa->o_valid |= OBD_MD_FLATIME;
+       }
+       if ((flags & OBD_MD_FLCTIME) != 0) {
+               oa->o_ctime = lvb->lvb_ctime;
+               oa->o_valid |= OBD_MD_FLCTIME;
+       }
+       if (flags & OBD_MD_FLGROUP) {
+               ostid_set_seq(&oa->o_oi, ostid_seq(&oinfo->loi_oi));
+               oa->o_valid |= OBD_MD_FLGROUP;
+       }
+       if (flags & OBD_MD_FLID) {
+               ostid_set_id(&oa->o_oi, ostid_id(&oinfo->loi_oi));
+               oa->o_valid |= OBD_MD_FLID;
+       }
+       if (flags & OBD_MD_FLHANDLE) {
+               struct cl_object *subobj;
+
+               clerq = slice->crs_req;
+               LASSERT(!list_empty(&clerq->crq_pages));
+               apage = container_of(clerq->crq_pages.next,
+                                    struct cl_page, cp_flight);
+               opg = osc_cl_page_osc(apage, NULL);
+               subobj = opg->ops_cl.cpl_obj;
+               lock = cl_lock_at_pgoff(env, subobj, osc_index(opg),
+                                       NULL, 1, 1);
+               if (lock == NULL) {
+                       struct cl_object_header *head;
+                       struct cl_lock          *scan;
+
+                       head = cl_object_header(subobj);
+                       list_for_each_entry(scan, &head->coh_locks,
+                                           cll_linkage)
                                 CL_LOCK_DEBUG(D_ERROR, env, scan,
                                               "no cover page!\n");
                         CL_PAGE_DEBUG(D_ERROR, env, apage,
@@ -815,18 +863,18 @@ int osc_io_init(const struct lu_env *env,
 }
 
 int osc_req_init(const struct lu_env *env, struct cl_device *dev,
-                 struct cl_req *req)
+                struct cl_req *req)
 {
-        struct osc_req *or;
-        int result;
-
-        OBD_SLAB_ALLOC_PTR_GFP(or, osc_req_kmem, CFS_ALLOC_IO);
-        if (or != NULL) {
-                cl_req_slice_add(req, &or->or_cl, dev, &osc_req_ops);
-                result = 0;
-        } else
-                result = -ENOMEM;
-        return result;
+       struct osc_req *or;
+       int result;
+
+       OBD_SLAB_ALLOC_PTR_GFP(or, osc_req_kmem, GFP_NOFS);
+       if (or != NULL) {
+               cl_req_slice_add(req, &or->or_cl, dev, &osc_req_ops);
+               result = 0;
+       } else
+               result = -ENOMEM;
+       return result;
 }
 
 /** @} osc */