LU-6943 clio: get rid of cl_req (refs/changes/33/15833/6)
author    Jinshan Xiong <jinshan.xiong@intel.com>
          Mon, 3 Aug 2015 17:01:35 +0000 (10:01 -0700)
committer Oleg Drokin <oleg.drokin@intel.com>
          Wed, 16 Sep 2015 01:12:28 +0000 (01:12 +0000)
Implement cl_req_attr_set with a cl_object operation.
Get rid of cl_req and its related functions and data structures.

Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Change-Id: I7ea4af3eed65c312835e19f12dd01c7c87445352
Reviewed-on: http://review.whamcloud.com/15833
Tested-by: Jenkins
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Bobi Jam <bobijam@hotmail.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
22 files changed:
lustre/doc/clio.txt
lustre/include/cl_object.h
lustre/llite/Makefile.in
lustre/llite/llite_internal.h
lustre/llite/lproc_llite.c
lustre/llite/vvp_dev.c
lustre/llite/vvp_internal.h
lustre/llite/vvp_object.c
lustre/llite/vvp_req.c [deleted file]
lustre/lov/lov_cl_internal.h
lustre/lov/lov_dev.c
lustre/lov/lovsub_dev.c
lustre/lov/lovsub_object.c
lustre/obdclass/cl_io.c
lustre/obdclass/cl_page.c
lustre/obdecho/echo_client.c
lustre/osc/osc_cache.c
lustre/osc/osc_cl_internal.h
lustre/osc/osc_dev.c
lustre/osc/osc_io.c
lustre/osc/osc_object.c
lustre/osc/osc_request.c

index 6f5ab21..224e2b5 100644 (file)
@@ -197,8 +197,6 @@ The following types of layered objects exist in CLIO:
 
 - IO content: struct cl_io, slices are of type cl_io_slice;
 
-- Transfer request: struct cl_req, slices are of type cl_req_slice.
-
 1.5. Instantiation
 ==================
 
@@ -909,8 +907,7 @@ There are two possible modes of transfer initiation on the client:
   efficiently. Example: pages dirtied by the write(2) path. Pages submitted for
   an opportunistic transfer are kept in a "staging area".
 
-In any case, a transfer takes place in the form of a cl_req, which is a
-representation for a network RPC.
+In any case, a transfer takes place in the form of a network RPC.
 
 Pages queued for an opportunistic transfer are placed into a staging area
 (represented as a set of per-object and per-device queues at the OSC layer)
@@ -922,10 +919,6 @@ server, max-RPC-in-flight limitations, size of the staging area, etc. CLIO uses
 osc_extent to group pages for req-formation. osc_extents are further managed in
 a per-object red-black tree for efficient RPC formation.
 
-For an immediate transfer the IO submits a cl_page_list which the req-formation
-engine slices into cl_req's, possibly adding cached pages to some of the
-resulting req's.
-
 Whenever a page from cl_page_list is added to a newly constructed req, its
 cl_page_operations::cpo_prep() layer methods are called. At that moment, the
 page state is atomically changed from cl_page_state::CPS_OWNED to
@@ -979,45 +972,14 @@ The association between a page and an immediate transfer queue is protected by
 cl_page::cl_mutex. This mutex is acquired when a cl_page is added in a
 cl_page_list and released when a page is removed from the list.
 
-When an RPC is formed, all of its constituent pages are linked together through
-cl_page::cp_flight list hanging off of cl_req::crq_pages. Pages are removed
-from this list just before the transfer completion method is invoked. No
-special lock protects this list, as pages in transfer are under a VM lock.
-
-7.3. Transfer States: Prepare, Completion
-=========================================
-
-The transfer (cl_req) state machine is trivial, and it is not explicitly coded.
-A newly created transfer is in the "prepare" state while pages are collected.
-When all pages are gathered, the transfer enters the "in-flight" state where it
-remains until it reaches the "completion" state where page completion handlers
-are invoked.
-
-The per-layer ->cro_prep() transfer method is called when transfer preparation
-is completed and transfer is about to enter the in-flight state. Similarly, the
-per-layer ->cro_completion() method is called when the transfer completes
-before per-page completion methods are called.
-
-Additionally, before moving a transfer out of the prepare state, the RPC engine
-calls the cl_req_attr_set() function.  This function invokes ->cro_attr_set()
-methods on every layer to fill in RPC header that server uses to determine
-where to get or put data. This replaces the old ->ap_{update,fill}_obdo()
-methods.
-
-Further, cl_req's are not reference counted and access to them is not
-synchronized. This is because they are accessed only by the RPC engine in OSC
-which fully controls RPC life-time, and it uses an internal OSC lock
-(client_obd::cl_loi_list_lock spin-lock) for serialization.
-
-7.4. Page Completion Handlers, Synchronous Transfer
+7.3. Page Completion Handlers, Synchronous Transfer
 ===================================================
 
-When a transfer completes, cl_req completion methods are called on every layer.
-Then, for every transfer page, per-layer page completion methods
-->cpo_completion() are invoked. The page is still under the VM lock at this
-moment.  Completion methods are called bottom-to-top and it is responsibility
-of the last of them (i.e., the completion method of the top-most layer---VVP)
-to release the VM lock.
+When a transfer completes, for every transfer page, per-layer page completion
+methods ->cpo_completion() are invoked. The page is still under the VM lock at
+this moment.  Completion methods are called bottom-to-top and it is the
+responsibility of the last of them (i.e., the completion method of the top-most
+layer---VVP) to release the VM lock.
 
 Both immediate and opportunistic transfers are asynchronous in the sense that
 control can return to the caller before the transfer completes. CLIO doesn't
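
To see the new flow end to end: instead of walking cl_req slices, the RPC
engine fills a struct cl_req_attr and calls cl_req_attr_set() on the object;
each layer's coo_req_attr_set() hook then contributes the obdo fields it owns.
A minimal caller-side sketch, with the helper name and flag set assumed (the
real call site is in osc_request.c, whose hunks are not shown on this page):

static void sample_fill_rpc_attrs(const struct lu_env *env,
				  struct cl_object *obj, struct cl_page *page,
				  struct obdo *oa, enum cl_req_type crt)
{
	/* One attribute bundle per RPC; layers fill the fields they own. */
	struct cl_req_attr attr = {
		.cra_type  = crt,
		.cra_page  = page,	/* a representative page of the RPC */
		.cra_oa    = oa,
		.cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME |
			     OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLHANDLE,
	};

	/* Dispatches top-to-bottom over the object's layers (see cl_io.c). */
	cl_req_attr_set(env, obj, &attr);
}
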
index 1465c6a..a9543a1 100644 (file)
  *                 read/write system call it is associated with the single user
 *                 thread that issued the system call).
  *
- *   - cl_req      represents a collection of pages for a transfer. cl_req is
- *                 constructed by req-forming engine that tries to saturate
- *                 transport with large and continuous transfers.
- *
  * Terminology
  *
  *     - to avoid confusion high-level I/O operation like read or write system
@@ -109,11 +105,8 @@ struct obd_info;
 struct inode;
 
 struct cl_device;
-struct cl_device_operations;
 
 struct cl_object;
-struct cl_object_page_operations;
-struct cl_object_lock_operations;
 
 struct cl_page;
 struct cl_page_slice;
@@ -126,27 +119,7 @@ struct cl_page_operations;
 struct cl_io;
 struct cl_io_slice;
 
-struct cl_req;
-struct cl_req_slice;
-
-/**
- * Operations for each data device in the client stack.
- *
- * \see vvp_cl_ops, lov_cl_ops, lovsub_cl_ops, osc_cl_ops
- */
-struct cl_device_operations {
-       /**
-        * Initialize cl_req. This method is called top-to-bottom on all
-        * devices in the stack to get them a chance to allocate layer-private
-        * data, and to attach them to the cl_req by calling
-        * cl_req_slice_add().
-        *
-        * \see osc_req_init(), lov_req_init(), lovsub_req_init()
-        * \see vvp_req_init()
-        */
-       int (*cdo_req_init)(const struct lu_env *env, struct cl_device *dev,
-                           struct cl_req *req);
-};
+struct cl_req_attr;
 
 /**
  * Device in the client stack.
@@ -156,8 +129,6 @@ struct cl_device_operations {
 struct cl_device {
         /** Super-class. */
         struct lu_device                   cd_lu_dev;
-        /** Per-layer operation vector. */
-        const struct cl_device_operations *cd_ops;
 };
 
 /** \addtogroup cl_object cl_object
@@ -439,6 +410,12 @@ struct cl_object_operations {
         * Get maximum size of the object.
         */
        loff_t (*coo_maxbytes)(struct cl_object *obj);
+       /**
+        * Set request attributes.
+        */
+       void (*coo_req_attr_set)(const struct lu_env *env,
+                                struct cl_object *obj,
+                                struct cl_req_attr *attr);
 };
 
 /**
@@ -630,7 +607,7 @@ enum cl_page_state {
          *
          *     - [cl_page_state::CPS_PAGEOUT] page is dirty, the
          *     req-formation engine decides that it wants to include this page
-         *     into an cl_req being constructed, and yanks it from the cache;
+         *     into an RPC being constructed, and yanks it from the cache;
          *
          *     - [cl_page_state::CPS_FREEING] VM callback is executed to
          *     evict the page form the memory;
@@ -699,7 +676,7 @@ enum cl_page_state {
          * Page is being read in, as a part of a transfer. This is quite
          * similar to the cl_page_state::CPS_PAGEOUT state, except that
          * read-in is always "immediate"---there is no such thing as a sudden
-         * construction of read cl_req from cached, presumably not up to date,
+         * construction of read request from cached, presumably not up to date,
          * pages.
          *
          * Underlying VM page is locked for the duration of transfer.
@@ -753,8 +730,6 @@ struct cl_page {
        struct list_head         cp_batch;
        /** List of slices. Immutable after creation. */
        struct list_head         cp_layers;
-       /** Linkage of pages within cl_req. */
-       struct list_head         cp_flight;
        /**
         * Page state. This field is const to avoid accidental update, it is
         * modified only internally within cl_page.c. Protected by a VM lock.
@@ -771,12 +746,6 @@ struct cl_page {
          * by sub-io. Protected by a VM lock.
          */
         struct cl_io            *cp_owner;
-        /**
-         * Owning IO request in cl_page_state::CPS_PAGEOUT and
-         * cl_page_state::CPS_PAGEIN states. This field is maintained only in
-         * the top-level pages. Protected by a VM lock.
-         */
-        struct cl_req           *cp_req;
         /** List of references to this page, for debugging. */
         struct lu_ref            cp_reference;
        /** Link to an object, for debugging. */
@@ -819,7 +788,6 @@ enum cl_lock_mode {
 
 /**
  * Requested transfer type.
- * \ingroup cl_req
  */
 enum cl_req_type {
         CRT_READ,
@@ -935,8 +903,7 @@ struct cl_page_operations {
         /**
          * \name transfer
          *
-         * Transfer methods. See comment on cl_req for a description of
-         * transfer formation and life-cycle.
+         * Transfer methods.
          *
          * @{
          */
@@ -982,7 +949,7 @@ struct cl_page_operations {
                                        int ioret);
                 /**
                  * Called when cached page is about to be added to the
-                 * cl_req as a part of req formation.
+                 * ptlrpc request as a part of req formation.
                  *
                  * \return    0       : proceed with this page;
                  * \return    -EAGAIN : skip this page;
@@ -1877,178 +1844,19 @@ struct cl_io {
 
 /** @} cl_io */
 
-/** \addtogroup cl_req cl_req
- * @{ */
-/** \struct cl_req
- * Transfer.
- *
- * There are two possible modes of transfer initiation on the client:
- *
- *     - immediate transfer: this is started when a high level io wants a page
- *       or a collection of pages to be transferred right away. Examples:
- *       read-ahead, synchronous read in the case of non-page aligned write,
- *       page write-out as a part of extent lock cancellation, page write-out
- *       as a part of memory cleansing. Immediate transfer can be both
- *       cl_req_type::CRT_READ and cl_req_type::CRT_WRITE;
- *
- *     - opportunistic transfer (cl_req_type::CRT_WRITE only), that happens
- *       when io wants to transfer a page to the server some time later, when
- *       it can be done efficiently. Example: pages dirtied by the write(2)
- *       path.
- *
- * In any case, transfer takes place in the form of a cl_req, which is a
- * representation for a network RPC.
- *
- * Pages queued for an opportunistic transfer are cached until it is decided
- * that efficient RPC can be composed of them. This decision is made by "a
- * req-formation engine", currently implemented as a part of osc
- * layer. Req-formation depends on many factors: the size of the resulting
- * RPC, whether or not multi-object RPCs are supported by the server,
- * max-rpc-in-flight limitations, size of the dirty cache, etc.
- *
- * For the immediate transfer io submits a cl_page_list, that req-formation
- * engine slices into cl_req's, possibly adding cached pages to some of
- * the resulting req's.
- *
- * Whenever a page from cl_page_list is added to a newly constructed req, its
- * cl_page_operations::cpo_prep() layer methods are called. At that moment,
- * page state is atomically changed from cl_page_state::CPS_OWNED to
- * cl_page_state::CPS_PAGEOUT or cl_page_state::CPS_PAGEIN, cl_page::cp_owner
- * is zeroed, and cl_page::cp_req is set to the
- * req. cl_page_operations::cpo_prep() method at the particular layer might
- * return -EALREADY to indicate that it does not need to submit this page
- * at all. This is possible, for example, if page, submitted for read,
- * became up-to-date in the meantime; and for write, the page don't have
- * dirty bit marked. \see cl_io_submit_rw()
- *
- * Whenever a cached page is added to a newly constructed req, its
- * cl_page_operations::cpo_make_ready() layer methods are called. At that
- * moment, page state is atomically changed from cl_page_state::CPS_CACHED to
- * cl_page_state::CPS_PAGEOUT, and cl_page::cp_req is set to
- * req. cl_page_operations::cpo_make_ready() method at the particular layer
- * might return -EAGAIN to indicate that this page is not eligible for the
- * transfer right now.
- *
- * FUTURE
- *
- * Plan is to divide transfers into "priority bands" (indicated when
- * submitting cl_page_list, and queuing a page for the opportunistic transfer)
- * and allow glueing of cached pages to immediate transfers only within single
- * band. This would make high priority transfers (like lock cancellation or
- * memory pressure induced write-out) really high priority.
- *
- */
-
 /**
  * Per-transfer attributes.
  */
 struct cl_req_attr {
+       enum cl_req_type cra_type;
+       u64              cra_flags;
+       struct cl_page  *cra_page;
        /** Generic attributes for the server consumption. */
        struct obdo     *cra_oa;
        /** Jobid */
        char             cra_jobid[LUSTRE_JOBID_SIZE];
 };
 
-/**
- * Transfer request operations definable at every layer.
- *
- * Concurrency: transfer formation engine synchronizes calls to all transfer
- * methods.
- */
-struct cl_req_operations {
-        /**
-         * Invoked top-to-bottom by cl_req_prep() when transfer formation is
-         * complete (all pages are added).
-         *
-         * \see osc_req_prep()
-         */
-        int  (*cro_prep)(const struct lu_env *env,
-                         const struct cl_req_slice *slice);
-        /**
-         * Called top-to-bottom to fill in \a oa fields. This is called twice
-         * with different flags, see bug 10150 and osc_build_req().
-         *
-         * \param obj an object from cl_req which attributes are to be set in
-         *            \a oa.
-         *
-         * \param oa struct obdo where attributes are placed
-         *
-         * \param flags \a oa fields to be filled.
-         */
-       void (*cro_attr_set)(const struct lu_env *env,
-                            const struct cl_req_slice *slice,
-                            const struct cl_object *obj,
-                            struct cl_req_attr *attr, u64 flags);
-        /**
-         * Called top-to-bottom from cl_req_completion() to notify layers that
-         * transfer completed. Has to free all state allocated by
-         * cl_device_operations::cdo_req_init().
-         */
-        void (*cro_completion)(const struct lu_env *env,
-                               const struct cl_req_slice *slice, int ioret);
-};
-
-/**
- * A per-object state that (potentially multi-object) transfer request keeps.
- */
-struct cl_req_obj {
-       /** object itself */
-       struct cl_object   *ro_obj;
-       /** reference to cl_req_obj::ro_obj. For debugging. */
-       struct lu_ref_link  ro_obj_ref;
-       /* something else? Number of pages for a given object? */
-};
-
-/**
- * Transfer request.
- *
- * Transfer requests are not reference counted, because IO sub-system owns
- * them exclusively and knows when to free them.
- *
- * Life cycle.
- *
- * cl_req is created by cl_req_alloc() that calls
- * cl_device_operations::cdo_req_init() device methods to allocate per-req
- * state in every layer.
- *
- * Then pages are added (cl_req_page_add()), req keeps track of all objects it
- * contains pages for.
- *
- * Once all pages were collected, cl_page_operations::cpo_prep() method is
- * called top-to-bottom. At that point layers can modify req, let it pass, or
- * deny it completely. This is to support things like SNS that have transfer
- * ordering requirements invisible to the individual req-formation engine.
- *
- * On transfer completion (or transfer timeout, or failure to initiate the
- * transfer of an allocated req), cl_req_operations::cro_completion() method
- * is called, after execution of cl_page_operations::cpo_completion() of all
- * req's pages.
- */
-struct cl_req {
-       enum cl_req_type        crq_type;
-       /** A list of pages being transferred */
-       struct list_head        crq_pages;
-       /** Number of pages in cl_req::crq_pages */
-       unsigned                crq_nrpages;
-       /** An array of objects which pages are in ->crq_pages */
-       struct cl_req_obj       *crq_o;
-       /** Number of elements in cl_req::crq_objs[] */
-       unsigned                crq_nrobjs;
-       struct list_head        crq_layers;
-};
-
-/**
- * Per-layer state for request.
- */
-struct cl_req_slice {
-       struct cl_req                   *crs_req;
-       struct cl_device                *crs_dev;
-       struct list_head                 crs_linkage;
-       const struct cl_req_operations  *crs_ops;
-};
-
-/* @} cl_req */
-
 enum cache_stats_item {
        /** how many cache lookups were performed */
        CS_lookup = 0,
@@ -2181,9 +1989,6 @@ void cl_lock_slice_add(struct cl_lock *lock, struct cl_lock_slice *slice,
                        const struct cl_lock_operations *ops);
 void cl_io_slice_add(struct cl_io *io, struct cl_io_slice *slice,
                      struct cl_object *obj, const struct cl_io_operations *ops);
-void cl_req_slice_add(struct cl_req *req, struct cl_req_slice *slice,
-                      struct cl_device *dev,
-                      const struct cl_req_operations *ops);
 /** @} helpers */
 
 /** \defgroup cl_object cl_object
@@ -2569,18 +2374,8 @@ void cl_2queue_init_page(struct cl_2queue *queue, struct cl_page *page);
 
 /** @} cl_page_list */
 
-/** \defgroup cl_req cl_req
- * @{ */
-struct cl_req *cl_req_alloc(const struct lu_env *env, struct cl_page *page,
-                            enum cl_req_type crt, int nr_objects);
-
-void cl_req_page_add  (const struct lu_env *env, struct cl_req *req,
-                       struct cl_page *page);
-void cl_req_page_done (const struct lu_env *env, struct cl_page *page);
-int  cl_req_prep      (const struct lu_env *env, struct cl_req *req);
-void cl_req_attr_set(const struct lu_env *env, struct cl_req *req,
-                    struct cl_req_attr *attr, u64 flags);
-void cl_req_completion(const struct lu_env *env, struct cl_req *req, int ioret);
+void cl_req_attr_set(const struct lu_env *env, struct cl_object *obj,
+                    struct cl_req_attr *attr);
 
 /** \defgroup cl_sync_io cl_sync_io
  * @{ */
@@ -2615,8 +2410,6 @@ void cl_sync_io_end(const struct lu_env *env, struct cl_sync_io *anchor);
 
 /** @} cl_sync_io */
 
-/** @} cl_req */
-
 /** \defgroup cl_env cl_env
  *
  * lu_env handling for a client.
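
The contract of the new coo_req_attr_set() hook mirrors the old
->cro_attr_set(): a layer touches only the obdo fields it owns, and only when
the corresponding OBD_MD_* bit is set in cra_flags. A hedged sketch for a
hypothetical layer (the real vvp and lovsub implementations appear later in
this patch):

static void sample_req_attr_set(const struct lu_env *env,
				struct cl_object *obj,
				struct cl_req_attr *attr)
{
	struct obdo *oa = attr->cra_oa;

	/* Contribute a field only if the RPC engine asked for it. */
	if (attr->cra_flags & OBD_MD_FLMTIME) {
		oa->o_mtime = 0;	/* this layer's mtime source goes here */
		oa->o_valid |= OBD_MD_FLMTIME;
	}
}

static const struct cl_object_operations sample_obj_ops = {
	.coo_req_attr_set	= sample_req_attr_set,
};
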
index 35e11e5..f5247a2 100644 (file)
@@ -7,7 +7,7 @@ lustre-objs += rw26.o super25.o statahead.o xattr_security.o
 lustre-objs += glimpse.o
 lustre-objs += lcommon_cl.o
 lustre-objs += lcommon_misc.o
-lustre-objs += vvp_dev.o vvp_page.o vvp_lock.o vvp_io.o vvp_object.o vvp_req.o
+lustre-objs += vvp_dev.o vvp_page.o vvp_lock.o vvp_io.o vvp_object.o
 lustre-objs += range_lock.o
 
 llite_lloop-objs := lloop.o
index a2979ea..9054f77 100644 (file)
@@ -734,8 +734,6 @@ enum {
        LPROC_LL_WRITE_BYTES,
        LPROC_LL_BRW_READ,
        LPROC_LL_BRW_WRITE,
-       LPROC_LL_OSC_READ,
-       LPROC_LL_OSC_WRITE,
        LPROC_LL_IOCTL,
        LPROC_LL_OPEN,
        LPROC_LL_RELEASE,
@@ -1353,15 +1351,6 @@ struct ll_dio_pages {
         int           ldp_nr;
 };
 
-static inline void cl_stats_tally(struct cl_device *dev, enum cl_req_type crt,
-                                  int rc)
-{
-        int opc = (crt == CRT_READ) ? LPROC_LL_OSC_READ :
-                                      LPROC_LL_OSC_WRITE;
-
-       ll_stats_ops_tally(ll_s2sbi(cl2vvp_dev(dev)->vdv_sb), opc, rc);
-}
-
 extern ssize_t ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io,
                                   int rw, struct inode *inode,
                                   struct ll_dio_pages *pv);
index 7e84cfe..b8e16b6 100644 (file)
@@ -1049,10 +1049,6 @@ static const struct llite_file_opcode {
                                    "brw_read" },
         { LPROC_LL_BRW_WRITE,      LPROCFS_CNTR_AVGMINMAX|LPROCFS_TYPE_PAGES,
                                    "brw_write" },
-        { LPROC_LL_OSC_READ,       LPROCFS_CNTR_AVGMINMAX|LPROCFS_TYPE_BYTES,
-                                   "osc_read" },
-        { LPROC_LL_OSC_WRITE,      LPROCFS_CNTR_AVGMINMAX|LPROCFS_TYPE_BYTES,
-                                   "osc_write" },
         { LPROC_LL_IOCTL,          LPROCFS_TYPE_REGS, "ioctl" },
         { LPROC_LL_OPEN,           LPROCFS_TYPE_REGS, "open" },
         { LPROC_LL_RELEASE,        LPROCFS_TYPE_REGS, "close" },
index 288b642..752261f 100644 (file)
@@ -59,7 +59,6 @@
 static struct kmem_cache *ll_thread_kmem;
 struct kmem_cache *vvp_lock_kmem;
 struct kmem_cache *vvp_object_kmem;
-struct kmem_cache *vvp_req_kmem;
 static struct kmem_cache *vvp_session_kmem;
 static struct kmem_cache *vvp_thread_kmem;
 
@@ -79,11 +78,6 @@ static struct lu_kmem_descr vvp_caches[] = {
                .ckd_name  = "vvp_object_kmem",
                .ckd_size  = sizeof(struct vvp_object),
        },
-       {
-               .ckd_cache = &vvp_req_kmem,
-               .ckd_name  = "vvp_req_kmem",
-               .ckd_size  = sizeof(struct vvp_req),
-       },
         {
                 .ckd_cache = &vvp_session_kmem,
                 .ckd_name  = "vvp_session_kmem",
@@ -180,10 +174,6 @@ static const struct lu_device_operations vvp_lu_ops = {
         .ldo_object_alloc      = vvp_object_alloc
 };
 
-static const struct cl_device_operations vvp_cl_ops = {
-       .cdo_req_init = vvp_req_init,
-};
-
 static struct lu_device *vvp_device_free(const struct lu_env *env,
                                         struct lu_device *d)
 {
@@ -218,7 +208,6 @@ static struct lu_device *vvp_device_alloc(const struct lu_env *env,
        lud = &vdv->vdv_cl.cd_lu_dev;
        cl_device_init(&vdv->vdv_cl, t);
        vvp2lu_dev(vdv)->ld_ops = &vvp_lu_ops;
-       vdv->vdv_cl.cd_ops = &vvp_cl_ops;
 
        OBD_ALLOC_PTR(site);
        if (site != NULL) {
@@ -336,7 +325,6 @@ int cl_sb_init(struct super_block *sb)
                 cl = cl_type_setup(env, NULL, &vvp_device_type,
                                    sbi->ll_dt_exp->exp_obd->obd_lu_dev);
                 if (!IS_ERR(cl)) {
-                       cl2vvp_dev(cl)->vdv_sb = sb;
                         sbi->ll_cl = cl;
                         sbi->ll_site = cl2lu_dev(cl)->ld_site;
                 }
index 31aff08..6854932 100644 (file)
@@ -147,7 +147,6 @@ extern struct lu_context_key vvp_thread_key;
 
 extern struct kmem_cache *vvp_lock_kmem;
 extern struct kmem_cache *vvp_object_kmem;
-extern struct kmem_cache *vvp_req_kmem;
 
 struct vvp_thread_info {
        struct cl_lock          vti_lock;
@@ -269,7 +268,6 @@ static inline pgoff_t vvp_index(struct vvp_page *vpg)
 
 struct vvp_device {
        struct cl_device    vdv_cl;
-       struct super_block *vdv_sb;
        struct cl_device   *vdv_next;
 };
 
@@ -277,10 +275,6 @@ struct vvp_lock {
        struct cl_lock_slice vlk_cl;
 };
 
-struct vvp_req {
-       struct cl_req_slice vrq_cl;
-};
-
 static inline struct lu_device *vvp2lu_dev(struct vvp_device *vdv)
 {
        return &vdv->vdv_cl.cd_lu_dev;
@@ -347,8 +341,6 @@ int vvp_lock_init(const struct lu_env *env, struct cl_object *obj,
                  struct cl_lock *lock, const struct cl_io *io);
 int vvp_page_init(const struct lu_env *env, struct cl_object *obj,
                  struct cl_page *page, pgoff_t index);
-int vvp_req_init(const struct lu_env *env, struct cl_device *dev,
-                struct cl_req *req);
 struct lu_object *vvp_object_alloc(const struct lu_env *env,
                                   const struct lu_object_header *hdr,
                                   struct lu_device *dev);
index e7e4480..30dd2c7 100644 (file)
@@ -194,6 +194,26 @@ static int vvp_object_glimpse(const struct lu_env *env,
        RETURN(0);
 }
 
+static void vvp_req_attr_set(const struct lu_env *env, struct cl_object *obj,
+                            struct cl_req_attr *attr)
+{
+       struct inode *inode;
+       struct obdo  *oa;
+       u64 valid_flags = OBD_MD_FLTYPE;
+
+       oa = attr->cra_oa;
+       inode = vvp_object_inode(obj);
+
+       if (attr->cra_type == CRT_WRITE)
+               valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME |
+                              OBD_MD_FLUID | OBD_MD_FLGID;
+       obdo_from_inode(oa, inode, valid_flags & attr->cra_flags);
+       obdo_set_parent_fid(oa, &ll_i2info(inode)->lli_fid);
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_INVALID_PFID))
+               oa->o_parent_oid++;
+       memcpy(attr->cra_jobid, ll_i2info(inode)->lli_jobid, LUSTRE_JOBID_SIZE);
+}
+
 static const struct cl_object_operations vvp_ops = {
        .coo_page_init    = vvp_page_init,
        .coo_lock_init    = vvp_lock_init,
@@ -202,7 +222,8 @@ static const struct cl_object_operations vvp_ops = {
        .coo_attr_update  = vvp_attr_update,
        .coo_conf_set     = vvp_conf_set,
        .coo_prune        = vvp_prune,
-       .coo_glimpse      = vvp_object_glimpse
+       .coo_glimpse      = vvp_object_glimpse,
+       .coo_req_attr_set = vvp_req_attr_set
 };
 
 static int vvp_object_init0(const struct lu_env *env,
diff --git a/lustre/llite/vvp_req.c b/lustre/llite/vvp_req.c
deleted file mode 100644 (file)
index d070d9d..0000000
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.gnu.org/licenses/gpl-2.0.html
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- *
- * Copyright (c) 2011, 2014, Intel Corporation.
- */
-
-#define DEBUG_SUBSYSTEM S_LLITE
-
-#include <lustre/lustre_idl.h>
-#include <cl_object.h>
-#include <obd.h>
-#include <obd_support.h>
-#include "llite_internal.h"
-#include "vvp_internal.h"
-
-static inline struct vvp_req *cl2vvp_req(const struct cl_req_slice *slice)
-{
-       return container_of0(slice, struct vvp_req, vrq_cl);
-}
-
-/**
- * Implementation of struct cl_req_operations::cro_attr_set() for VVP
- * layer. VVP is responsible for
- *
- *    - o_[mac]time
- *
- *    - o_mode
- *
- *    - o_parent_seq
- *
- *    - o_[ug]id
- *
- *    - o_parent_oid
- *
- *    - o_parent_ver
- */
-static void vvp_req_attr_set(const struct lu_env *env,
-                            const struct cl_req_slice *slice,
-                            const struct cl_object *obj,
-                            struct cl_req_attr *attr, u64 flags)
-{
-       struct inode    *inode;
-       struct obdo     *oa;
-       u64              valid_flags = OBD_MD_FLTYPE;
-
-       oa = attr->cra_oa;
-       inode = vvp_object_inode(obj);
-
-       if (slice->crs_req->crq_type == CRT_WRITE) {
-               valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME |
-                              OBD_MD_FLUID | OBD_MD_FLGID;
-       }
-
-       obdo_from_inode(oa, inode, valid_flags & flags);
-       obdo_set_parent_fid(oa, &ll_i2info(inode)->lli_fid);
-       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_INVALID_PFID))
-               oa->o_parent_oid++;
-
-       memcpy(attr->cra_jobid, ll_i2info(inode)->lli_jobid,
-              LUSTRE_JOBID_SIZE);
-}
-
-static void vvp_req_completion(const struct lu_env *env,
-                              const struct cl_req_slice *slice, int ioret)
-{
-       struct vvp_req *vrq;
-
-       if (ioret > 0)
-               cl_stats_tally(slice->crs_dev, slice->crs_req->crq_type, ioret);
-
-       vrq = cl2vvp_req(slice);
-       OBD_SLAB_FREE_PTR(vrq, vvp_req_kmem);
-}
-
-static const struct cl_req_operations vvp_req_ops = {
-       .cro_attr_set   = vvp_req_attr_set,
-       .cro_completion = vvp_req_completion,
-};
-
-int vvp_req_init(const struct lu_env *env, struct cl_device *dev,
-                struct cl_req *req)
-{
-       struct vvp_req *vrq;
-       int result;
-
-       OBD_SLAB_ALLOC_PTR_GFP(vrq, vvp_req_kmem, GFP_NOFS);
-       if (vrq != NULL) {
-               cl_req_slice_add(req, &vrq->vrq_cl, dev, &vvp_req_ops);
-               result = 0;
-       } else {
-               result = -ENOMEM;
-       }
-
-       return result;
-}
index 77ce075..133a183 100644 (file)
@@ -475,20 +475,6 @@ struct lov_session {
         struct lov_sublock_env ls_subenv;
 };
 
-/**
- * State of transfer for lov.
- */
-struct lov_req {
-        struct cl_req_slice lr_cl;
-};
-
-/**
- * State of transfer for lovsub.
- */
-struct lovsub_req {
-        struct cl_req_slice lsrq_cl;
-};
-
 extern struct lu_device_type lov_device_type;
 extern struct lu_device_type lovsub_device_type;
 
@@ -499,11 +485,9 @@ extern struct kmem_cache *lov_lock_kmem;
 extern struct kmem_cache *lov_object_kmem;
 extern struct kmem_cache *lov_thread_kmem;
 extern struct kmem_cache *lov_session_kmem;
-extern struct kmem_cache *lov_req_kmem;
 
 extern struct kmem_cache *lovsub_lock_kmem;
 extern struct kmem_cache *lovsub_object_kmem;
-extern struct kmem_cache *lovsub_req_kmem;
 
 extern struct kmem_cache *lov_lock_link_kmem;
 
@@ -704,11 +688,6 @@ static inline struct lov_page *cl2lov_page(const struct cl_page_slice *slice)
         return container_of0(slice, struct lov_page, lps_cl);
 }
 
-static inline struct lov_req *cl2lov_req(const struct cl_req_slice *slice)
-{
-        return container_of0(slice, struct lov_req, lr_cl);
-}
-
 static inline struct lovsub_page *
 cl2lovsub_page(const struct cl_page_slice *slice)
 {
@@ -716,11 +695,6 @@ cl2lovsub_page(const struct cl_page_slice *slice)
         return container_of0(slice, struct lovsub_page, lsb_cl);
 }
 
-static inline struct lovsub_req *cl2lovsub_req(const struct cl_req_slice *slice)
-{
-        return container_of0(slice, struct lovsub_req, lsrq_cl);
-}
-
 static inline struct lov_io *cl2lov_io(const struct lu_env *env,
                                 const struct cl_io_slice *ios)
 {
index c682cac..b9007e4 100644 (file)
@@ -49,11 +49,9 @@ struct kmem_cache *lov_lock_kmem;
 struct kmem_cache *lov_object_kmem;
 struct kmem_cache *lov_thread_kmem;
 struct kmem_cache *lov_session_kmem;
-struct kmem_cache *lov_req_kmem;
 
 struct kmem_cache *lovsub_lock_kmem;
 struct kmem_cache *lovsub_object_kmem;
-struct kmem_cache *lovsub_req_kmem;
 
 struct kmem_cache *lov_lock_link_kmem;
 
@@ -82,11 +80,6 @@ struct lu_kmem_descr lov_caches[] = {
                 .ckd_size  = sizeof (struct lov_session)
         },
         {
-                .ckd_cache = &lov_req_kmem,
-                .ckd_name  = "lov_req_kmem",
-                .ckd_size  = sizeof (struct lov_req)
-        },
-        {
                 .ckd_cache = &lovsub_lock_kmem,
                 .ckd_name  = "lovsub_lock_kmem",
                 .ckd_size  = sizeof (struct lovsub_lock)
@@ -97,11 +90,6 @@ struct lu_kmem_descr lov_caches[] = {
                 .ckd_size  = sizeof (struct lovsub_object)
         },
         {
-                .ckd_cache = &lovsub_req_kmem,
-                .ckd_name  = "lovsub_req_kmem",
-                .ckd_size  = sizeof (struct lovsub_req)
-        },
-        {
                 .ckd_cache = &lov_lock_link_kmem,
                 .ckd_name  = "lov_lock_link_kmem",
                 .ckd_size  = sizeof (struct lov_lock_link)
@@ -113,27 +101,6 @@ struct lu_kmem_descr lov_caches[] = {
 
 /*****************************************************************************
  *
- * Lov transfer operations.
- *
- */
-
-static void lov_req_completion(const struct lu_env *env,
-                               const struct cl_req_slice *slice, int ioret)
-{
-        struct lov_req *lr;
-
-        ENTRY;
-        lr = cl2lov_req(slice);
-        OBD_SLAB_FREE_PTR(lr, lov_req_kmem);
-        EXIT;
-}
-
-static const struct cl_req_operations lov_req_ops = {
-        .cro_completion = lov_req_completion
-};
-
-/*****************************************************************************
- *
  * Lov device and device type functions.
  *
  */
@@ -251,26 +218,6 @@ static int lov_device_init(const struct lu_env *env, struct lu_device *d,
         RETURN(rc);
 }
 
-static int lov_req_init(const struct lu_env *env, struct cl_device *dev,
-                       struct cl_req *req)
-{
-       struct lov_req *lr;
-       int result;
-
-       ENTRY;
-       OBD_SLAB_ALLOC_PTR_GFP(lr, lov_req_kmem, GFP_NOFS);
-       if (lr != NULL) {
-               cl_req_slice_add(req, &lr->lr_cl, dev, &lov_req_ops);
-               result = 0;
-       } else
-               result = -ENOMEM;
-       RETURN(result);
-}
-
-static const struct cl_device_operations lov_cl_ops = {
-        .cdo_req_init = lov_req_init
-};
-
 static void lov_emerg_free(struct lov_device_emerg **emrg, int nr)
 {
         int i;
@@ -487,7 +434,6 @@ static struct lu_device *lov_device_alloc(const struct lu_env *env,
         cl_device_init(&ld->ld_cl, t);
         d = lov2lu_dev(ld);
         d->ld_ops        = &lov_lu_ops;
-        ld->ld_cl.cd_ops = &lov_cl_ops;
 
        mutex_init(&ld->ld_mutex);
        lockdep_set_class(&ld->ld_mutex, &cl_lov_device_mutex_class);
index 1a42ca9..ec646d5 100644 (file)
 
 /*****************************************************************************
  *
- * Lovsub transfer operations.
- *
- */
-
-static void lovsub_req_completion(const struct lu_env *env,
-                                  const struct cl_req_slice *slice, int ioret)
-{
-        struct lovsub_req *lsr;
-
-        ENTRY;
-        lsr = cl2lovsub_req(slice);
-        OBD_SLAB_FREE_PTR(lsr, lovsub_req_kmem);
-        EXIT;
-}
-
-/**
- * Implementation of struct cl_req_operations::cro_attr_set() for lovsub
- * layer. Lov and lovsub are responsible only for struct obdo::o_stripe_idx
- * field, which is filled there.
- */
-static void lovsub_req_attr_set(const struct lu_env *env,
-                               const struct cl_req_slice *slice,
-                               const struct cl_object *obj,
-                               struct cl_req_attr *attr, u64 flags)
-{
-        struct lovsub_object *subobj;
-
-        ENTRY;
-        subobj = cl2lovsub(obj);
-        /*
-         * There is no OBD_MD_* flag for obdo::o_stripe_idx, so set it
-         * unconditionally. It never changes anyway.
-         */
-        attr->cra_oa->o_stripe_idx = subobj->lso_index;
-        EXIT;
-}
-
-static const struct cl_req_operations lovsub_req_ops = {
-        .cro_attr_set   = lovsub_req_attr_set,
-        .cro_completion = lovsub_req_completion
-};
-
-/*****************************************************************************
- *
  * Lov-sub device and device type functions.
  *
  */
@@ -148,31 +104,12 @@ static struct lu_device *lovsub_device_free(const struct lu_env *env,
        return next;
 }
 
-static int lovsub_req_init(const struct lu_env *env, struct cl_device *dev,
-                          struct cl_req *req)
-{
-       struct lovsub_req *lsr;
-       int result;
-
-       OBD_SLAB_ALLOC_PTR_GFP(lsr, lovsub_req_kmem, GFP_NOFS);
-       if (lsr != NULL) {
-               cl_req_slice_add(req, &lsr->lsrq_cl, dev, &lovsub_req_ops);
-               result = 0;
-       } else
-               result = -ENOMEM;
-       return result;
-}
-
 static const struct lu_device_operations lovsub_lu_ops = {
         .ldo_object_alloc      = lovsub_object_alloc,
         .ldo_process_config    = NULL,
         .ldo_recovery_complete = NULL
 };
 
-static const struct cl_device_operations lovsub_cl_ops = {
-        .cdo_req_init = lovsub_req_init
-};
-
 static struct lu_device *lovsub_device_alloc(const struct lu_env *env,
                                              struct lu_device_type *t,
                                              struct lustre_cfg *cfg)
@@ -188,7 +125,6 @@ static struct lu_device *lovsub_device_alloc(const struct lu_env *env,
                 if (result == 0) {
                         d = lovsub2lu_dev(lsd);
                         d->ld_ops         = &lovsub_lu_ops;
-                        lsd->acid_cl.cd_ops = &lovsub_cl_ops;
                 } else
                         d = ERR_PTR(result);
         } else
index e960cde..12aa04e 100644 (file)
@@ -125,13 +125,33 @@ static int lovsub_object_glimpse(const struct lu_env *env,
         RETURN(cl_object_glimpse(env, &los->lso_super->lo_cl, lvb));
 }
 
+/**
+ * Implementation of struct cl_object_operations::coo_req_attr_set() for lovsub
+ * layer. Lov and lovsub are responsible only for struct obdo::o_stripe_idx
+ * field, which is filled there.
+ */
+static void lovsub_req_attr_set(const struct lu_env *env, struct cl_object *obj,
+                               struct cl_req_attr *attr)
+{
+       struct lovsub_object *subobj = cl2lovsub(obj);
 
+       ENTRY;
+       cl_req_attr_set(env, &subobj->lso_super->lo_cl, attr);
+
+       /*
+        * There is no OBD_MD_* flag for obdo::o_stripe_idx, so set it
+        * unconditionally. It never changes anyway.
+        */
+       attr->cra_oa->o_stripe_idx = subobj->lso_index;
+       EXIT;
+}
 
 static const struct cl_object_operations lovsub_ops = {
        .coo_page_init    = lovsub_page_init,
        .coo_lock_init    = lovsub_lock_init,
        .coo_attr_update  = lovsub_attr_update,
-       .coo_glimpse      = lovsub_object_glimpse
+       .coo_glimpse      = lovsub_object_glimpse,
+       .coo_req_attr_set = lovsub_req_attr_set
 };
 
 static const struct lu_object_operations lovsub_lu_obj_ops = {
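
Note the shape of the hook above: it first re-enters cl_req_attr_set() on the
parent lov object, so layers above lov (vvp) contribute their attributes even
though the RPC engine only sees the osc sub-stack, and only then adds lovsub's
own o_stripe_idx. An illustrative call chain (the placement of
osc_req_attr_set() is an assumption; its destination, osc_object.c, is in the
file list above):

	cl_req_attr_set(env, sub_obj, attr)        /* from the OSC RPC engine */
	  -> lovsub_req_attr_set(env, sub_obj, attr)
	       -> cl_req_attr_set(env, lov_obj, attr)
	            -> vvp_req_attr_set(...)       /* inode attributes, jobid */
	       -> attr->cra_oa->o_stripe_idx = subobj->lso_index
	  -> osc_req_attr_set(...)                 /* ostid, DLM handle, ... */
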
index 0a3e2b4..c033b77 100644 (file)
@@ -1165,251 +1165,21 @@ void cl_io_print(const struct lu_env *env, void *cookie,
 }
 
 /**
- * Adds request slice to the compound request.
- *
- * This is called by cl_device_operations::cdo_req_init() methods to add a
- * per-layer state to the request. New state is added at the end of
- * cl_req::crq_layers list, that is, it is at the bottom of the stack.
- *
- * \see cl_lock_slice_add(), cl_page_slice_add(), cl_io_slice_add()
- */
-void cl_req_slice_add(struct cl_req *req, struct cl_req_slice *slice,
-                      struct cl_device *dev,
-                      const struct cl_req_operations *ops)
-{
-        ENTRY;
-       list_add_tail(&slice->crs_linkage, &req->crq_layers);
-        slice->crs_dev = dev;
-        slice->crs_ops = ops;
-        slice->crs_req = req;
-        EXIT;
-}
-EXPORT_SYMBOL(cl_req_slice_add);
-
-static void cl_req_free(const struct lu_env *env, struct cl_req *req)
-{
-        unsigned i;
-
-       LASSERT(list_empty(&req->crq_pages));
-        LASSERT(req->crq_nrpages == 0);
-       LINVRNT(list_empty(&req->crq_layers));
-        LINVRNT(equi(req->crq_nrobjs > 0, req->crq_o != NULL));
-        ENTRY;
-
-        if (req->crq_o != NULL) {
-                for (i = 0; i < req->crq_nrobjs; ++i) {
-                        struct cl_object *obj = req->crq_o[i].ro_obj;
-                       if (obj != NULL) {
-                               lu_object_ref_del_at(&obj->co_lu,
-                                                    &req->crq_o[i].ro_obj_ref,
-                                                    "cl_req", req);
-                               cl_object_put(env, obj);
-                       }
-                }
-                OBD_FREE(req->crq_o, req->crq_nrobjs * sizeof req->crq_o[0]);
-        }
-        OBD_FREE_PTR(req);
-        EXIT;
-}
-
-static int cl_req_init(const struct lu_env *env, struct cl_req *req,
-                       struct cl_page *page)
-{
-       struct cl_device     *dev;
-       struct cl_page_slice *slice;
-       int result;
-
-       ENTRY;
-       result = 0;
-       list_for_each_entry(slice, &page->cp_layers, cpl_linkage) {
-               dev = lu2cl_dev(slice->cpl_obj->co_lu.lo_dev);
-               if (dev->cd_ops->cdo_req_init != NULL) {
-                       result = dev->cd_ops->cdo_req_init(env,
-                                       dev, req);
-                       if (result != 0)
-                               break;
-               }
-       }
-       RETURN(result);
-}
-
-/**
- * Invokes per-request transfer completion call-backs
- * (cl_req_operations::cro_completion()) bottom-to-top.
- */
-void cl_req_completion(const struct lu_env *env, struct cl_req *req, int rc)
-{
-        struct cl_req_slice *slice;
-
-        ENTRY;
-        /*
-         * for the lack of list_for_each_entry_reverse_safe()...
-         */
-       while (!list_empty(&req->crq_layers)) {
-               slice = list_entry(req->crq_layers.prev,
-                                  struct cl_req_slice, crs_linkage);
-               list_del_init(&slice->crs_linkage);
-                if (slice->crs_ops->cro_completion != NULL)
-                        slice->crs_ops->cro_completion(env, slice, rc);
-        }
-        cl_req_free(env, req);
-        EXIT;
-}
-EXPORT_SYMBOL(cl_req_completion);
-
-/**
- * Allocates new transfer request.
- */
-struct cl_req *cl_req_alloc(const struct lu_env *env, struct cl_page *page,
-                            enum cl_req_type crt, int nr_objects)
-{
-        struct cl_req *req;
-
-        LINVRNT(nr_objects > 0);
-        ENTRY;
-
-        OBD_ALLOC_PTR(req);
-        if (req != NULL) {
-                int result;
-
-               req->crq_type = crt;
-               INIT_LIST_HEAD(&req->crq_pages);
-               INIT_LIST_HEAD(&req->crq_layers);
-
-                OBD_ALLOC(req->crq_o, nr_objects * sizeof req->crq_o[0]);
-                if (req->crq_o != NULL) {
-                        req->crq_nrobjs = nr_objects;
-                        result = cl_req_init(env, req, page);
-                } else
-                        result = -ENOMEM;
-                if (result != 0) {
-                        cl_req_completion(env, req, result);
-                        req = ERR_PTR(result);
-                }
-        } else
-                req = ERR_PTR(-ENOMEM);
-        RETURN(req);
-}
-EXPORT_SYMBOL(cl_req_alloc);
-
-/**
- * Adds a page to a request.
- */
-void cl_req_page_add(const struct lu_env *env,
-                     struct cl_req *req, struct cl_page *page)
-{
-       struct cl_object  *obj;
-       struct cl_req_obj *rqo;
-       unsigned int i;
-
-       ENTRY;
-
-       LASSERT(list_empty(&page->cp_flight));
-        LASSERT(page->cp_req == NULL);
-
-        CL_PAGE_DEBUG(D_PAGE, env, page, "req %p, %d, %u\n",
-                      req, req->crq_type, req->crq_nrpages);
-
-       list_add_tail(&page->cp_flight, &req->crq_pages);
-        ++req->crq_nrpages;
-        page->cp_req = req;
-        obj = cl_object_top(page->cp_obj);
-        for (i = 0, rqo = req->crq_o; obj != rqo->ro_obj; ++i, ++rqo) {
-                if (rqo->ro_obj == NULL) {
-                        rqo->ro_obj = obj;
-                        cl_object_get(obj);
-                       lu_object_ref_add_at(&obj->co_lu, &rqo->ro_obj_ref,
-                                            "cl_req", req);
-                       break;
-               }
-       }
-       LASSERT(i < req->crq_nrobjs);
-       EXIT;
-}
-EXPORT_SYMBOL(cl_req_page_add);
-
-/**
- * Removes a page from a request.
- */
-void cl_req_page_done(const struct lu_env *env, struct cl_page *page)
-{
-       struct cl_req *req = page->cp_req;
-
-       ENTRY;
-
-       LASSERT(!list_empty(&page->cp_flight));
-       LASSERT(req->crq_nrpages > 0);
-
-       list_del_init(&page->cp_flight);
-       --req->crq_nrpages;
-       page->cp_req = NULL;
-       EXIT;
-}
-EXPORT_SYMBOL(cl_req_page_done);
-
-/**
- * Notifies layers that request is about to depart by calling
- * cl_req_operations::cro_prep() top-to-bottom.
- */
-int cl_req_prep(const struct lu_env *env, struct cl_req *req)
-{
-       unsigned int i;
-        int result;
-        const struct cl_req_slice *slice;
-
-        ENTRY;
-        /*
-         * Check that the caller of cl_req_alloc() didn't lie about the number
-         * of objects.
-         */
-        for (i = 0; i < req->crq_nrobjs; ++i)
-                LASSERT(req->crq_o[i].ro_obj != NULL);
-
-        result = 0;
-       list_for_each_entry(slice, &req->crq_layers, crs_linkage) {
-                if (slice->crs_ops->cro_prep != NULL) {
-                        result = slice->crs_ops->cro_prep(env, slice);
-                        if (result != 0)
-                                break;
-                }
-        }
-        RETURN(result);
-}
-EXPORT_SYMBOL(cl_req_prep);
-
-/**
  * Fills in attributes that are passed to the server together with transfer.
  * Only the attributes selected by \a attr->cra_flags may be touched. This
  * can be called multiple times for the same request.
  */
-void cl_req_attr_set(const struct lu_env *env, struct cl_req *req,
-                    struct cl_req_attr *attr, u64 flags)
+void cl_req_attr_set(const struct lu_env *env, struct cl_object *obj,
+                    struct cl_req_attr *attr)
 {
-        const struct cl_req_slice *slice;
-        struct cl_page            *page;
-       unsigned int i;
-
-       LASSERT(!list_empty(&req->crq_pages));
-        ENTRY;
+       struct cl_object *scan;
+       ENTRY;
 
-        /* Take any page to use as a model. */
-       page = list_entry(req->crq_pages.next, struct cl_page, cp_flight);
-
-        for (i = 0; i < req->crq_nrobjs; ++i) {
-               list_for_each_entry(slice, &req->crq_layers, crs_linkage) {
-                        const struct cl_page_slice *scan;
-                        const struct cl_object     *obj;
-
-                        scan = cl_page_at(page,
-                                          slice->crs_dev->cd_lu_dev.ld_type);
-                        LASSERT(scan != NULL);
-                        obj = scan->cpl_obj;
-                        if (slice->crs_ops->cro_attr_set != NULL)
-                                slice->crs_ops->cro_attr_set(env, slice, obj,
-                                                             attr + i, flags);
-                }
-        }
-        EXIT;
+       cl_object_for_each(scan, obj) {
+               if (scan->co_ops->coo_req_attr_set != NULL)
+                       scan->co_ops->coo_req_attr_set(env, scan, attr);
+       }
+       EXIT;
 }
 EXPORT_SYMBOL(cl_req_attr_set);
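
The dispatcher above fans out over the object's slices with
cl_object_for_each(); roughly, the macro (defined in cl_object.h, not
reproduced in this diff) expands to a list walk over the layered object's
header, top layer first:

#define cl_object_for_each(slice, obj)					\
	list_for_each_entry((slice),					\
			    &(obj)->co_lu.lo_header->loh_layers,	\
			    co_lu.lo_linkage)
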
 
index 03ad075..315fe4c 100644 (file)
@@ -137,7 +137,6 @@ static void cl_page_free(const struct lu_env *env, struct cl_page *page)
 
        PASSERT(env, page, list_empty(&page->cp_batch));
        PASSERT(env, page, page->cp_owner == NULL);
-       PASSERT(env, page, page->cp_req == NULL);
        PASSERT(env, page, page->cp_state == CPS_FREEING);
 
        ENTRY;
@@ -192,7 +191,6 @@ struct cl_page *cl_page_alloc(const struct lu_env *env,
                page->cp_type = type;
                INIT_LIST_HEAD(&page->cp_layers);
                INIT_LIST_HEAD(&page->cp_batch);
-               INIT_LIST_HEAD(&page->cp_flight);
                lu_ref_init(&page->cp_reference);
                head = o->co_lu.lo_header;
                list_for_each_entry(o, &head->loh_layers,
@@ -603,7 +601,6 @@ static int cl_page_own0(const struct lu_env *env, struct cl_io *io,
                                         io, nonblock);
                 if (result == 0) {
                         PASSERT(env, pg, pg->cp_owner == NULL);
-                        PASSERT(env, pg, pg->cp_req == NULL);
                        pg->cp_owner = cl_io_top(io);
                         cl_page_owner_set(pg);
                         if (pg->cp_state != CPS_FREEING) {
@@ -915,8 +912,6 @@ void cl_page_completion(const struct lu_env *env,
         struct cl_sync_io *anchor = pg->cp_sync_io;
 
         PASSERT(env, pg, crt < CRT_NR);
-        /* cl_page::cp_req already cleared by the caller (osc_completion()) */
-        PASSERT(env, pg, pg->cp_req == NULL);
         PASSERT(env, pg, pg->cp_state == cl_req_type_state(crt));
 
         ENTRY;
@@ -927,20 +922,11 @@ void cl_page_completion(const struct lu_env *env,
         CL_PAGE_INVOID_REVERSE(env, pg, CL_PAGE_OP(io[crt].cpo_completion),
                                (const struct lu_env *,
                                 const struct cl_page_slice *, int), ioret);
-        if (anchor) {
-                LASSERT(pg->cp_sync_io == anchor);
-                pg->cp_sync_io = NULL;
-       }
-       /*
-        * As page->cp_obj is pinned by a reference from page->cp_req, it is
-        * safe to call cl_page_put() without risking object destruction in a
-        * non-blocking context.
-        */
-       cl_page_put(env, pg);
-
-       if (anchor != NULL)
+       if (anchor != NULL) {
+               LASSERT(pg->cp_sync_io == anchor);
+               pg->cp_sync_io = NULL;
                cl_sync_io_note(env, anchor, ioret);
-
+       }
        EXIT;
 }
 EXPORT_SYMBOL(cl_page_completion);
@@ -1026,10 +1012,10 @@ void cl_page_header_print(const struct lu_env *env, void *cookie,
                           lu_printer_t printer, const struct cl_page *pg)
 {
        (*printer)(env, cookie,
-                  "page@%p[%d %p %d %d %d %p %p]\n",
+                  "page@%p[%d %p %d %d %d %p]\n",
                   pg, atomic_read(&pg->cp_ref), pg->cp_obj,
                   pg->cp_state, pg->cp_error, pg->cp_type,
-                  pg->cp_owner, pg->cp_req);
+                  pg->cp_owner);
 }
 EXPORT_SYMBOL(cl_page_header_print);
 
index 75a3e48..303ce27 100644 (file)
@@ -584,9 +584,6 @@ static struct lu_device_operations echo_device_lu_ops = {
 
 /** @} echo_lu_dev_ops */
 
-static struct cl_device_operations echo_device_cl_ops = {
-};
-
 /** \defgroup echo_init Setup and teardown
  *
  * Init and fini functions for echo client.
@@ -843,7 +840,6 @@ static struct lu_device *echo_device_alloc(const struct lu_env *env,
                 GOTO(out, rc);
 
         cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
-        cd->cd_ops = &echo_device_cl_ops;
 
         cleanup = 2;
         obd = class_name2obd(lustre_cfg_string(cfg, 0));
index dd28035..0b01b36 100644 (file)
@@ -1291,14 +1291,6 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
                "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
        LASSERT(opg->ops_transfer_pinned);
 
-       /*
-        * page->cp_req can be NULL if io submission failed before
-        * cl_req was allocated.
-        */
-       if (page->cp_req != NULL)
-               cl_req_page_done(env, page);
-       LASSERT(page->cp_req == NULL);
-
        crt = cmd == OBD_BRW_READ ? CRT_READ : CRT_WRITE;
        /* Clear opg->ops_transfer_pinned before VM lock is released. */
        opg->ops_transfer_pinned = 0;
@@ -1326,6 +1318,7 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
        lu_ref_del(&page->cp_reference, "transfer", page);
 
        cl_page_completion(env, page, crt, rc);
+       cl_page_put(env, page);
 
        RETURN(0);
 }
index 65658f6..e5caf37 100644 (file)
@@ -86,13 +86,6 @@ struct osc_io {
 };
 
 /**
- * State of transfer for osc.
- */
-struct osc_req {
-        struct cl_req_slice    or_cl;
-};
-
-/**
  * State maintained by osc layer for the duration of a system call.
  */
 struct osc_session {
@@ -115,6 +108,7 @@ struct osc_thread_info {
        pgoff_t                 oti_next_index;
        pgoff_t                 oti_fn_index; /* first non-overlapped index */
        struct cl_sync_io       oti_anchor;
+       struct cl_req_attr      oti_req_attr;
 };
 
 struct osc_object {
@@ -379,7 +373,6 @@ extern struct kmem_cache *osc_lock_kmem;
 extern struct kmem_cache *osc_object_kmem;
 extern struct kmem_cache *osc_thread_kmem;
 extern struct kmem_cache *osc_session_kmem;
-extern struct kmem_cache *osc_req_kmem;
 extern struct kmem_cache *osc_extent_kmem;
 
 extern struct lu_device_type osc_device_type;
@@ -393,8 +386,6 @@ int osc_lock_init(const struct lu_env *env,
                   const struct cl_io *io);
 int osc_io_init  (const struct lu_env *env,
                   struct cl_object *obj, struct cl_io *io);
-int osc_req_init (const struct lu_env *env, struct cl_device *dev,
-                  struct cl_req *req);
 struct lu_object *osc_object_alloc(const struct lu_env *env,
                                    const struct lu_object_header *hdr,
                                    struct lu_device *dev);
@@ -548,6 +539,16 @@ static inline struct osc_page *oap2osc_page(struct osc_async_page *oap)
        return (struct osc_page *)container_of(oap, struct osc_page, ops_oap);
 }
 
+static inline struct osc_page *
+osc_cl_page_osc(struct cl_page *page, struct osc_object *osc)
+{
+       const struct cl_page_slice *slice;
+
+       LASSERT(osc != NULL);
+       slice = cl_object_page_slice(&osc->oo_cl, page);
+       return cl2osc_page(slice);
+}
+
 static inline struct osc_lock *cl2osc_lock(const struct cl_lock_slice *slice)
 {
         LINVRNT(osc_is_object(&slice->cls_obj->co_lu));
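
osc_cl_page_osc() moves into this header and loses its NULL-tolerant branch
(compare the deleted copy in the osc_io.c hunk below), so callers must now
pass the owning osc object explicitly. A hedged usage sketch, with the
function name assumed, built from the lock-lookup pattern visible in the
deleted osc_req_attr_set():

static struct ldlm_lock *sample_page_lock(const struct lu_env *env,
					  struct osc_object *osc,
					  struct cl_page *page)
{
	/* Resolve the osc-layer slice of @page within @osc's stack, then
	 * look up the DLM lock covering its index. */
	struct osc_page *opg = osc_cl_page_osc(page, osc);

	return osc_dlmlock_at_pgoff(env, osc, osc_index(opg),
				    OSC_DAP_FL_TEST_LOCK |
				    OSC_DAP_FL_CANCELING);
}
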
index 6ba193b..22bd81f 100644 (file)
@@ -33,7 +33,7 @@
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
- * Implementation of cl_device, cl_req for OSC layer.
+ * Implementation of cl_device for OSC layer.
  *
  *   Author: Nikita Danilov <nikita.danilov@sun.com>
  */
@@ -53,7 +53,6 @@ struct kmem_cache *osc_lock_kmem;
 struct kmem_cache *osc_object_kmem;
 struct kmem_cache *osc_thread_kmem;
 struct kmem_cache *osc_session_kmem;
-struct kmem_cache *osc_req_kmem;
 struct kmem_cache *osc_extent_kmem;
 struct kmem_cache *osc_quota_kmem;
 
@@ -79,11 +78,6 @@ struct lu_kmem_descr osc_caches[] = {
                 .ckd_size  = sizeof (struct osc_session)
         },
         {
-                .ckd_cache = &osc_req_kmem,
-                .ckd_name  = "osc_req_kmem",
-                .ckd_size  = sizeof (struct osc_req)
-        },
-        {
                .ckd_cache = &osc_extent_kmem,
                .ckd_name  = "osc_extent_kmem",
                .ckd_size  = sizeof (struct osc_extent)
@@ -181,10 +175,6 @@ static const struct lu_device_operations osc_lu_ops = {
         .ldo_recovery_complete = NULL
 };
 
-static const struct cl_device_operations osc_cl_ops = {
-        .cdo_req_init = osc_req_init
-};
-
 static int osc_device_init(const struct lu_env *env, struct lu_device *d,
                            const char *name, struct lu_device *next)
 {
@@ -223,7 +213,6 @@ static struct lu_device *osc_device_alloc(const struct lu_env *env,
         cl_device_init(&od->od_cl, t);
         d = osc2lu_dev(od);
         d->ld_ops = &osc_lu_ops;
-        od->od_cl.cd_ops = &osc_cl_ops;
 
         /* Setup OSC OBD */
         obd = class_name2obd(lustre_cfg_string(cfg, 0));
index 3837da2..2fb42ba 100644 (file)
  *
  */
 
-static struct osc_req *cl2osc_req(const struct cl_req_slice *slice)
-{
-        LINVRNT(slice->crs_dev->cd_lu_dev.ld_type == &osc_device_type);
-        return container_of0(slice, struct osc_req, or_cl);
-}
-
 static struct osc_io *cl2osc_io(const struct lu_env *env,
                                 const struct cl_io_slice *slice)
 {
@@ -67,21 +61,6 @@ static struct osc_io *cl2osc_io(const struct lu_env *env,
         return oio;
 }
 
-static struct osc_page *osc_cl_page_osc(struct cl_page *page,
-                                       struct osc_object *osc)
-{
-       const struct cl_page_slice *slice;
-
-       if (osc != NULL)
-               slice = cl_object_page_slice(&osc->oo_cl, page);
-       else
-               slice = cl_page_at(page, &osc_device_type);
-       LASSERT(slice != NULL);
-
-       return cl2osc_page(slice);
-}
-
-
 /*****************************************************************************
  *
  * io operations.
@@ -938,104 +917,6 @@ static const struct cl_io_operations osc_io_ops = {
  *
  */
 
-static int osc_req_prep(const struct lu_env *env,
-                        const struct cl_req_slice *slice)
-{
-        return 0;
-}
-
-static void osc_req_completion(const struct lu_env *env,
-                               const struct cl_req_slice *slice, int ioret)
-{
-        struct osc_req *or;
-
-        or = cl2osc_req(slice);
-        OBD_SLAB_FREE_PTR(or, osc_req_kmem);
-}
-
-/**
- * Implementation of struct cl_req_operations::cro_attr_set() for osc
- * layer. osc is responsible for struct obdo::o_id and struct obdo::o_seq
- * fields.
- */
-static void osc_req_attr_set(const struct lu_env *env,
-                            const struct cl_req_slice *slice,
-                            const struct cl_object *obj,
-                            struct cl_req_attr *attr, u64 flags)
-{
-       struct lov_oinfo *oinfo;
-       struct cl_req    *clerq;
-       struct cl_page   *apage; /* _some_ page in @clerq */
-       struct ldlm_lock *lock;  /* _some_ lock protecting @apage */
-       struct osc_page  *opg;
-       struct obdo      *oa;
-       struct ost_lvb   *lvb;
-
-       oinfo   = cl2osc(obj)->oo_oinfo;
-       lvb     = &oinfo->loi_lvb;
-       oa      = attr->cra_oa;
-
-       if ((flags & OBD_MD_FLMTIME) != 0) {
-               oa->o_mtime = lvb->lvb_mtime;
-               oa->o_valid |= OBD_MD_FLMTIME;
-       }
-       if ((flags & OBD_MD_FLATIME) != 0) {
-               oa->o_atime = lvb->lvb_atime;
-               oa->o_valid |= OBD_MD_FLATIME;
-       }
-       if ((flags & OBD_MD_FLCTIME) != 0) {
-               oa->o_ctime = lvb->lvb_ctime;
-               oa->o_valid |= OBD_MD_FLCTIME;
-       }
-       if (flags & OBD_MD_FLGROUP) {
-               ostid_set_seq(&oa->o_oi, ostid_seq(&oinfo->loi_oi));
-               oa->o_valid |= OBD_MD_FLGROUP;
-       }
-       if (flags & OBD_MD_FLID) {
-               ostid_set_id(&oa->o_oi, ostid_id(&oinfo->loi_oi));
-               oa->o_valid |= OBD_MD_FLID;
-       }
-       if (flags & OBD_MD_FLHANDLE) {
-               clerq = slice->crs_req;
-               LASSERT(!list_empty(&clerq->crq_pages));
-               apage = container_of(clerq->crq_pages.next,
-                                    struct cl_page, cp_flight);
-               opg = osc_cl_page_osc(apage, NULL);
-               lock = osc_dlmlock_at_pgoff(env, cl2osc(obj), osc_index(opg),
-                               OSC_DAP_FL_TEST_LOCK | OSC_DAP_FL_CANCELING);
-               if (lock == NULL && !opg->ops_srvlock) {
-                       struct ldlm_resource *res;
-                       struct ldlm_res_id *resname;
-
-                       CL_PAGE_DEBUG(D_ERROR, env, apage, "uncovered page!\n");
-
-                       resname = &osc_env_info(env)->oti_resname;
-                       ostid_build_res_name(&oinfo->loi_oi, resname);
-                       res = ldlm_resource_get(
-                               osc_export(cl2osc(obj))->exp_obd->obd_namespace,
-                               NULL, resname, LDLM_EXTENT, 0);
-                       ldlm_resource_dump(D_ERROR, res);
-
-                       libcfs_debug_dumpstack(NULL);
-                       LBUG();
-               }
-
-               /* check for lockless io. */
-               if (lock != NULL) {
-                       oa->o_handle = lock->l_remote_handle;
-                       oa->o_valid |= OBD_MD_FLHANDLE;
-                       LDLM_LOCK_PUT(lock);
-               }
-       }
-}
-
-static const struct cl_req_operations osc_req_ops = {
-        .cro_prep       = osc_req_prep,
-        .cro_attr_set   = osc_req_attr_set,
-        .cro_completion = osc_req_completion
-};
-
-
 int osc_io_init(const struct lu_env *env,
                 struct cl_object *obj, struct cl_io *io)
 {
@@ -1046,19 +927,4 @@ int osc_io_init(const struct lu_env *env,
         return 0;
 }
 
-int osc_req_init(const struct lu_env *env, struct cl_device *dev,
-                struct cl_req *req)
-{
-       struct osc_req *or;
-       int result;
-
-       OBD_SLAB_ALLOC_PTR_GFP(or, osc_req_kmem, GFP_NOFS);
-       if (or != NULL) {
-               cl_req_slice_add(req, &or->or_cl, dev, &osc_req_ops);
-               result = 0;
-       } else
-               result = -ENOMEM;
-       return result;
-}
-
 /** @} osc */
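The slice machinery deleted above is what every RPC used to pay for: a
slab-allocated struct osc_req whose only job was to carry the cl_req_slice
from osc_req_init() to osc_req_completion(). After this patch the attribute
buffer is borrowed from the per-thread environment instead, as the
osc_request.c hunks below show:

	/* From the new osc_build_rpc(): no per-RPC allocation; the
	 * cl_req_attr lives in the thread-local osc env info. */
	struct cl_req_attr *crattr = &osc_env_info(env)->oti_req_attr;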
index 16bb564..7337489 100644 (file)
@@ -374,6 +374,77 @@ int osc_object_is_contended(struct osc_object *obj)
         return 1;
 }
 
+/**
+ * Implementation of struct cl_object_operations::coo_req_attr_set() for osc
+ * layer. osc is responsible for the object id and sequence fields, which
+ * now live together in struct obdo::o_oi.
+ */
+static void osc_req_attr_set(const struct lu_env *env, struct cl_object *obj,
+                            struct cl_req_attr *attr)
+{
+       struct lov_oinfo *oinfo;
+       struct obdo      *oa;
+       struct ost_lvb   *lvb;
+       u64               flags = attr->cra_flags;
+
+       oinfo   = cl2osc(obj)->oo_oinfo;
+       lvb     = &oinfo->loi_lvb;
+       oa      = attr->cra_oa;
+
+       if ((flags & OBD_MD_FLMTIME) != 0) {
+               oa->o_mtime = lvb->lvb_mtime;
+               oa->o_valid |= OBD_MD_FLMTIME;
+       }
+       if ((flags & OBD_MD_FLATIME) != 0) {
+               oa->o_atime = lvb->lvb_atime;
+               oa->o_valid |= OBD_MD_FLATIME;
+       }
+       if ((flags & OBD_MD_FLCTIME) != 0) {
+               oa->o_ctime = lvb->lvb_ctime;
+               oa->o_valid |= OBD_MD_FLCTIME;
+       }
+       if (flags & OBD_MD_FLGROUP) {
+               ostid_set_seq(&oa->o_oi, ostid_seq(&oinfo->loi_oi));
+               oa->o_valid |= OBD_MD_FLGROUP;
+       }
+       if (flags & OBD_MD_FLID) {
+               ostid_set_id(&oa->o_oi, ostid_id(&oinfo->loi_oi));
+               oa->o_valid |= OBD_MD_FLID;
+       }
+       if (flags & OBD_MD_FLHANDLE) {
+               struct ldlm_lock *lock;
+               struct osc_page *opg;
+
+               opg = osc_cl_page_osc(attr->cra_page, cl2osc(obj));
+               lock = osc_dlmlock_at_pgoff(env, cl2osc(obj), osc_index(opg),
+                               OSC_DAP_FL_TEST_LOCK | OSC_DAP_FL_CANCELING);
+               if (lock == NULL && !opg->ops_srvlock) {
+                       struct ldlm_resource *res;
+                       struct ldlm_res_id *resname;
+
+                       CL_PAGE_DEBUG(D_ERROR, env, attr->cra_page,
+                                     "uncovered page!\n");
+
+                       resname = &osc_env_info(env)->oti_resname;
+                       ostid_build_res_name(&oinfo->loi_oi, resname);
+                       res = ldlm_resource_get(
+                               osc_export(cl2osc(obj))->exp_obd->obd_namespace,
+                               NULL, resname, LDLM_EXTENT, 0);
+                       ldlm_resource_dump(D_ERROR, res);
+
+                       libcfs_debug_dumpstack(NULL);
+                       LBUG();
+               }
+
+               /* check for lockless io. */
+               if (lock != NULL) {
+                       oa->o_handle = lock->l_remote_handle;
+                       oa->o_valid |= OBD_MD_FLHANDLE;
+                       LDLM_LOCK_PUT(lock);
+               }
+       }
+}
+
 static const struct cl_object_operations osc_ops = {
        .coo_page_init    = osc_page_init,
        .coo_lock_init    = osc_lock_init,
@@ -384,6 +455,7 @@ static const struct cl_object_operations osc_ops = {
        .coo_prune        = osc_object_prune,
        .coo_find_cbdata  = osc_object_find_cbdata,
        .coo_fiemap       = osc_object_fiemap,
+       .coo_req_attr_set = osc_req_attr_set
 };
 
 static const struct lu_object_operations osc_lu_obj_ops = {
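osc_req_attr_set() is now reached through the object's operation vector
rather than a request slice, so a req-forming caller drives it directly. A
condensed usage sketch mirroring the osc_build_rpc() changes below ("obj",
"oa" and "page" are hypothetical locals):

	struct cl_req_attr *attr = &osc_env_info(env)->oti_req_attr;

	memset(attr, 0, sizeof(*attr));
	attr->cra_type  = CRT_WRITE;	/* or CRT_READ */
	attr->cra_flags = ~0ULL;	/* ask every layer for everything */
	attr->cra_page  = page;		/* _some_ page in the RPC */
	attr->cra_oa    = oa;
	cl_req_attr_set(env, osc2cl(obj), attr);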
index 42d132d..28b23da 100644 (file)
@@ -72,7 +72,6 @@ struct osc_brw_async_args {
        struct client_obd        *aa_cli;
        struct list_head          aa_oaps;
        struct list_head          aa_exts;
-       struct cl_req            *aa_clerq;
 };
 
 #define osc_grant_args osc_brw_async_args
@@ -1590,8 +1589,6 @@ static int brw_interpret(const struct lu_env *env,
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));
 
-       cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
-                         req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
 
@@ -1642,10 +1639,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        struct osc_brw_async_args       *aa = NULL;
        struct obdo                     *oa = NULL;
        struct osc_async_page           *oap;
-       struct osc_async_page           *tmp;
-       struct cl_req                   *clerq = NULL;
-       enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
-                                                                     CRT_READ;
+       struct osc_object               *obj = NULL;
        struct cl_req_attr              *crattr = NULL;
        loff_t                          starting_offset = OBD_OBJECT_EOF;
        loff_t                          ending_offset = 0;
@@ -1653,6 +1647,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        int                             mem_tight = 0;
        int                             page_count = 0;
        bool                            soft_sync = false;
+       bool                            interrupted = false;
        int                             i;
        int                             rc;
        struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
@@ -1664,31 +1659,15 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        list_for_each_entry(ext, ext_list, oe_link) {
                LASSERT(ext->oe_state == OES_RPC);
                mem_tight |= ext->oe_memalloc;
-               list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
-                       ++page_count;
-                       list_add_tail(&oap->oap_rpc_item, &rpc_list);
-                       if (starting_offset == OBD_OBJECT_EOF ||
-                           starting_offset > oap->oap_obj_off)
-                               starting_offset = oap->oap_obj_off;
-                       else
-                               LASSERT(oap->oap_page_off == 0);
-                       if (ending_offset < oap->oap_obj_off + oap->oap_count)
-                               ending_offset = oap->oap_obj_off +
-                                               oap->oap_count;
-                       else
-                               LASSERT(oap->oap_page_off + oap->oap_count ==
-                                       PAGE_CACHE_SIZE);
-               }
+               page_count += ext->oe_nr_pages;
+               if (obj == NULL)
+                       obj = ext->oe_obj;
        }
 
        soft_sync = osc_over_unstable_soft_limit(cli);
        if (mem_tight)
                mpflag = cfs_memory_pressure_get_and_set();
 
-       OBD_ALLOC(crattr, sizeof(*crattr));
-       if (crattr == NULL)
-               GOTO(out, rc = -ENOMEM);
-
        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, rc = -ENOMEM);
@@ -1698,37 +1677,43 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                GOTO(out, rc = -ENOMEM);
 
        i = 0;
-       list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
-               struct cl_page *page = oap2cl_page(oap);
-               if (clerq == NULL) {
-                       clerq = cl_req_alloc(env, page, crt,
-                                            1 /* only 1-object rpcs for now */);
-                       if (IS_ERR(clerq))
-                               GOTO(out, rc = PTR_ERR(clerq));
+       list_for_each_entry(ext, ext_list, oe_link) {
+               list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
+                       if (mem_tight)
+                               oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
+                       if (soft_sync)
+                               oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
+                       pga[i] = &oap->oap_brw_page;
+                       pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
+                       i++;
+
+                       list_add_tail(&oap->oap_rpc_item, &rpc_list);
+                       if (starting_offset == OBD_OBJECT_EOF ||
+                           starting_offset > oap->oap_obj_off)
+                               starting_offset = oap->oap_obj_off;
+                       else
+                               LASSERT(oap->oap_page_off == 0);
+                       if (ending_offset < oap->oap_obj_off + oap->oap_count)
+                               ending_offset = oap->oap_obj_off +
+                                               oap->oap_count;
+                       else
+                               LASSERT(oap->oap_page_off + oap->oap_count ==
+                                       PAGE_CACHE_SIZE);
+                       if (oap->oap_interrupted)
+                               interrupted = true;
                }
-               if (mem_tight)
-                       oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
-               if (soft_sync)
-                       oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
-               pga[i] = &oap->oap_brw_page;
-               pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
-               CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
-                      pga[i]->pg, page_index(oap->oap_page), oap,
-                      pga[i]->flag);
-               i++;
-               cl_req_page_add(env, clerq, page);
        }
 
-       /* always get the data for the obdo for the rpc */
-       LASSERT(clerq != NULL);
-       crattr->cra_oa = oa;
-       cl_req_attr_set(env, clerq, crattr, ~0ULL);
+       /* first page in the list */
+       oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
 
-       rc = cl_req_prep(env, clerq);
-       if (rc != 0) {
-               CERROR("cl_req_prep failed: %d\n", rc);
-               GOTO(out, rc);
-       }
+       crattr = &osc_env_info(env)->oti_req_attr;
+       memset(crattr, 0, sizeof(*crattr));
+       crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
+       crattr->cra_flags = ~0ULL;
+       crattr->cra_page = oap2cl_page(oap);
+       crattr->cra_oa = oa;
+       cl_req_attr_set(env, osc2cl(obj), crattr);
 
        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
@@ -1739,9 +1724,10 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
 
        req->rq_commit_cb = brw_commit;
        req->rq_interpret_reply = brw_interpret;
-
-       if (mem_tight != 0)
-               req->rq_memalloc = 1;
+       req->rq_memalloc = mem_tight != 0;
+       oap->oap_request = ptlrpc_request_addref(req);
+       if (interrupted && !req->rq_intr)
+               ptlrpc_mark_interrupted(req);
 
        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
@@ -1750,9 +1736,8 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
         * way to do this in a single call.  bug 10150 */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        crattr->cra_oa = &body->oa;
-       cl_req_attr_set(env, clerq, crattr,
-                       OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
-
+       crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
+       cl_req_attr_set(env, osc2cl(obj), crattr);
        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
 
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
@@ -1761,23 +1746,6 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        list_splice_init(&rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_exts);
        list_splice_init(ext_list, &aa->aa_exts);
-       aa->aa_clerq = clerq;
-
-       /* queued sync pages can be torn down while the pages
-        * were between the pending list and the rpc */
-       tmp = NULL;
-       list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
-               /* only one oap gets a request reference */
-               if (tmp == NULL)
-                       tmp = oap;
-               if (oap->oap_interrupted && !req->rq_intr) {
-                       CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
-                                       oap, req);
-                       ptlrpc_mark_interrupted(req);
-               }
-       }
-       if (tmp != NULL)
-               tmp->oap_request = ptlrpc_request_addref(req);
 
        spin_lock(&cli->cl_loi_list_lock);
        starting_offset >>= PAGE_CACHE_SHIFT;
@@ -1808,9 +1776,6 @@ out:
        if (mem_tight != 0)
                cfs_memory_pressure_restore(mpflag);
 
-       if (crattr != NULL)
-               OBD_FREE(crattr, sizeof(*crattr));
-
        if (rc != 0) {
                LASSERT(req == NULL);
 
@@ -1826,8 +1791,6 @@ out:
                        list_del_init(&ext->oe_link);
                        osc_extent_finish(env, ext, 0, rc);
                }
-               if (clerq && !IS_ERR(clerq))
-                       cl_req_completion(env, clerq, rc);
        }
        RETURN(rc);
 }
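Two consolidations in the osc_build_rpc() rewrite are easy to miss. First,
the page-gathering loop now runs over the extents directly, so page_count
can be taken from oe_nr_pages up front. Second, the post-hoc walk over
aa_oaps that marked the request interrupted and handed a single oap the
request reference is folded into the same gathering pass; condensed from
the hunks above:

	/* condensed view, not compilable on its own */
	list_for_each_entry(ext, ext_list, oe_link)
		list_for_each_entry(oap, &ext->oe_pages, oap_pending_item)
			if (oap->oap_interrupted)
				interrupted = true;

	oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
	oap->oap_request = ptlrpc_request_addref(req);	/* first oap keeps the ref */
	if (interrupted && !req->rq_intr)
		ptlrpc_mark_interrupted(req);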