LU-5835 ptlrpc: Introduce iovec to bulk descriptor 25/12525/22
author Amir Shehata <amir.shehata@intel.com>
Fri, 31 Oct 2014 17:08:14 +0000 (10:08 -0700)
committer Oleg Drokin <oleg.drokin@intel.com>
Thu, 11 Jun 2015 18:33:05 +0000 (18:33 +0000)
Add a union to ptlrpc_bulk_desc that holds either KVEC or KIOV buffers.
bd_type is changed to a bit mask; bits are set in bd_type to specify
{put,get}, {source,sink} and {kvec,kiov}.  All code that touches the
bulk buffers is changed to access them through the union.
ASSUMPTION: all current code works only with KIOV; the new DNE code to
be added later will introduce a separate code path that uses IOVEC.
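
A minimal sketch (not part of the patch) of how a caller composes and
tests the new bit-mask bd_type, using the enum values and helper
predicates this patch adds to lustre/include/lustre_net.h:

static void example_bulk_type_checks(void)
{
        unsigned int type = PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV;

        /* PTLRPC_BULK_PUT_SINK == PTLRPC_BULK_OP_PASSIVE | PTLRPC_BULK_OP_PUT,
         * so both the passive and the put-sink predicates hold */
        LASSERT(ptlrpc_is_bulk_op_passive(type));
        LASSERT(ptlrpc_is_bulk_put_sink(type));

        /* exactly one buffer-kind bit is set: KIOV here, not KVEC */
        LASSERT(ptlrpc_is_bulk_desc_kiov(type));
        LASSERT(!ptlrpc_is_bulk_desc_kvec(type));
}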

As part of the implementation, buffer operations are added as callbacks
(struct ptlrpc_bulk_frag_ops) to be supplied by users of ptlrpc.  This
lets each user define how bulk buffer fragments are added to a
descriptor and how they are released.
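
A sketch of the new client-side calling convention, modelled on the
mdc_request.c hunk below (req, npages, pages and i come from the
surrounding caller): the buffer-kind bit and a ptlrpc_bulk_frag_ops
pointer are passed to ptlrpc_prep_bulk_imp(), and pages are then added
through bd_frag_ops instead of ptlrpc_prep_bulk_page_pin():

        desc = ptlrpc_prep_bulk_imp(req, npages, 1,
                                    PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV,
                                    MDS_BULK_PORTAL,
                                    &ptlrpc_bulk_kiov_pin_ops);
        if (desc == NULL)
                RETURN(-ENOMEM);

        for (i = 0; i < npages; i++)
                desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
                                                 PAGE_CACHE_SIZE);

        /* req now owns desc; ptlrpc_free_bulk() later calls
         * release_frags() (page_cache_release() for the pin ops) */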

Signed-off-by: Amir Shehata <amir.shehata@intel.com>
Change-Id: I86d718a668e7c0b422b96b393a6a69e5b8e89291
Reviewed-on: http://review.whamcloud.com/12525
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: wangdi <di.wang@intel.com>
Reviewed-by: Liang Zhen <liang.zhen@intel.com>
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
18 files changed:
lustre/include/lustre_net.h
lustre/mdc/mdc_request.c
lustre/mgc/mgc_request.c
lustre/mgs/mgs_nids.c
lustre/osc/osc_page.c
lustre/osc/osc_request.c
lustre/osp/osp_object.c
lustre/ptlrpc/client.c
lustre/ptlrpc/events.c
lustre/ptlrpc/gss/gss_bulk.c
lustre/ptlrpc/gss/gss_krb5_mech.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/pers.c
lustre/ptlrpc/ptlrpc_internal.h
lustre/ptlrpc/sec_bulk.c
lustre/ptlrpc/sec_plain.c
lustre/quota/qsd_request.c
lustre/target/tgt_handler.c

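The KVEC/IOVEC path mentioned in the commit message is only plumbed in
by this patch; no in-tree caller uses it yet.  A hypothetical sketch of
what a future caller (e.g. the DNE code) might look like, built on the
new ptlrpc_prep_bulk_frag() helper -- the ops struct, portal and buffer
names here are illustrative and not part of the patch:

        /* hypothetical kvec ops; the patch only defines the two kiov ops */
        static const struct ptlrpc_bulk_frag_ops example_bulk_kvec_ops = {
                .add_iov_frag   = ptlrpc_prep_bulk_frag,
                .release_frags  = NULL, /* or a callback that frees the frags */
        };

        desc = ptlrpc_prep_bulk_imp(req, nfrags, 1,
                                    PTLRPC_BULK_GET_SOURCE |
                                        PTLRPC_BULK_BUF_KVEC,
                                    SOME_BULK_PORTAL /* hypothetical */,
                                    &example_bulk_kvec_ops);
        if (desc == NULL)
                RETURN(-ENOMEM);

        /* each fragment is a kernel virtual address plus length (struct kvec) */
        rc = ptlrpc_prep_bulk_frag(desc, buf, buf_len);
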
index d3caeb2..fae4c7e 100644 (file)
@@ -55,6 +55,7 @@
  * @{
  */
 
+#include <linux/uio.h>
 #include <libcfs/libcfs.h>
 #include <lnet/nidstr.h>
 #include <lnet/api.h>
@@ -1317,12 +1318,97 @@ struct ptlrpc_bulk_page {
        struct page     *bp_page;
 };
 
-#define BULK_GET_SOURCE   0
-#define BULK_PUT_SINK     1
-#define BULK_GET_SINK     2
-#define BULK_PUT_SOURCE   3
+enum ptlrpc_bulk_op_type {
+       PTLRPC_BULK_OP_ACTIVE =  0x00000001,
+       PTLRPC_BULK_OP_PASSIVE = 0x00000002,
+       PTLRPC_BULK_OP_PUT =     0x00000004,
+       PTLRPC_BULK_OP_GET =     0x00000008,
+       PTLRPC_BULK_BUF_KVEC =   0x00000010,
+       PTLRPC_BULK_BUF_KIOV =   0x00000020,
+       PTLRPC_BULK_GET_SOURCE = PTLRPC_BULK_OP_PASSIVE | PTLRPC_BULK_OP_GET,
+       PTLRPC_BULK_PUT_SINK =   PTLRPC_BULK_OP_PASSIVE | PTLRPC_BULK_OP_PUT,
+       PTLRPC_BULK_GET_SINK =   PTLRPC_BULK_OP_ACTIVE | PTLRPC_BULK_OP_GET,
+       PTLRPC_BULK_PUT_SOURCE = PTLRPC_BULK_OP_ACTIVE | PTLRPC_BULK_OP_PUT,
+};
 
-/**
+static inline bool ptlrpc_is_bulk_op_get(enum ptlrpc_bulk_op_type type)
+{
+       return (type & PTLRPC_BULK_OP_GET) == PTLRPC_BULK_OP_GET;
+}
+
+static inline bool ptlrpc_is_bulk_get_source(enum ptlrpc_bulk_op_type type)
+{
+       return (type & PTLRPC_BULK_GET_SOURCE) == PTLRPC_BULK_GET_SOURCE;
+}
+
+static inline bool ptlrpc_is_bulk_put_sink(enum ptlrpc_bulk_op_type type)
+{
+       return (type & PTLRPC_BULK_PUT_SINK) == PTLRPC_BULK_PUT_SINK;
+}
+
+static inline bool ptlrpc_is_bulk_get_sink(enum ptlrpc_bulk_op_type type)
+{
+       return (type & PTLRPC_BULK_GET_SINK) == PTLRPC_BULK_GET_SINK;
+}
+
+static inline bool ptlrpc_is_bulk_put_source(enum ptlrpc_bulk_op_type type)
+{
+       return (type & PTLRPC_BULK_PUT_SOURCE) == PTLRPC_BULK_PUT_SOURCE;
+}
+
+static inline bool ptlrpc_is_bulk_desc_kvec(enum ptlrpc_bulk_op_type type)
+{
+       return ((type & PTLRPC_BULK_BUF_KVEC) | (type & PTLRPC_BULK_BUF_KIOV))
+                       == PTLRPC_BULK_BUF_KVEC;
+}
+
+static inline bool ptlrpc_is_bulk_desc_kiov(enum ptlrpc_bulk_op_type type)
+{
+       return ((type & PTLRPC_BULK_BUF_KVEC) | (type & PTLRPC_BULK_BUF_KIOV))
+                       == PTLRPC_BULK_BUF_KIOV;
+}
+
+static inline bool ptlrpc_is_bulk_op_active(enum ptlrpc_bulk_op_type type)
+{
+       return ((type & PTLRPC_BULK_OP_ACTIVE) |
+               (type & PTLRPC_BULK_OP_PASSIVE))
+                       == PTLRPC_BULK_OP_ACTIVE;
+}
+
+static inline bool ptlrpc_is_bulk_op_passive(enum ptlrpc_bulk_op_type type)
+{
+       return ((type & PTLRPC_BULK_OP_ACTIVE) |
+               (type & PTLRPC_BULK_OP_PASSIVE))
+                       == PTLRPC_BULK_OP_PASSIVE;
+}
+
+struct ptlrpc_bulk_frag_ops {
+       /**
+        * Add a page \a page to the bulk descriptor \a desc
+        * Data to transfer in the page starts at offset \a pageoffset and
+        * amount of data to transfer from the page is \a len
+        */
+       void (*add_kiov_frag)(struct ptlrpc_bulk_desc *desc,
+                             struct page *page, int pageoffset, int len);
+
+       /*
+        * Add a \a fragment to the bulk descriptor \a desc.
+        * Data to transfer in the fragment is pointed to by \a frag
+        * The size of the fragment is \a len
+        */
+       int (*add_iov_frag)(struct ptlrpc_bulk_desc *desc, void *frag, int len);
+
+       /**
+        * Uninitialize and free bulk descriptor \a desc.
+        * Works on bulk descriptors both from server and client side.
+        */
+       void (*release_frags)(struct ptlrpc_bulk_desc *desc);
+};
+
+extern const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops;
+extern const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops;
+
+/*
  * Definition of bulk descriptor.
  * Bulks are special "Two phase" RPCs where initial request message
  * is sent first and it is followed bt a transfer (o receiving) of a large
@@ -1335,14 +1421,14 @@ struct ptlrpc_bulk_page {
 struct ptlrpc_bulk_desc {
        /** completed with failure */
        unsigned long bd_failure:1;
-       /** {put,get}{source,sink} */
-       unsigned long bd_type:2;
        /** client side */
        unsigned long bd_registered:1;
        /** For serialization with callback */
        spinlock_t bd_lock;
        /** Import generation when request for this bulk was sent */
        int bd_import_generation;
+       /** {put,get}{source,sink}{kvec,kiov} */
+       enum ptlrpc_bulk_op_type bd_type;
        /** LNet portal for this bulk */
        __u32 bd_portal;
        /** Server side - export this bulk created for */
@@ -1351,6 +1437,7 @@ struct ptlrpc_bulk_desc {
        struct obd_import *bd_import;
        /** Back pointer to the request */
        struct ptlrpc_request *bd_req;
+       struct ptlrpc_bulk_frag_ops *bd_frag_ops;
        wait_queue_head_t      bd_waitq;        /* server side only WQ */
        int                    bd_iov_count;    /* # entries in bd_iov */
        int                    bd_max_iov;      /* allocated size of bd_iov */
@@ -1366,14 +1453,32 @@ struct ptlrpc_bulk_desc {
        /** array of associated MDs */
        lnet_handle_md_t        bd_mds[PTLRPC_BULK_OPS_COUNT];
 
-       /*
-        * encrypt iov, size is either 0 or bd_iov_count.
-        */
-       lnet_kiov_t           *bd_enc_iov;
+       union {
+               struct {
+                       /*
+                        * encrypt iov, size is either 0 or bd_iov_count.
+                        */
+                       lnet_kiov_t *bd_enc_vec;
+                       lnet_kiov_t bd_vec[0];
+               } bd_kiov;
+
+               struct {
+                       struct kvec *bd_enc_kvec;
+                       struct kvec bd_kvec[0];
+               } bd_kvec;
+       } bd_u;
 
-       lnet_kiov_t            bd_iov[0];
 };
 
+#define GET_KIOV(desc)                 ((desc)->bd_u.bd_kiov.bd_vec)
+#define BD_GET_KIOV(desc, i)           ((desc)->bd_u.bd_kiov.bd_vec[i])
+#define GET_ENC_KIOV(desc)             ((desc)->bd_u.bd_kiov.bd_enc_vec)
+#define BD_GET_ENC_KIOV(desc, i)       ((desc)->bd_u.bd_kiov.bd_enc_vec[i])
+#define GET_KVEC(desc)                 ((desc)->bd_u.bd_kvec.bd_kvec)
+#define BD_GET_KVEC(desc, i)           ((desc)->bd_u.bd_kvec.bd_kvec[i])
+#define GET_ENC_KVEC(desc)             ((desc)->bd_u.bd_kvec.bd_enc_kvec)
+#define BD_GET_ENC_KVEC(desc, i)       ((desc)->bd_u.bd_kvec.bd_enc_kvec[i])
+
 enum {
         SVC_STOPPED     = 1 << 0,
         SVC_STOPPING    = 1 << 1,
@@ -1897,8 +2002,11 @@ extern lnet_pid_t ptl_get_pid(void);
  */
 #ifdef HAVE_SERVER_SUPPORT
 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp(struct ptlrpc_request *req,
-                                             unsigned npages, unsigned max_brw,
-                                             unsigned type, unsigned portal);
+                                             unsigned nfrags, unsigned max_brw,
+                                             unsigned int type,
+                                             unsigned portal,
+                                             const struct ptlrpc_bulk_frag_ops
+                                               *ops);
 int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc);
 void ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc);
 
@@ -2014,19 +2122,17 @@ void ptlrpc_req_finished(struct ptlrpc_request *request);
 void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request);
 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req);
 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
-                                             unsigned npages, unsigned max_brw,
-                                             unsigned type, unsigned portal);
-void __ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk, int pin);
-static inline void ptlrpc_free_bulk_pin(struct ptlrpc_bulk_desc *bulk)
-{
-       __ptlrpc_free_bulk(bulk, 1);
-}
-static inline void ptlrpc_free_bulk_nopin(struct ptlrpc_bulk_desc *bulk)
-{
-       __ptlrpc_free_bulk(bulk, 0);
-}
+                                             unsigned nfrags, unsigned max_brw,
+                                             unsigned int type,
+                                             unsigned portal,
+                                             const struct ptlrpc_bulk_frag_ops
+                                               *ops);
+
+int ptlrpc_prep_bulk_frag(struct ptlrpc_bulk_desc *desc,
+                         void *frag, int len);
 void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
-                            struct page *page, int pageoffset, int len, int);
+                            struct page *page, int pageoffset, int len,
+                            int pin);
 static inline void ptlrpc_prep_bulk_page_pin(struct ptlrpc_bulk_desc *desc,
                                             struct page *page, int pageoffset,
                                             int len)
@@ -2041,6 +2147,20 @@ static inline void ptlrpc_prep_bulk_page_nopin(struct ptlrpc_bulk_desc *desc,
        __ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 0);
 }
 
+void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk);
+
+static inline void ptlrpc_release_bulk_page_pin(struct ptlrpc_bulk_desc *desc)
+{
+       int i;
+
+       for (i = 0; i < desc->bd_iov_count ; i++)
+               page_cache_release(BD_GET_KIOV(desc, i).kiov_page);
+}
+
+static inline void ptlrpc_release_bulk_noop(struct ptlrpc_bulk_desc *desc)
+{
+}
+
 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
                                       struct obd_import *imp);
 __u64 ptlrpc_next_xid(void);
index 64bf184..9fbc9c0 100644 (file)
@@ -967,8 +967,10 @@ restart_bulk:
        req->rq_request_portal = MDS_READPAGE_PORTAL;
        ptlrpc_at_set_req_timeout(req);
 
-       desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK,
-                                   MDS_BULK_PORTAL);
+       desc = ptlrpc_prep_bulk_imp(req, npages, 1,
+                                   PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV,
+                                   MDS_BULK_PORTAL,
+                                   &ptlrpc_bulk_kiov_pin_ops);
        if (desc == NULL) {
                ptlrpc_request_free(req);
                RETURN(-ENOMEM);
@@ -976,7 +978,8 @@ restart_bulk:
 
        /* NB req now owns desc and will free it when it gets freed */
        for (i = 0; i < npages; i++)
-               ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE);
+               desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
+                                                PAGE_CACHE_SIZE);
 
        mdc_readdir_pack(req, offset, PAGE_CACHE_SIZE * npages, fid, oc);
 
index ed7a16a..01be9e2 100644 (file)
@@ -1617,13 +1617,16 @@ again:
         body->mcb_units  = nrpages;
 
        /* allocate bulk transfer descriptor */
-       desc = ptlrpc_prep_bulk_imp(req, nrpages, 1, BULK_PUT_SINK,
-                                   MGS_BULK_PORTAL);
+       desc = ptlrpc_prep_bulk_imp(req, nrpages, 1,
+                                   PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV,
+                                   MGS_BULK_PORTAL,
+                                   &ptlrpc_bulk_kiov_pin_ops);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
 
        for (i = 0; i < nrpages; i++)
-               ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE);
+               desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
+                                                PAGE_CACHE_SIZE);
 
         ptlrpc_request_set_replen(req);
         rc = ptlrpc_queue_wait(req);
index 8aacd89..e945585 100644 (file)
@@ -658,18 +658,22 @@ int mgs_get_ir_logs(struct ptlrpc_request *req)
        page_count = (bytes + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
        LASSERT(page_count <= nrpages);
        desc = ptlrpc_prep_bulk_exp(req, page_count, 1,
-                                   BULK_PUT_SOURCE, MGS_BULK_PORTAL);
+                                   PTLRPC_BULK_PUT_SOURCE |
+                                       PTLRPC_BULK_BUF_KIOV,
+                                   MGS_BULK_PORTAL,
+                                   &ptlrpc_bulk_kiov_pin_ops);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
 
        for (i = 0; i < page_count && bytes > 0; i++) {
-               ptlrpc_prep_bulk_page_pin(desc, pages[i], 0,
-                                         min_t(int, bytes, PAGE_CACHE_SIZE));
+               desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
+                                                min_t(int, bytes,
+                                                     PAGE_CACHE_SIZE));
                bytes -= PAGE_CACHE_SIZE;
         }
 
         rc = target_bulk_io(req->rq_export, desc, &lwi);
-       ptlrpc_free_bulk_pin(desc);
+       ptlrpc_free_bulk(desc);
 
 out:
        for (i = 0; i < nrpages; i++) {
index d187032..f07f462 100644 (file)
@@ -914,8 +914,10 @@ static inline void unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
        int count = 0;
        int i;
 
+       LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
+
        for (i = 0; i < page_count; i++) {
-               void *pz = page_zone(desc->bd_iov[i].kiov_page);
+               void *pz = page_zone(BD_GET_KIOV(desc, i).kiov_page);
 
                if (likely(pz == zone)) {
                        ++count;
index f03dfaa..a7b5dc5 100644 (file)
@@ -1017,8 +1017,11 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
 
        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
-               opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
-               OST_BULK_PORTAL);
+               (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
+                       PTLRPC_BULK_PUT_SINK) |
+                       PTLRPC_BULK_BUF_KIOV,
+               OST_BULK_PORTAL,
+               &ptlrpc_bulk_kiov_pin_ops);
 
         if (desc == NULL)
                 GOTO(out, rc = -ENOMEM);
@@ -1064,7 +1067,7 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                         (pg->flag & OBD_BRW_SRVLOCK));
 
-               ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
+               desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
                 requested_nob += pg->count;
 
                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
index ff6107e..0c96f03 100644 (file)
@@ -1741,15 +1741,18 @@ static int osp_it_fetch(const struct lu_env *env, struct osp_it *it)
 
        ptlrpc_at_set_req_timeout(req);
 
-       desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK,
-                                   MDS_BULK_PORTAL);
+       desc = ptlrpc_prep_bulk_imp(req, npages, 1,
+                                   PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV,
+                                   MDS_BULK_PORTAL,
+                                   &ptlrpc_bulk_kiov_pin_ops);
        if (desc == NULL) {
                ptlrpc_request_free(req);
                RETURN(-ENOMEM);
        }
 
        for (i = 0; i < npages; i++)
-               ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE);
+               desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
+                                                PAGE_CACHE_SIZE);
 
        ptlrpc_request_set_replen(req);
        rc = ptlrpc_queue_wait(req);
index 091cbcb..8f075e8 100644 (file)
 
 #include "ptlrpc_internal.h"
 
+const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops = {
+       .add_kiov_frag  = ptlrpc_prep_bulk_page_pin,
+       .release_frags  = ptlrpc_release_bulk_page_pin,
+};
+EXPORT_SYMBOL(ptlrpc_bulk_kiov_pin_ops);
+
+const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops = {
+       .add_kiov_frag  = ptlrpc_prep_bulk_page_nopin,
+       .release_frags  = ptlrpc_release_bulk_noop,
+};
+EXPORT_SYMBOL(ptlrpc_bulk_kiov_nopin_ops);
+
 static int ptlrpc_send_new_req(struct ptlrpc_request *req);
 static int ptlrpcd_check_work(struct ptlrpc_request *req);
 static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async);
@@ -97,23 +109,41 @@ struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
  * Allocate and initialize new bulk descriptor on the sender.
  * Returns pointer to the descriptor or NULL on error.
  */
-struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw,
-                                        unsigned type, unsigned portal)
+struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned nfrags, unsigned max_brw,
+                                        enum ptlrpc_bulk_op_type type,
+                                        unsigned portal,
+                                        const struct ptlrpc_bulk_frag_ops *ops)
 {
        struct ptlrpc_bulk_desc *desc;
        int i;
 
-       OBD_ALLOC(desc, offsetof(struct ptlrpc_bulk_desc, bd_iov[npages]));
+       /* ensure that only one of KIOV or IOVEC is set but not both */
+       LASSERT((ptlrpc_is_bulk_desc_kiov(type) &&
+                ops->add_kiov_frag != NULL) ||
+               (ptlrpc_is_bulk_desc_kvec(type) &&
+                ops->add_iov_frag != NULL));
+
+       if (type & PTLRPC_BULK_BUF_KIOV) {
+               OBD_ALLOC(desc,
+                         offsetof(struct ptlrpc_bulk_desc,
+                                  bd_u.bd_kiov.bd_vec[nfrags]));
+       } else {
+               OBD_ALLOC(desc,
+                         offsetof(struct ptlrpc_bulk_desc,
+                                  bd_u.bd_kvec.bd_kvec[nfrags]));
+       }
+
        if (!desc)
                return NULL;
 
        spin_lock_init(&desc->bd_lock);
        init_waitqueue_head(&desc->bd_waitq);
-       desc->bd_max_iov = npages;
+       desc->bd_max_iov = nfrags;
        desc->bd_iov_count = 0;
        desc->bd_portal = portal;
        desc->bd_type = type;
        desc->bd_md_count = 0;
+       desc->bd_frag_ops = (struct ptlrpc_bulk_frag_ops *) ops;
        LASSERT(max_brw > 0);
        desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
        /* PTLRPC_BULK_OPS_COUNT is the compile-time transfer limit for this
@@ -126,21 +156,25 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw,
 
 /**
  * Prepare bulk descriptor for specified outgoing request \a req that
- * can fit \a npages * pages. \a type is bulk type. \a portal is where
+ * can fit \a nfrags * pages. \a type is bulk type. \a portal is where
  * the bulk to be sent. Used on client-side.
  * Returns pointer to newly allocatrd initialized bulk descriptor or NULL on
  * error.
  */
 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
-                                             unsigned npages, unsigned max_brw,
-                                             unsigned type, unsigned portal)
+                                             unsigned nfrags, unsigned max_brw,
+                                             unsigned int type,
+                                             unsigned portal,
+                                             const struct ptlrpc_bulk_frag_ops
+                                               *ops)
 {
        struct obd_import *imp = req->rq_import;
        struct ptlrpc_bulk_desc *desc;
 
        ENTRY;
-       LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
-       desc = ptlrpc_new_bulk(npages, max_brw, type, portal);
+       LASSERT(ptlrpc_is_bulk_op_passive(type));
+
+       desc = ptlrpc_new_bulk(nfrags, max_brw, type, portal, ops);
        if (desc == NULL)
                RETURN(NULL);
 
@@ -158,60 +192,89 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
 }
 EXPORT_SYMBOL(ptlrpc_prep_bulk_imp);
 
-/*
- * Add a page \a page to the bulk descriptor \a desc.
- * Data to transfer in the page starts at offset \a pageoffset and
- * amount of data to transfer from the page is \a len
- */
 void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
-                            struct page *page, int pageoffset, int len, int pin)
+                            struct page *page, int pageoffset, int len,
+                            int pin)
 {
+       lnet_kiov_t *kiov;
+
        LASSERT(desc->bd_iov_count < desc->bd_max_iov);
        LASSERT(page != NULL);
        LASSERT(pageoffset >= 0);
        LASSERT(len > 0);
        LASSERT(pageoffset + len <= PAGE_CACHE_SIZE);
+       LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
+
+       kiov = &BD_GET_KIOV(desc, desc->bd_iov_count);
 
        desc->bd_nob += len;
 
        if (pin)
                page_cache_get(page);
 
-       ptlrpc_add_bulk_page(desc, page, pageoffset, len);
+       kiov->kiov_page = page;
+       kiov->kiov_offset = pageoffset;
+       kiov->kiov_len = len;
+
+       desc->bd_iov_count++;
 }
 EXPORT_SYMBOL(__ptlrpc_prep_bulk_page);
 
-/**
- * Uninitialize and free bulk descriptor \a desc.
- * Works on bulk descriptors both from server and client side.
- */
-void __ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc, int unpin)
+int ptlrpc_prep_bulk_frag(struct ptlrpc_bulk_desc *desc,
+                         void *frag, int len)
+{
+       struct kvec *iovec;
+
+       LASSERT(desc->bd_iov_count < desc->bd_max_iov);
+       LASSERT(frag != NULL);
+       LASSERT(len > 0);
+       LASSERT(ptlrpc_is_bulk_desc_kvec(desc->bd_type));
+
+       iovec = &BD_GET_KVEC(desc, desc->bd_iov_count);
+
+       desc->bd_nob += len;
+
+       iovec->iov_base = frag;
+       iovec->iov_len = len;
+
+       desc->bd_iov_count++;
+
+       return desc->bd_nob;
+}
+EXPORT_SYMBOL(ptlrpc_prep_bulk_frag);
+
+void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
 {
-       int i;
        ENTRY;
 
        LASSERT(desc != NULL);
        LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
        LASSERT(desc->bd_md_count == 0);         /* network hands off */
        LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
+       LASSERT(desc->bd_frag_ops != NULL);
 
-       sptlrpc_enc_pool_put_pages(desc);
+       if (ptlrpc_is_bulk_desc_kiov(desc->bd_type))
+               sptlrpc_enc_pool_put_pages(desc);
 
        if (desc->bd_export)
                class_export_put(desc->bd_export);
        else
                class_import_put(desc->bd_import);
 
-       if (unpin) {
-               for (i = 0; i < desc->bd_iov_count ; i++)
-                       page_cache_release(desc->bd_iov[i].kiov_page);
-       }
+       if (desc->bd_frag_ops->release_frags != NULL)
+               desc->bd_frag_ops->release_frags(desc);
+
+       if (ptlrpc_is_bulk_desc_kiov(desc->bd_type))
+               OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
+                                       bd_u.bd_kiov.bd_vec[desc->bd_max_iov]));
+       else
+               OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
+                                       bd_u.bd_kvec.bd_kvec[desc->
+                                               bd_max_iov]));
 
-       OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
-                               bd_iov[desc->bd_max_iov]));
        EXIT;
 }
-EXPORT_SYMBOL(__ptlrpc_free_bulk);
+EXPORT_SYMBOL(ptlrpc_free_bulk);
 
 /**
  * Set server timelimit for this req, i.e. how long are we willing to wait
@@ -2295,7 +2358,7 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
                 request->rq_import = NULL;
         }
        if (request->rq_bulk != NULL)
-               ptlrpc_free_bulk_pin(request->rq_bulk);
+               ptlrpc_free_bulk(request->rq_bulk);
 
         if (request->rq_reqbuf != NULL || request->rq_clrbuf != NULL)
                 sptlrpc_cli_free_reqbuf(request);
index 27e36ef..fae0e57 100644 (file)
@@ -183,12 +183,12 @@ void client_bulk_callback (lnet_event_t *ev)
         struct ptlrpc_request   *req;
         ENTRY;
 
-        LASSERT ((desc->bd_type == BULK_PUT_SINK &&
-                  ev->type == LNET_EVENT_PUT) ||
-                 (desc->bd_type == BULK_GET_SOURCE &&
-                  ev->type == LNET_EVENT_GET) ||
-                 ev->type == LNET_EVENT_UNLINK);
-        LASSERT (ev->unlinked);
+       LASSERT((ptlrpc_is_bulk_put_sink(desc->bd_type) &&
+                ev->type == LNET_EVENT_PUT) ||
+               (ptlrpc_is_bulk_get_source(desc->bd_type) &&
+                ev->type == LNET_EVENT_GET) ||
+               ev->type == LNET_EVENT_UNLINK);
+       LASSERT(ev->unlinked);
 
         if (CFS_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_CLIENT_BULK_CB, CFS_FAIL_ONCE))
                 ev->status = -EIO;
@@ -437,9 +437,9 @@ void server_bulk_callback (lnet_event_t *ev)
 
        LASSERT(ev->type == LNET_EVENT_SEND ||
                ev->type == LNET_EVENT_UNLINK ||
-               (desc->bd_type == BULK_PUT_SOURCE &&
+               (ptlrpc_is_bulk_put_source(desc->bd_type) &&
                 ev->type == LNET_EVENT_ACK) ||
-               (desc->bd_type == BULK_GET_SINK &&
+               (ptlrpc_is_bulk_get_sink(desc->bd_type) &&
                 ev->type == LNET_EVENT_REPLY));
 
         CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
index 692735d..d855774 100644 (file)
@@ -63,104 +63,106 @@ int gss_cli_ctx_wrap_bulk(struct ptlrpc_cli_ctx *ctx,
                           struct ptlrpc_request *req,
                           struct ptlrpc_bulk_desc *desc)
 {
-        struct gss_cli_ctx              *gctx;
-        struct lustre_msg               *msg;
-        struct ptlrpc_bulk_sec_desc     *bsd;
-        rawobj_t                         token;
-        __u32                            maj;
-        int                              offset;
-        int                              rc;
-        ENTRY;
-
-        LASSERT(req->rq_pack_bulk);
-        LASSERT(req->rq_bulk_read || req->rq_bulk_write);
-
-        gctx = container_of(ctx, struct gss_cli_ctx, gc_base);
-        LASSERT(gctx->gc_mechctx);
-
-        switch (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
-        case SPTLRPC_SVC_NULL:
-                LASSERT(req->rq_reqbuf->lm_bufcount >= 3);
-                msg = req->rq_reqbuf;
-                offset = msg->lm_bufcount - 1;
-                break;
-        case SPTLRPC_SVC_AUTH:
-        case SPTLRPC_SVC_INTG:
-                LASSERT(req->rq_reqbuf->lm_bufcount >= 4);
-                msg = req->rq_reqbuf;
-                offset = msg->lm_bufcount - 2;
-                break;
-        case SPTLRPC_SVC_PRIV:
-                LASSERT(req->rq_clrbuf->lm_bufcount >= 2);
-                msg = req->rq_clrbuf;
-                offset = msg->lm_bufcount - 1;
-                break;
-        default:
-                LBUG();
-        }
-
-        bsd = lustre_msg_buf(msg, offset, sizeof(*bsd));
-        bsd->bsd_version = 0;
-        bsd->bsd_flags = 0;
-        bsd->bsd_type = SPTLRPC_BULK_DEFAULT;
-        bsd->bsd_svc = SPTLRPC_FLVR_BULK_SVC(req->rq_flvr.sf_rpc);
-
-        if (bsd->bsd_svc == SPTLRPC_BULK_SVC_NULL)
-                RETURN(0);
-
-        LASSERT(bsd->bsd_svc == SPTLRPC_BULK_SVC_INTG ||
-                bsd->bsd_svc == SPTLRPC_BULK_SVC_PRIV);
-
-        if (req->rq_bulk_read) {
-                /*
-                 * bulk read: prepare receiving pages only for privacy mode.
-                 */
-                if (bsd->bsd_svc == SPTLRPC_BULK_SVC_PRIV)
-                        return gss_cli_prep_bulk(req, desc);
-        } else {
-                /*
-                 * bulk write: sign or encrypt bulk pages.
-                 */
-                bsd->bsd_nob = desc->bd_nob;
-
-                if (bsd->bsd_svc == SPTLRPC_BULK_SVC_INTG) {
-                        /* integrity mode */
-                        token.data = bsd->bsd_data;
-                        token.len = lustre_msg_buflen(msg, offset) -
-                                    sizeof(*bsd);
-
-                        maj = lgss_get_mic(gctx->gc_mechctx, 0, NULL,
-                                           desc->bd_iov_count, desc->bd_iov,
-                                           &token);
-                        if (maj != GSS_S_COMPLETE) {
-                                CWARN("failed to sign bulk data: %x\n", maj);
-                                RETURN(-EACCES);
-                        }
-                } else {
-                        /* privacy mode */
-                        if (desc->bd_iov_count == 0)
-                                RETURN(0);
-
-                        rc = sptlrpc_enc_pool_get_pages(desc);
-                        if (rc) {
-                                CERROR("bulk write: failed to allocate "
-                                       "encryption pages: %d\n", rc);
-                                RETURN(rc);
-                        }
-
-                        token.data = bsd->bsd_data;
-                        token.len = lustre_msg_buflen(msg, offset) -
-                                    sizeof(*bsd);
-
-                        maj = lgss_wrap_bulk(gctx->gc_mechctx, desc, &token, 0);
-                        if (maj != GSS_S_COMPLETE) {
-                                CWARN("fail to encrypt bulk data: %x\n", maj);
-                                RETURN(-EACCES);
-                        }
-                }
-        }
-
-        RETURN(0);
+       struct gss_cli_ctx              *gctx;
+       struct lustre_msg               *msg;
+       struct ptlrpc_bulk_sec_desc     *bsd;
+       rawobj_t                         token;
+       __u32                            maj;
+       int                              offset;
+       int                              rc;
+       ENTRY;
+
+       LASSERT(req->rq_pack_bulk);
+       LASSERT(req->rq_bulk_read || req->rq_bulk_write);
+       LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
+
+       gctx = container_of(ctx, struct gss_cli_ctx, gc_base);
+       LASSERT(gctx->gc_mechctx);
+
+       switch (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
+       case SPTLRPC_SVC_NULL:
+               LASSERT(req->rq_reqbuf->lm_bufcount >= 3);
+               msg = req->rq_reqbuf;
+               offset = msg->lm_bufcount - 1;
+               break;
+       case SPTLRPC_SVC_AUTH:
+       case SPTLRPC_SVC_INTG:
+               LASSERT(req->rq_reqbuf->lm_bufcount >= 4);
+               msg = req->rq_reqbuf;
+               offset = msg->lm_bufcount - 2;
+               break;
+       case SPTLRPC_SVC_PRIV:
+               LASSERT(req->rq_clrbuf->lm_bufcount >= 2);
+               msg = req->rq_clrbuf;
+               offset = msg->lm_bufcount - 1;
+               break;
+       default:
+               LBUG();
+       }
+
+       bsd = lustre_msg_buf(msg, offset, sizeof(*bsd));
+       bsd->bsd_version = 0;
+       bsd->bsd_flags = 0;
+       bsd->bsd_type = SPTLRPC_BULK_DEFAULT;
+       bsd->bsd_svc = SPTLRPC_FLVR_BULK_SVC(req->rq_flvr.sf_rpc);
+
+       if (bsd->bsd_svc == SPTLRPC_BULK_SVC_NULL)
+               RETURN(0);
+
+       LASSERT(bsd->bsd_svc == SPTLRPC_BULK_SVC_INTG ||
+               bsd->bsd_svc == SPTLRPC_BULK_SVC_PRIV);
+
+       if (req->rq_bulk_read) {
+               /*
+                * bulk read: prepare receiving pages only for privacy mode.
+                */
+               if (bsd->bsd_svc == SPTLRPC_BULK_SVC_PRIV)
+                       return gss_cli_prep_bulk(req, desc);
+       } else {
+               /*
+                * bulk write: sign or encrypt bulk pages.
+                */
+               bsd->bsd_nob = desc->bd_nob;
+
+               if (bsd->bsd_svc == SPTLRPC_BULK_SVC_INTG) {
+                       /* integrity mode */
+                       token.data = bsd->bsd_data;
+                       token.len = lustre_msg_buflen(msg, offset) -
+                                   sizeof(*bsd);
+
+                       maj = lgss_get_mic(gctx->gc_mechctx, 0, NULL,
+                                          desc->bd_iov_count,
+                                          GET_KIOV(desc),
+                                          &token);
+                       if (maj != GSS_S_COMPLETE) {
+                               CWARN("failed to sign bulk data: %x\n", maj);
+                               RETURN(-EACCES);
+                       }
+               } else {
+                       /* privacy mode */
+                       if (desc->bd_iov_count == 0)
+                               RETURN(0);
+
+                       rc = sptlrpc_enc_pool_get_pages(desc);
+                       if (rc) {
+                               CERROR("bulk write: failed to allocate "
+                                      "encryption pages: %d\n", rc);
+                               RETURN(rc);
+                       }
+
+                       token.data = bsd->bsd_data;
+                       token.len = lustre_msg_buflen(msg, offset) -
+                                   sizeof(*bsd);
+
+                       maj = lgss_wrap_bulk(gctx->gc_mechctx, desc, &token, 0);
+                       if (maj != GSS_S_COMPLETE) {
+                               CWARN("fail to encrypt bulk data: %x\n", maj);
+                               RETURN(-EACCES);
+                       }
+               }
+       }
+
+       RETURN(0);
 }
 
 int gss_cli_ctx_unwrap_bulk(struct ptlrpc_cli_ctx *ctx,
@@ -177,6 +179,7 @@ int gss_cli_ctx_unwrap_bulk(struct ptlrpc_cli_ctx *ctx,
 
         LASSERT(req->rq_pack_bulk);
         LASSERT(req->rq_bulk_read || req->rq_bulk_write);
+       LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
 
         switch (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
         case SPTLRPC_SVC_NULL:
@@ -250,26 +253,27 @@ int gss_cli_ctx_unwrap_bulk(struct ptlrpc_cli_ctx *ctx,
                 gctx = container_of(ctx, struct gss_cli_ctx, gc_base);
                 LASSERT(gctx->gc_mechctx);
 
-                if (bsdv->bsd_svc == SPTLRPC_BULK_SVC_INTG) {
-                        int i, nob;
-
-                        /* fix the actual data size */
-                        for (i = 0, nob = 0; i < desc->bd_iov_count; i++) {
-                                if (desc->bd_iov[i].kiov_len + nob >
-                                    desc->bd_nob_transferred) {
-                                        desc->bd_iov[i].kiov_len =
-                                                desc->bd_nob_transferred - nob;
-                                }
-                                nob += desc->bd_iov[i].kiov_len;
-                        }
-
-                        token.data = bsdv->bsd_data;
-                        token.len = lustre_msg_buflen(vmsg, voff) -
-                                    sizeof(*bsdv);
-
-                        maj = lgss_verify_mic(gctx->gc_mechctx, 0, NULL,
-                                              desc->bd_iov_count, desc->bd_iov,
-                                              &token);
+               if (bsdv->bsd_svc == SPTLRPC_BULK_SVC_INTG) {
+                       int i, nob;
+
+                       /* fix the actual data size */
+                       for (i = 0, nob = 0; i < desc->bd_iov_count; i++) {
+                               if (BD_GET_KIOV(desc, i).kiov_len + nob >
+                                   desc->bd_nob_transferred) {
+                                       BD_GET_KIOV(desc, i).kiov_len =
+                                               desc->bd_nob_transferred - nob;
+                               }
+                               nob += BD_GET_KIOV(desc, i).kiov_len;
+                       }
+
+                       token.data = bsdv->bsd_data;
+                       token.len = lustre_msg_buflen(vmsg, voff) -
+                                   sizeof(*bsdv);
+
+                       maj = lgss_verify_mic(gctx->gc_mechctx, 0, NULL,
+                                             desc->bd_iov_count,
+                                             GET_KIOV(desc),
+                                             &token);
                         if (maj != GSS_S_COMPLETE) {
                                 CERROR("failed to verify bulk read: %x\n", maj);
                                 RETURN(-EACCES);
@@ -379,6 +383,7 @@ int gss_svc_unwrap_bulk(struct ptlrpc_request *req,
         LASSERT(req->rq_svc_ctx);
         LASSERT(req->rq_pack_bulk);
         LASSERT(req->rq_bulk_write);
+       LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
 
         grctx = gss_svc_ctx2reqctx(req->rq_svc_ctx);
 
@@ -401,8 +406,9 @@ int gss_svc_unwrap_bulk(struct ptlrpc_request *req,
                 token.data = bsdr->bsd_data;
                 token.len = grctx->src_reqbsd_size - sizeof(*bsdr);
 
-                maj = lgss_verify_mic(grctx->src_ctx->gsc_mechctx, 0, NULL,
-                                      desc->bd_iov_count, desc->bd_iov, &token);
+               maj = lgss_verify_mic(grctx->src_ctx->gsc_mechctx, 0, NULL,
+                                     desc->bd_iov_count,
+                                     GET_KIOV(desc), &token);
                 if (maj != GSS_S_COMPLETE) {
                         bsdv->bsd_flags |= BSD_FL_ERR;
                         CERROR("failed to verify bulk signature: %x\n", maj);
@@ -455,6 +461,7 @@ int gss_svc_wrap_bulk(struct ptlrpc_request *req,
         LASSERT(req->rq_svc_ctx);
         LASSERT(req->rq_pack_bulk);
         LASSERT(req->rq_bulk_read);
+       LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
 
         grctx = gss_svc_ctx2reqctx(req->rq_svc_ctx);
 
@@ -477,9 +484,10 @@ int gss_svc_wrap_bulk(struct ptlrpc_request *req,
                 token.data = bsdv->bsd_data;
                 token.len = grctx->src_repbsd_size - sizeof(*bsdv);
 
-                maj = lgss_get_mic(grctx->src_ctx->gsc_mechctx, 0, NULL,
-                                   desc->bd_iov_count, desc->bd_iov, &token);
-                if (maj != GSS_S_COMPLETE) {
+               maj = lgss_get_mic(grctx->src_ctx->gsc_mechctx, 0, NULL,
+                                  desc->bd_iov_count,
+                                  GET_KIOV(desc), &token);
+               if (maj != GSS_S_COMPLETE) {
                         bsdv->bsd_flags |= BSD_FL_ERR;
                         CERROR("failed to sign bulk data: %x\n", maj);
                         RETURN(-EACCES);
index b43cb04..0a43598 100644 (file)
@@ -949,8 +949,9 @@ int krb5_encrypt_bulk(struct crypto_blkcipher *tfm,
         struct scatterlist      src, dst;
         int                     blocksize, i, rc, nob = 0;
 
+       LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
         LASSERT(desc->bd_iov_count);
-        LASSERT(desc->bd_enc_iov);
+       LASSERT(GET_ENC_KIOV(desc));
 
        blocksize = crypto_blkcipher_blocksize(tfm);
         LASSERT(blocksize > 1);
@@ -973,18 +974,19 @@ int krb5_encrypt_bulk(struct crypto_blkcipher *tfm,
         /* encrypt clear pages */
         for (i = 0; i < desc->bd_iov_count; i++) {
                sg_init_table(&src, 1);
-               sg_set_page(&src, desc->bd_iov[i].kiov_page,
-                           (desc->bd_iov[i].kiov_len + blocksize - 1) &
+               sg_set_page(&src, BD_GET_KIOV(desc, i).kiov_page,
+                           (BD_GET_KIOV(desc, i).kiov_len +
+                               blocksize - 1) &
                            (~(blocksize - 1)),
-                           desc->bd_iov[i].kiov_offset);
+                           BD_GET_KIOV(desc, i).kiov_offset);
                if (adj_nob)
                        nob += src.length;
                sg_init_table(&dst, 1);
-               sg_set_page(&dst, desc->bd_enc_iov[i].kiov_page, src.length,
-                           src.offset);
+               sg_set_page(&dst, BD_GET_ENC_KIOV(desc, i).kiov_page,
+                           src.length, src.offset);
 
-                desc->bd_enc_iov[i].kiov_offset = dst.offset;
-                desc->bd_enc_iov[i].kiov_len = dst.length;
+               BD_GET_ENC_KIOV(desc, i).kiov_offset = dst.offset;
+               BD_GET_ENC_KIOV(desc, i).kiov_len = dst.length;
 
                rc = crypto_blkcipher_encrypt_iv(&ciph_desc, &dst, &src,
                                                     src.length);
@@ -1019,12 +1021,14 @@ int krb5_encrypt_bulk(struct crypto_blkcipher *tfm,
  * plain text size.
  * - for client read: we don't know data size for each page, so
  *   bd_iov[]->kiov_len is set to PAGE_SIZE, but actual data received might
- *   be smaller, so we need to adjust it according to bd_enc_iov[]->kiov_len.
+ *   be smaller, so we need to adjust it according to
+ *   bd_u.bd_kiov.bd_enc_vec[]->kiov_len.
  *   this means we DO NOT support the situation that server send an odd size
  *   data in a page which is not the last one.
  * - for server write: we knows exactly data size for each page being expected,
  *   thus kiov_len is accurate already, so we should not adjust it at all.
- *   and bd_enc_iov[]->kiov_len should be round_up(bd_iov[]->kiov_len) which
+ *   and bd_u.bd_kiov.bd_enc_vec[]->kiov_len should be
+ *   round_up(bd_iov[]->kiov_len) which
  *   should have been done by prep_bulk().
  */
 static
@@ -1041,8 +1045,9 @@ int krb5_decrypt_bulk(struct crypto_blkcipher *tfm,
         int                     ct_nob = 0, pt_nob = 0;
         int                     blocksize, i, rc;
 
+       LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
         LASSERT(desc->bd_iov_count);
-        LASSERT(desc->bd_enc_iov);
+       LASSERT(GET_ENC_KIOV(desc));
         LASSERT(desc->bd_nob_transferred);
 
        blocksize = crypto_blkcipher_blocksize(tfm);
@@ -1068,43 +1073,51 @@ int krb5_decrypt_bulk(struct crypto_blkcipher *tfm,
                 return rc;
         }
 
-        for (i = 0; i < desc->bd_iov_count && ct_nob < desc->bd_nob_transferred;
-             i++) {
-                if (desc->bd_enc_iov[i].kiov_offset % blocksize != 0 ||
-                    desc->bd_enc_iov[i].kiov_len % blocksize != 0) {
-                        CERROR("page %d: odd offset %u len %u, blocksize %d\n",
-                               i, desc->bd_enc_iov[i].kiov_offset,
-                               desc->bd_enc_iov[i].kiov_len, blocksize);
-                        return -EFAULT;
-                }
-
-                if (adj_nob) {
-                        if (ct_nob + desc->bd_enc_iov[i].kiov_len >
-                            desc->bd_nob_transferred)
-                                desc->bd_enc_iov[i].kiov_len =
-                                        desc->bd_nob_transferred - ct_nob;
+       for (i = 0; i < desc->bd_iov_count && ct_nob < desc->bd_nob_transferred;
+            i++) {
+               if (BD_GET_ENC_KIOV(desc, i).kiov_offset % blocksize
+                   != 0 ||
+                   BD_GET_ENC_KIOV(desc, i).kiov_len % blocksize
+                   != 0) {
+                       CERROR("page %d: odd offset %u len %u, blocksize %d\n",
+                              i, BD_GET_ENC_KIOV(desc, i).kiov_offset,
+                              BD_GET_ENC_KIOV(desc, i).kiov_len,
+                              blocksize);
+                       return -EFAULT;
+               }
 
-                        desc->bd_iov[i].kiov_len = desc->bd_enc_iov[i].kiov_len;
-                        if (pt_nob + desc->bd_enc_iov[i].kiov_len >desc->bd_nob)
-                                desc->bd_iov[i].kiov_len = desc->bd_nob -pt_nob;
-                } else {
-                        /* this should be guaranteed by LNET */
-                        LASSERT(ct_nob + desc->bd_enc_iov[i].kiov_len <=
-                                desc->bd_nob_transferred);
-                        LASSERT(desc->bd_iov[i].kiov_len <=
-                                desc->bd_enc_iov[i].kiov_len);
-                }
+               if (adj_nob) {
+                       if (ct_nob + BD_GET_ENC_KIOV(desc, i).kiov_len >
+                           desc->bd_nob_transferred)
+                               BD_GET_ENC_KIOV(desc, i).kiov_len =
+                                       desc->bd_nob_transferred - ct_nob;
+
+                       BD_GET_KIOV(desc, i).kiov_len =
+                         BD_GET_ENC_KIOV(desc, i).kiov_len;
+                       if (pt_nob + BD_GET_ENC_KIOV(desc, i).kiov_len >
+                           desc->bd_nob)
+                               BD_GET_KIOV(desc, i).kiov_len =
+                                 desc->bd_nob - pt_nob;
+               } else {
+                       /* this should be guaranteed by LNET */
+                       LASSERT(ct_nob + BD_GET_ENC_KIOV(desc, i).
+                               kiov_len <=
+                               desc->bd_nob_transferred);
+                       LASSERT(BD_GET_KIOV(desc, i).kiov_len <=
+                               BD_GET_ENC_KIOV(desc, i).kiov_len);
+               }
 
-                if (desc->bd_enc_iov[i].kiov_len == 0)
-                        continue;
+               if (BD_GET_ENC_KIOV(desc, i).kiov_len == 0)
+                       continue;
 
                sg_init_table(&src, 1);
-               sg_set_page(&src, desc->bd_enc_iov[i].kiov_page,
-                           desc->bd_enc_iov[i].kiov_len,
-                           desc->bd_enc_iov[i].kiov_offset);
+               sg_set_page(&src, BD_GET_ENC_KIOV(desc, i).kiov_page,
+                           BD_GET_ENC_KIOV(desc, i).kiov_len,
+                           BD_GET_ENC_KIOV(desc, i).kiov_offset);
                dst = src;
-               if (desc->bd_iov[i].kiov_len % blocksize == 0)
-                       sg_assign_page(&dst, desc->bd_iov[i].kiov_page);
+               if (BD_GET_KIOV(desc, i).kiov_len % blocksize == 0)
+                       sg_assign_page(&dst,
+                                      BD_GET_KIOV(desc, i).kiov_page);
 
                rc = crypto_blkcipher_decrypt_iv(&ciph_desc, &dst, &src,
                                                 src.length);
@@ -1113,17 +1126,18 @@ int krb5_decrypt_bulk(struct crypto_blkcipher *tfm,
                         return rc;
                 }
 
-                if (desc->bd_iov[i].kiov_len % blocksize != 0) {
-                       memcpy(page_address(desc->bd_iov[i].kiov_page) +
-                               desc->bd_iov[i].kiov_offset,
-                              page_address(desc->bd_enc_iov[i].kiov_page) +
-                               desc->bd_iov[i].kiov_offset,
-                               desc->bd_iov[i].kiov_len);
-                }
+               if (BD_GET_KIOV(desc, i).kiov_len % blocksize != 0) {
+                       memcpy(page_address(BD_GET_KIOV(desc, i).kiov_page) +
+                              BD_GET_KIOV(desc, i).kiov_offset,
+                              page_address(BD_GET_ENC_KIOV(desc, i).
+                                           kiov_page) +
+                              BD_GET_KIOV(desc, i).kiov_offset,
+                              BD_GET_KIOV(desc, i).kiov_len);
+               }
 
-                ct_nob += desc->bd_enc_iov[i].kiov_len;
-                pt_nob += desc->bd_iov[i].kiov_len;
-        }
+               ct_nob += BD_GET_ENC_KIOV(desc, i).kiov_len;
+               pt_nob += BD_GET_KIOV(desc, i).kiov_len;
+       }
 
         if (unlikely(ct_nob != desc->bd_nob_transferred)) {
                 CERROR("%d cipher text transferred but only %d decrypted\n",
@@ -1137,10 +1151,10 @@ int krb5_decrypt_bulk(struct crypto_blkcipher *tfm,
                 return -EFAULT;
         }
 
-        /* if needed, clear up the rest unused iovs */
-        if (adj_nob)
-                while (i < desc->bd_iov_count)
-                        desc->bd_iov[i++].kiov_len = 0;
+       /* if needed, clear up the rest unused iovs */
+       if (adj_nob)
+               while (i < desc->bd_iov_count)
+                       BD_GET_KIOV(desc, i++).kiov_len = 0;
 
         /* decrypt tail (krb5 header) */
         buf_to_sg(&src, cipher->data + blocksize, sizeof(*khdr));
@@ -1305,35 +1319,38 @@ arc4_out:
 
 static
 __u32 gss_prep_bulk_kerberos(struct gss_ctx *gctx,
-                             struct ptlrpc_bulk_desc *desc)
+                            struct ptlrpc_bulk_desc *desc)
 {
-        struct krb5_ctx     *kctx = gctx->internal_ctx_id;
-        int                  blocksize, i;
+       struct krb5_ctx     *kctx = gctx->internal_ctx_id;
+       int                  blocksize, i;
 
-        LASSERT(desc->bd_iov_count);
-        LASSERT(desc->bd_enc_iov);
-        LASSERT(kctx->kc_keye.kb_tfm);
+       LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
+       LASSERT(desc->bd_iov_count);
+       LASSERT(GET_ENC_KIOV(desc));
+       LASSERT(kctx->kc_keye.kb_tfm);
 
        blocksize = crypto_blkcipher_blocksize(kctx->kc_keye.kb_tfm);
 
-        for (i = 0; i < desc->bd_iov_count; i++) {
-                LASSERT(desc->bd_enc_iov[i].kiov_page);
-                /*
-                 * offset should always start at page boundary of either
-                 * client or server side.
-                 */
-                if (desc->bd_iov[i].kiov_offset & blocksize) {
-                        CERROR("odd offset %d in page %d\n",
-                               desc->bd_iov[i].kiov_offset, i);
-                        return GSS_S_FAILURE;
-                }
+       for (i = 0; i < desc->bd_iov_count; i++) {
+               LASSERT(BD_GET_ENC_KIOV(desc, i).kiov_page);
+               /*
+                * offset should always start at page boundary of either
+                * client or server side.
+                */
+               if (BD_GET_KIOV(desc, i).kiov_offset & blocksize) {
+                       CERROR("odd offset %d in page %d\n",
+                              BD_GET_KIOV(desc, i).kiov_offset, i);
+                       return GSS_S_FAILURE;
+               }
 
-                desc->bd_enc_iov[i].kiov_offset = desc->bd_iov[i].kiov_offset;
-                desc->bd_enc_iov[i].kiov_len = (desc->bd_iov[i].kiov_len +
-                                                blocksize - 1) & (~(blocksize - 1));
-        }
+               BD_GET_ENC_KIOV(desc, i).kiov_offset =
+                       BD_GET_KIOV(desc, i).kiov_offset;
+               BD_GET_ENC_KIOV(desc, i).kiov_len =
+                       (BD_GET_KIOV(desc, i).kiov_len +
+                        blocksize - 1) & (~(blocksize - 1));
+       }
 
-        return GSS_S_COMPLETE;
+       return GSS_S_COMPLETE;
 }
 
 static
@@ -1350,6 +1367,7 @@ __u32 gss_wrap_bulk_kerberos(struct gss_ctx *gctx,
         __u8                 conf[GSS_MAX_CIPHER_BLOCK];
         int                  rc = 0;
 
+       LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
         LASSERT(ke);
         LASSERT(ke->ke_conf_size <= GSS_MAX_CIPHER_BLOCK);
 
@@ -1396,13 +1414,13 @@ __u32 gss_wrap_bulk_kerberos(struct gss_ctx *gctx,
         data_desc[0].data = conf;
         data_desc[0].len = ke->ke_conf_size;
 
-        /* compute checksum */
-        if (krb5_make_checksum(kctx->kc_enctype, &kctx->kc_keyi,
-                               khdr, 1, data_desc,
-                               desc->bd_iov_count, desc->bd_iov,
-                               &cksum))
-                return GSS_S_FAILURE;
-        LASSERT(cksum.len >= ke->ke_hash_size);
+       /* compute checksum */
+       if (krb5_make_checksum(kctx->kc_enctype, &kctx->kc_keyi,
+                              khdr, 1, data_desc,
+                              desc->bd_iov_count, GET_KIOV(desc),
+                              &cksum))
+               return GSS_S_FAILURE;
+       LASSERT(cksum.len >= ke->ke_hash_size);
 
         /*
          * clear text layout for encryption:
@@ -1629,6 +1647,7 @@ __u32 gss_unwrap_bulk_kerberos(struct gss_ctx *gctx,
         int                  rc;
         __u32                major;
 
+       LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
         LASSERT(ke);
 
         if (token->len < sizeof(*khdr)) {
@@ -1686,12 +1705,13 @@ __u32 gss_unwrap_bulk_kerberos(struct gss_ctx *gctx,
         data_desc[0].data = plain.data;
         data_desc[0].len = blocksize;
 
-        if (krb5_make_checksum(kctx->kc_enctype, &kctx->kc_keyi,
-                               khdr, 1, data_desc,
-                               desc->bd_iov_count, desc->bd_iov,
-                               &cksum))
-                return GSS_S_FAILURE;
-        LASSERT(cksum.len >= ke->ke_hash_size);
+       if (krb5_make_checksum(kctx->kc_enctype, &kctx->kc_keyi,
+                              khdr, 1, data_desc,
+                              desc->bd_iov_count,
+                              GET_KIOV(desc),
+                              &cksum))
+               return GSS_S_FAILURE;
+       LASSERT(cksum.len >= ke->ke_hash_size);
 
         if (memcmp(plain.data + blocksize + sizeof(*khdr),
                    cksum.data + cksum.len - ke->ke_hash_size,
index 3824b11..21c1301 100644 (file)
@@ -109,23 +109,26 @@ static void mdunlink_iterate_helper(lnet_handle_md_t *bd_mds, int count)
 #ifdef HAVE_SERVER_SUPPORT
 /**
  * Prepare bulk descriptor for specified incoming request \a req that
- * can fit \a npages * pages. \a type is bulk type. \a portal is where
+ * can fit \a nfrags * pages. \a type is bulk type. \a portal is where
  * the bulk to be sent. Used on server-side after request was already
  * received.
  * Returns pointer to newly allocatrd initialized bulk descriptor or NULL on
  * error.
  */
 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp(struct ptlrpc_request *req,
-                                             unsigned npages, unsigned max_brw,
-                                             unsigned type, unsigned portal)
+                                             unsigned nfrags, unsigned max_brw,
+                                             unsigned int type,
+                                             unsigned portal,
+                                             const struct ptlrpc_bulk_frag_ops
+                                               *ops)
 {
        struct obd_export *exp = req->rq_export;
        struct ptlrpc_bulk_desc *desc;
 
        ENTRY;
-       LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
+       LASSERT(ptlrpc_is_bulk_op_active(type));
 
-       desc = ptlrpc_new_bulk(npages, max_brw, type, portal);
+       desc = ptlrpc_new_bulk(nfrags, max_brw, type, portal, ops);
        if (desc == NULL)
                RETURN(NULL);
 
@@ -162,8 +165,7 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
 
        /* NB no locking required until desc is on the network */
        LASSERT(desc->bd_md_count == 0);
-       LASSERT(desc->bd_type == BULK_PUT_SOURCE ||
-               desc->bd_type == BULK_GET_SINK);
+       LASSERT(ptlrpc_is_bulk_op_active(desc->bd_type));
 
        LASSERT(desc->bd_cbid.cbid_fn == server_bulk_callback);
        LASSERT(desc->bd_cbid.cbid_arg == desc);
@@ -212,7 +214,7 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
                }
 
                /* Network is about to get at the memory */
-               if (desc->bd_type == BULK_PUT_SOURCE)
+               if (ptlrpc_is_bulk_put_source(desc->bd_type))
                        rc = LNetPut(conn->c_self, desc->bd_mds[posted_md],
                                     LNET_ACK_REQ, conn->c_peer,
                                     desc->bd_portal, xid, 0, 0);
@@ -318,8 +320,7 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
        LASSERT(desc->bd_md_max_brw <= PTLRPC_BULK_OPS_COUNT);
        LASSERT(desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES);
        LASSERT(desc->bd_req != NULL);
-       LASSERT(desc->bd_type == BULK_PUT_SINK ||
-               desc->bd_type == BULK_GET_SOURCE);
+       LASSERT(ptlrpc_is_bulk_op_passive(desc->bd_type));
 
        /* cleanup the state of the bulk for it will be reused */
        if (req->rq_resend || req->rq_send_state == LUSTRE_IMP_REPLAY)
@@ -358,7 +359,7 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
 
        for (posted_md = 0; posted_md < total_md; posted_md++, xid++) {
                md.options = PTLRPC_MD_OPTIONS |
-                            ((desc->bd_type == BULK_GET_SOURCE) ?
+                            (ptlrpc_is_bulk_op_get(desc->bd_type) ?
                              LNET_MD_OP_GET : LNET_MD_OP_PUT);
                ptlrpc_fill_bulk_md(&md, desc, posted_md);
 
@@ -412,7 +413,7 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
 
        CDEBUG(D_NET, "Setup %u bulk %s buffers: %u pages %u bytes, "
               "xid x"LPX64"-"LPX64", portal %u\n", desc->bd_md_count,
-              desc->bd_type == BULK_GET_SOURCE ? "get-source" : "put-sink",
+              ptlrpc_is_bulk_op_get(desc->bd_type) ? "get-source" : "put-sink",
               desc->bd_iov_count, desc->bd_nob,
               desc->bd_last_xid, req->rq_xid, desc->bd_portal);
 
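A minimal sketch of the server-side calling convention this patch introduces, condensed from the ptlrpc_prep_bulk_exp() hunk above and the tgt_sendpage() hunk further below. The helper name send_pages_sketch(), the fixed full-page fragments, and the trimmed error handling are hypothetical; the actual transfer step is elided:

static int send_pages_sketch(struct ptlrpc_request *req,
			     struct page **pages, unsigned int npages)
{
	struct ptlrpc_bulk_desc *desc;
	unsigned int i;

	/* bd_type is now a bit mask: one operation flag plus one buffer-kind
	 * flag, and the fragment callbacks are passed in at prep time. */
	desc = ptlrpc_prep_bulk_exp(req, npages, 1,
				    PTLRPC_BULK_PUT_SOURCE |
					PTLRPC_BULK_BUF_KIOV,
				    MDS_BULK_PORTAL,
				    &ptlrpc_bulk_kiov_pin_ops);
	if (desc == NULL)
		return -ENOMEM;

	/* add_kiov_frag() replaces ptlrpc_prep_bulk_page_{pin,nopin}() */
	for (i = 0; i < npages; i++)
		desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
						 PAGE_CACHE_SIZE);

	/* ... target_bulk_io() or an equivalent transfer would run here ... */

	/* a single ptlrpc_free_bulk() now releases the descriptor; pin vs.
	 * nopin page handling is selected by the ops passed above */
	ptlrpc_free_bulk(desc);
	return 0;
}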
index 334deb8..4a66fb8 100644 (file)
@@ -55,24 +55,24 @@ void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc,
        LASSERT(!(md->options & (LNET_MD_IOVEC | LNET_MD_KIOV |
                                 LNET_MD_PHYS)));
 
-       md->options |= LNET_MD_KIOV;
        md->length = max(0, desc->bd_iov_count - mdidx * LNET_MAX_IOV);
        md->length = min_t(unsigned int, LNET_MAX_IOV, md->length);
-       if (desc->bd_enc_iov)
-               md->start = &desc->bd_enc_iov[mdidx * LNET_MAX_IOV];
-       else
-               md->start = &desc->bd_iov[mdidx * LNET_MAX_IOV];
-}
-
-void ptlrpc_add_bulk_page(struct ptlrpc_bulk_desc *desc, struct page *page,
-                          int pageoffset, int len)
-{
-        lnet_kiov_t *kiov = &desc->bd_iov[desc->bd_iov_count];
 
-        kiov->kiov_page = page;
-        kiov->kiov_offset = pageoffset;
-        kiov->kiov_len = len;
-
-        desc->bd_iov_count++;
+       if (ptlrpc_is_bulk_desc_kiov(desc->bd_type)) {
+               md->options |= LNET_MD_KIOV;
+               if (GET_ENC_KIOV(desc))
+                       md->start = &BD_GET_ENC_KIOV(desc, mdidx *
+                                                    LNET_MAX_IOV);
+               else
+                       md->start = &BD_GET_KIOV(desc, mdidx * LNET_MAX_IOV);
+       } else if (ptlrpc_is_bulk_desc_kvec(desc->bd_type)) {
+               md->options |= LNET_MD_IOVEC;
+               if (GET_ENC_KVEC(desc))
+                       md->start = &BD_GET_ENC_KVEC(desc, mdidx *
+                                                     LNET_MAX_IOV);
+               else
+                       md->start = &BD_GET_KVEC(desc, mdidx * LNET_MAX_IOV);
+       }
 }
 
+
index c236f07..37d52a2 100644 (file)
@@ -74,7 +74,10 @@ int ptlrpcd_start(int index, int max, const char *name, struct ptlrpcd_ctl *pc);
 void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
                               unsigned int service_time);
 struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw,
-                                        unsigned type, unsigned portal);
+                                        enum ptlrpc_bulk_op_type type,
+                                        unsigned portal,
+                                        const struct ptlrpc_bulk_frag_ops
+                                               *ops);
 int ptlrpc_request_cache_init(void);
 void ptlrpc_request_cache_fini(void);
 struct ptlrpc_request *ptlrpc_request_cache_alloc(gfp_t flags);
@@ -256,8 +259,6 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink);
 /* pers.c */
 void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc,
                         int mdcnt);
-void ptlrpc_add_bulk_page(struct ptlrpc_bulk_desc *desc, struct page *page,
-                         int pageoffset, int len);
 
 /* pack_generic.c */
 struct ptlrpc_reply_state *
index 473943c..b1828b1 100644 (file)
@@ -531,16 +531,17 @@ int sptlrpc_enc_pool_get_pages(struct ptlrpc_bulk_desc *desc)
        int             p_idx, g_idx;
        int             i;
 
+       LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
        LASSERT(desc->bd_iov_count > 0);
        LASSERT(desc->bd_iov_count <= page_pools.epp_max_pages);
 
        /* resent bulk, enc iov might have been allocated previously */
-       if (desc->bd_enc_iov != NULL)
+       if (GET_ENC_KIOV(desc) != NULL)
                return 0;
 
-       OBD_ALLOC(desc->bd_enc_iov,
-                 desc->bd_iov_count * sizeof(*desc->bd_enc_iov));
-       if (desc->bd_enc_iov == NULL)
+       OBD_ALLOC(GET_ENC_KIOV(desc),
+                 desc->bd_iov_count * sizeof(*GET_ENC_KIOV(desc)));
+       if (GET_ENC_KIOV(desc) == NULL)
                return -ENOMEM;
 
        spin_lock(&page_pools.epp_lock);
@@ -604,17 +605,17 @@ again:
         p_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
         g_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
 
-        for (i = 0; i < desc->bd_iov_count; i++) {
-                LASSERT(page_pools.epp_pools[p_idx][g_idx] != NULL);
-                desc->bd_enc_iov[i].kiov_page =
-                                        page_pools.epp_pools[p_idx][g_idx];
-                page_pools.epp_pools[p_idx][g_idx] = NULL;
-
-                if (++g_idx == PAGES_PER_POOL) {
-                        p_idx++;
-                        g_idx = 0;
-                }
-        }
+       for (i = 0; i < desc->bd_iov_count; i++) {
+               LASSERT(page_pools.epp_pools[p_idx][g_idx] != NULL);
+               BD_GET_ENC_KIOV(desc, i).kiov_page =
+                      page_pools.epp_pools[p_idx][g_idx];
+               page_pools.epp_pools[p_idx][g_idx] = NULL;
+
+               if (++g_idx == PAGES_PER_POOL) {
+                       p_idx++;
+                       g_idx = 0;
+               }
+       }
 
         if (page_pools.epp_free_pages < page_pools.epp_st_lowfree)
                 page_pools.epp_st_lowfree = page_pools.epp_free_pages;
@@ -639,46 +640,48 @@ EXPORT_SYMBOL(sptlrpc_enc_pool_get_pages);
 
 void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc)
 {
-        int     p_idx, g_idx;
-        int     i;
+       int     p_idx, g_idx;
+       int     i;
 
-        if (desc->bd_enc_iov == NULL)
-                return;
+       LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
 
-        LASSERT(desc->bd_iov_count > 0);
+       if (GET_ENC_KIOV(desc) == NULL)
+               return;
+
+       LASSERT(desc->bd_iov_count > 0);
 
        spin_lock(&page_pools.epp_lock);
 
-        p_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
-        g_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
+       p_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
+       g_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
 
-        LASSERT(page_pools.epp_free_pages + desc->bd_iov_count <=
-                page_pools.epp_total_pages);
-        LASSERT(page_pools.epp_pools[p_idx]);
+       LASSERT(page_pools.epp_free_pages + desc->bd_iov_count <=
+               page_pools.epp_total_pages);
+       LASSERT(page_pools.epp_pools[p_idx]);
 
-        for (i = 0; i < desc->bd_iov_count; i++) {
-                LASSERT(desc->bd_enc_iov[i].kiov_page != NULL);
-                LASSERT(g_idx != 0 || page_pools.epp_pools[p_idx]);
-                LASSERT(page_pools.epp_pools[p_idx][g_idx] == NULL);
+       for (i = 0; i < desc->bd_iov_count; i++) {
+               LASSERT(BD_GET_ENC_KIOV(desc, i).kiov_page != NULL);
+               LASSERT(g_idx != 0 || page_pools.epp_pools[p_idx]);
+               LASSERT(page_pools.epp_pools[p_idx][g_idx] == NULL);
 
-                page_pools.epp_pools[p_idx][g_idx] =
-                                        desc->bd_enc_iov[i].kiov_page;
+               page_pools.epp_pools[p_idx][g_idx] =
+                       BD_GET_ENC_KIOV(desc, i).kiov_page;
 
-                if (++g_idx == PAGES_PER_POOL) {
-                        p_idx++;
-                        g_idx = 0;
-                }
-        }
+               if (++g_idx == PAGES_PER_POOL) {
+                       p_idx++;
+                       g_idx = 0;
+               }
+       }
 
-        page_pools.epp_free_pages += desc->bd_iov_count;
+       page_pools.epp_free_pages += desc->bd_iov_count;
 
-        enc_pools_wakeup();
+       enc_pools_wakeup();
 
        spin_unlock(&page_pools.epp_lock);
 
-       OBD_FREE(desc->bd_enc_iov,
-                desc->bd_iov_count * sizeof(*desc->bd_enc_iov));
-       desc->bd_enc_iov = NULL;
+       OBD_FREE(GET_ENC_KIOV(desc),
+                desc->bd_iov_count * sizeof(*GET_ENC_KIOV(desc)));
+       GET_ENC_KIOV(desc) = NULL;
 }
 
 /*
@@ -885,6 +888,7 @@ int sptlrpc_get_bulk_checksum(struct ptlrpc_bulk_desc *desc, __u8 alg,
        unsigned int                    bufsize;
        int                             i, err;
 
+       LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
        LASSERT(alg > BULK_HASH_ALG_NULL && alg < BULK_HASH_ALG_MAX);
        LASSERT(buflen >= 4);
 
@@ -898,9 +902,11 @@ int sptlrpc_get_bulk_checksum(struct ptlrpc_bulk_desc *desc, __u8 alg,
        hashsize = cfs_crypto_hash_digestsize(cfs_hash_alg_id[alg]);
 
        for (i = 0; i < desc->bd_iov_count; i++) {
-               cfs_crypto_hash_update_page(hdesc, desc->bd_iov[i].kiov_page,
-                                 desc->bd_iov[i].kiov_offset & ~PAGE_MASK,
-                                 desc->bd_iov[i].kiov_len);
+               cfs_crypto_hash_update_page(hdesc,
+                                 BD_GET_KIOV(desc, i).kiov_page,
+                                 BD_GET_KIOV(desc, i).kiov_offset &
+                                             ~PAGE_MASK,
+                                 BD_GET_KIOV(desc, i).kiov_len);
        }
 
        if (hashsize > buflen) {
index 9eaa852..94b924a 100644 (file)
@@ -159,14 +159,16 @@ static void corrupt_bulk_data(struct ptlrpc_bulk_desc *desc)
        char           *ptr;
        unsigned int    off, i;
 
+       LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
+
        for (i = 0; i < desc->bd_iov_count; i++) {
-               if (desc->bd_iov[i].kiov_len == 0)
+               if (BD_GET_KIOV(desc, i).kiov_len == 0)
                        continue;
 
-               ptr = kmap(desc->bd_iov[i].kiov_page);
-               off = desc->bd_iov[i].kiov_offset & ~PAGE_MASK;
+               ptr = kmap(BD_GET_KIOV(desc, i).kiov_page);
+               off = BD_GET_KIOV(desc, i).kiov_offset & ~PAGE_MASK;
                ptr[off] ^= 0x1;
-               kunmap(desc->bd_iov[i].kiov_page);
+               kunmap(BD_GET_KIOV(desc, i).kiov_page);
                return;
        }
 }
@@ -341,6 +343,7 @@ int plain_cli_unwrap_bulk(struct ptlrpc_cli_ctx *ctx,
         int                          rc;
         int                          i, nob;
 
+       LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
         LASSERT(req->rq_pack_bulk);
         LASSERT(req->rq_reqbuf->lm_bufcount == PLAIN_PACK_SEGMENTS);
         LASSERT(req->rq_repdata->lm_bufcount == PLAIN_PACK_SEGMENTS);
@@ -354,14 +357,15 @@ int plain_cli_unwrap_bulk(struct ptlrpc_cli_ctx *ctx,
                 return 0;
         }
 
-        /* fix the actual data size */
-        for (i = 0, nob = 0; i < desc->bd_iov_count; i++) {
-                if (desc->bd_iov[i].kiov_len + nob > desc->bd_nob_transferred) {
-                        desc->bd_iov[i].kiov_len =
-                                desc->bd_nob_transferred - nob;
-                }
-                nob += desc->bd_iov[i].kiov_len;
-        }
+       /* fix the actual data size */
+       for (i = 0, nob = 0; i < desc->bd_iov_count; i++) {
+               if (BD_GET_KIOV(desc, i).kiov_len +
+                   nob > desc->bd_nob_transferred) {
+                       BD_GET_KIOV(desc, i).kiov_len =
+                               desc->bd_nob_transferred - nob;
+               }
+               nob += BD_GET_KIOV(desc, i).kiov_len;
+       }
 
         rc = plain_verify_bulk_csum(desc, req->rq_flvr.u_bulk.hash.hash_alg,
                                     tokenv);
index 83f2528..8666cfc 100644 (file)
@@ -372,8 +372,10 @@ int qsd_fetch_index(const struct lu_env *env, struct obd_export *exp,
        ptlrpc_at_set_req_timeout(req);
 
        /* allocate bulk descriptor */
-       desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK,
-                                   MDS_BULK_PORTAL);
+       desc = ptlrpc_prep_bulk_imp(req, npages, 1,
+                                   PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV,
+                                   MDS_BULK_PORTAL,
+                                   &ptlrpc_bulk_kiov_pin_ops);
        if (desc == NULL) {
                ptlrpc_request_free(req);
                RETURN(-ENOMEM);
@@ -381,7 +383,8 @@ int qsd_fetch_index(const struct lu_env *env, struct obd_export *exp,
 
        /* req now owns desc and will free it when it gets freed */
        for (i = 0; i < npages; i++)
-               ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE);
+               desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
+                                                PAGE_CACHE_SIZE);
 
        /* pack index information in request */
        req_ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO);
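For the client/import side, the qsd_fetch_index() hunk above follows the same pattern; a condensed sketch with a hypothetical wrapper name and trimmed error handling, showing the sink/KIOV flags, the pin ops, and the ownership rule noted in the comment (the request frees the descriptor when it is itself freed):

static struct ptlrpc_bulk_desc *
prep_fetch_bulk_sketch(struct ptlrpc_request *req, struct page **pages,
		       unsigned int npages)
{
	struct ptlrpc_bulk_desc *desc;
	unsigned int i;

	desc = ptlrpc_prep_bulk_imp(req, npages, 1,
				    PTLRPC_BULK_PUT_SINK |
					PTLRPC_BULK_BUF_KIOV,
				    MDS_BULK_PORTAL,
				    &ptlrpc_bulk_kiov_pin_ops);
	if (desc == NULL)
		return NULL;

	/* req now owns desc; it is released along with the request, so no
	 * explicit ptlrpc_free_bulk() is needed on this path */
	for (i = 0; i < npages; i++)
		desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
						 PAGE_CACHE_SIZE);
	return desc;
}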
index c06bb18..41770e9 100644 (file)
@@ -1017,8 +1017,11 @@ int tgt_sendpage(struct tgt_session_info *tsi, struct lu_rdpg *rdpg, int nob)
 
        ENTRY;
 
-       desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, 1, BULK_PUT_SOURCE,
-                                   MDS_BULK_PORTAL);
+       desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, 1,
+                                   PTLRPC_BULK_PUT_SOURCE |
+                                       PTLRPC_BULK_BUF_KIOV,
+                                   MDS_BULK_PORTAL,
+                                   &ptlrpc_bulk_kiov_pin_ops);
        if (desc == NULL)
                RETURN(-ENOMEM);
 
@@ -1030,12 +1033,13 @@ int tgt_sendpage(struct tgt_session_info *tsi, struct lu_rdpg *rdpg, int nob)
        for (i = 0, tmpcount = nob; i < rdpg->rp_npages && tmpcount > 0;
             i++, tmpcount -= tmpsize) {
                tmpsize = min_t(int, tmpcount, PAGE_CACHE_SIZE);
-               ptlrpc_prep_bulk_page_pin(desc, rdpg->rp_pages[i], 0, tmpsize);
+               desc->bd_frag_ops->add_kiov_frag(desc, rdpg->rp_pages[i], 0,
+                                                tmpsize);
        }
 
        LASSERT(desc->bd_nob == nob);
        rc = target_bulk_io(exp, desc, lwi);
-       ptlrpc_free_bulk_pin(desc);
+       ptlrpc_free_bulk(desc);
        RETURN(rc);
 }
 EXPORT_SYMBOL(tgt_sendpage);
@@ -1603,6 +1607,8 @@ static __u32 tgt_checksum_bulk(struct lu_target *tgt,
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
        __u32                           cksum;
 
+       LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
+
        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("%s: unable to initialize checksum hash %s\n",
@@ -1616,10 +1622,11 @@ static __u32 tgt_checksum_bulk(struct lu_target *tgt,
                 * simulate a client->OST data error */
                if (i == 0 && opc == OST_WRITE &&
                    OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_RECEIVE)) {
-                       int off = desc->bd_iov[i].kiov_offset & ~PAGE_MASK;
-                       int len = desc->bd_iov[i].kiov_len;
+                       int off = BD_GET_KIOV(desc, i).kiov_offset &
+                               ~PAGE_MASK;
+                       int len = BD_GET_KIOV(desc, i).kiov_len;
                        struct page *np = tgt_page_to_corrupt;
-                       char *ptr = kmap(desc->bd_iov[i].kiov_page) + off;
+                       char *ptr = kmap(BD_GET_KIOV(desc, i).kiov_page) + off;
 
                        if (np) {
                                char *ptr2 = kmap(np) + off;
@@ -1627,24 +1634,28 @@ static __u32 tgt_checksum_bulk(struct lu_target *tgt,
                                memcpy(ptr2, ptr, len);
                                memcpy(ptr2, "bad3", min(4, len));
                                kunmap(np);
-                               desc->bd_iov[i].kiov_page = np;
+                               BD_GET_KIOV(desc, i).kiov_page = np;
                        } else {
                                CERROR("%s: can't alloc page for corruption\n",
                                       tgt_name(tgt));
                        }
                }
-               cfs_crypto_hash_update_page(hdesc, desc->bd_iov[i].kiov_page,
-                                 desc->bd_iov[i].kiov_offset & ~PAGE_MASK,
-                                 desc->bd_iov[i].kiov_len);
+               cfs_crypto_hash_update_page(hdesc,
+                                 BD_GET_KIOV(desc, i).kiov_page,
+                                 BD_GET_KIOV(desc, i).kiov_offset &
+                                       ~PAGE_MASK,
+                                 BD_GET_KIOV(desc, i).kiov_len);
 
                 /* corrupt the data after we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) {
-                       int off = desc->bd_iov[i].kiov_offset & ~PAGE_MASK;
-                       int len = desc->bd_iov[i].kiov_len;
+                       int off = BD_GET_KIOV(desc, i).kiov_offset
+                         & ~PAGE_MASK;
+                       int len = BD_GET_KIOV(desc, i).kiov_len;
                        struct page *np = tgt_page_to_corrupt;
-                       char *ptr = kmap(desc->bd_iov[i].kiov_page) + off;
+                       char *ptr =
+                         kmap(BD_GET_KIOV(desc, i).kiov_page) + off;
 
                        if (np) {
                                char *ptr2 = kmap(np) + off;
@@ -1652,7 +1663,7 @@ static __u32 tgt_checksum_bulk(struct lu_target *tgt,
                                memcpy(ptr2, ptr, len);
                                memcpy(ptr2, "bad4", min(4, len));
                                kunmap(np);
-                               desc->bd_iov[i].kiov_page = np;
+                               BD_GET_KIOV(desc, i).kiov_page = np;
                        } else {
                                CERROR("%s: can't alloc page for corruption\n",
                                       tgt_name(tgt));
@@ -1755,7 +1766,10 @@ int tgt_brw_read(struct tgt_session_info *tsi)
                GOTO(out_lock, rc);
 
        desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
-                                   BULK_PUT_SOURCE, OST_BULK_PORTAL);
+                                   PTLRPC_BULK_PUT_SOURCE |
+                                       PTLRPC_BULK_BUF_KIOV,
+                                   OST_BULK_PORTAL,
+                                   &ptlrpc_bulk_kiov_nopin_ops);
        if (desc == NULL)
                GOTO(out_commitrw, rc = -ENOMEM);
 
@@ -1771,9 +1785,10 @@ int tgt_brw_read(struct tgt_session_info *tsi)
                nob += page_rc;
                if (page_rc != 0) { /* some data! */
                        LASSERT(local_nb[i].lnb_page != NULL);
-                       ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].lnb_page,
-                                                   local_nb[i].lnb_page_offset,
-                                                   page_rc);
+                       desc->bd_frag_ops->add_kiov_frag
+                         (desc, local_nb[i].lnb_page,
+                          local_nb[i].lnb_page_offset,
+                          page_rc);
                }
 
                if (page_rc != local_nb[i].lnb_len) { /* short read */
@@ -1817,7 +1832,7 @@ out_lock:
        tgt_brw_unlock(ioo, remote_nb, &lockh, LCK_PR);
 
        if (desc && !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))
-               ptlrpc_free_bulk_nopin(desc);
+               ptlrpc_free_bulk(desc);
 
        LASSERT(rc <= 0);
        if (rc == 0) {
@@ -1845,7 +1860,7 @@ out_lock:
                lwi1 = LWI_TIMEOUT_INTR(cfs_time_seconds(3), NULL, NULL, NULL);
                l_wait_event(waitq, 0, &lwi1);
                target_bulk_io(exp, desc, &lwi);
-               ptlrpc_free_bulk_nopin(desc);
+               ptlrpc_free_bulk(desc);
        }
 
        RETURN(rc);
@@ -2020,15 +2035,18 @@ int tgt_brw_write(struct tgt_session_info *tsi)
                GOTO(out_lock, rc);
 
        desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
-                                   BULK_GET_SINK, OST_BULK_PORTAL);
+                                   PTLRPC_BULK_GET_SINK | PTLRPC_BULK_BUF_KIOV,
+                                   OST_BULK_PORTAL,
+                                   &ptlrpc_bulk_kiov_nopin_ops);
        if (desc == NULL)
                GOTO(skip_transfer, rc = -ENOMEM);
 
        /* NB Having prepped, we must commit... */
        for (i = 0; i < npages; i++)
-               ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].lnb_page,
-                                           local_nb[i].lnb_page_offset,
-                                           local_nb[i].lnb_len);
+               desc->bd_frag_ops->add_kiov_frag(desc,
+                                                local_nb[i].lnb_page,
+                                                local_nb[i].lnb_page_offset,
+                                                local_nb[i].lnb_len);
 
        rc = sptlrpc_svc_prep_bulk(req, desc);
        if (rc != 0)
@@ -2109,7 +2127,7 @@ skip_transfer:
 out_lock:
        tgt_brw_unlock(ioo, remote_nb, &lockh, LCK_PW);
        if (desc)
-               ptlrpc_free_bulk_nopin(desc);
+               ptlrpc_free_bulk(desc);
 out:
        if (no_reply) {
                req->rq_no_reply = 1;