Add a union to ptlrpc_bulk_desc so a descriptor can carry either KVEC
or KIOV buffers, and change bd_type to a bit mask.  Bits are set in
bd_type to specify {put,get}{source,sink}{kvec,kiov}.  All code that
accesses the descriptor buffers is changed to go through the union.

ASSUMPTION: all current code works only with KIOV; the DNE code to be
added later will introduce a separate code path that uses KVEC.

As part of the implementation, buffer operations are added as
callbacks to be defined by users of ptlrpc.  This enables buffer
users to define how buffers are added to a descriptor and how they
are released.
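
As an illustration, the client-side read path as converted by this
patch (request setup and error handling elided) now selects the
buffer type explicitly and attaches pages through the ops callback:

	desc = ptlrpc_prep_bulk_imp(req, npages, 1,
				    PTLRPC_BULK_PUT_SINK |
				    PTLRPC_BULK_BUF_KIOV,
				    MDS_BULK_PORTAL,
				    &ptlrpc_bulk_kiov_pin_ops);
	if (desc == NULL)
		RETURN(-ENOMEM);

	for (i = 0; i < npages; i++)
		desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
						 PAGE_CACHE_SIZE);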
Signed-off-by: Amir Shehata <amir.shehata@intel.com>
Change-Id: I86d718a668e7c0b422b96b393a6a69e5b8e89291
Reviewed-on: http://review.whamcloud.com/12525
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: wangdi <di.wang@intel.com>
Reviewed-by: Liang Zhen <liang.zhen@intel.com>
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
* @{
*/
+#include <linux/uio.h>
#include <libcfs/libcfs.h>
#include <lnet/nidstr.h>
#include <lnet/api.h>
struct page *bp_page;
};
-#define BULK_GET_SOURCE 0
-#define BULK_PUT_SINK 1
-#define BULK_GET_SINK 2
-#define BULK_PUT_SOURCE 3
+enum ptlrpc_bulk_op_type {
+ PTLRPC_BULK_OP_ACTIVE = 0x00000001,
+ PTLRPC_BULK_OP_PASSIVE = 0x00000002,
+ PTLRPC_BULK_OP_PUT = 0x00000004,
+ PTLRPC_BULK_OP_GET = 0x00000008,
+ PTLRPC_BULK_BUF_KVEC = 0x00000010,
+ PTLRPC_BULK_BUF_KIOV = 0x00000020,
+ PTLRPC_BULK_GET_SOURCE = PTLRPC_BULK_OP_PASSIVE | PTLRPC_BULK_OP_GET,
+ PTLRPC_BULK_PUT_SINK = PTLRPC_BULK_OP_PASSIVE | PTLRPC_BULK_OP_PUT,
+ PTLRPC_BULK_GET_SINK = PTLRPC_BULK_OP_ACTIVE | PTLRPC_BULK_OP_GET,
+ PTLRPC_BULK_PUT_SOURCE = PTLRPC_BULK_OP_ACTIVE | PTLRPC_BULK_OP_PUT,
+};
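+
+/*
+ * A bd_type value is composed by OR-ing an operation with a buffer
+ * kind, for example PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV for
+ * a client receiving bulk data into kiov pages, as at the call sites
+ * converted below.
+ */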
-/**
+static inline bool ptlrpc_is_bulk_op_get(enum ptlrpc_bulk_op_type type)
+{
+ return (type & PTLRPC_BULK_OP_GET) == PTLRPC_BULK_OP_GET;
+}
+
+static inline bool ptlrpc_is_bulk_get_source(enum ptlrpc_bulk_op_type type)
+{
+ return (type & PTLRPC_BULK_GET_SOURCE) == PTLRPC_BULK_GET_SOURCE;
+}
+
+static inline bool ptlrpc_is_bulk_put_sink(enum ptlrpc_bulk_op_type type)
+{
+ return (type & PTLRPC_BULK_PUT_SINK) == PTLRPC_BULK_PUT_SINK;
+}
+
+static inline bool ptlrpc_is_bulk_get_sink(enum ptlrpc_bulk_op_type type)
+{
+ return (type & PTLRPC_BULK_GET_SINK) == PTLRPC_BULK_GET_SINK;
+}
+
+static inline bool ptlrpc_is_bulk_put_source(enum ptlrpc_bulk_op_type type)
+{
+ return (type & PTLRPC_BULK_PUT_SOURCE) == PTLRPC_BULK_PUT_SOURCE;
+}
+
+static inline bool ptlrpc_is_bulk_desc_kvec(enum ptlrpc_bulk_op_type type)
+{
+ return ((type & PTLRPC_BULK_BUF_KVEC) | (type & PTLRPC_BULK_BUF_KIOV))
+ == PTLRPC_BULK_BUF_KVEC;
+}
+
+static inline bool ptlrpc_is_bulk_desc_kiov(enum ptlrpc_bulk_op_type type)
+{
+ return ((type & PTLRPC_BULK_BUF_KVEC) | (type & PTLRPC_BULK_BUF_KIOV))
+ == PTLRPC_BULK_BUF_KIOV;
+}
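+
+/*
+ * Note: the OR-of-masks comparison in the two buffer predicates above
+ * is true only when exactly the named buffer bit is set, so a type
+ * with both KVEC and KIOV set matches neither predicate.  The same
+ * idiom is used for the ACTIVE/PASSIVE checks below.
+ */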
+
+static inline bool ptlrpc_is_bulk_op_active(enum ptlrpc_bulk_op_type type)
+{
+ return ((type & PTLRPC_BULK_OP_ACTIVE) |
+ (type & PTLRPC_BULK_OP_PASSIVE))
+ == PTLRPC_BULK_OP_ACTIVE;
+}
+
+static inline bool ptlrpc_is_bulk_op_passive(enum ptlrpc_bulk_op_type type)
+{
+ return ((type & PTLRPC_BULK_OP_ACTIVE) |
+ (type & PTLRPC_BULK_OP_PASSIVE))
+ == PTLRPC_BULK_OP_PASSIVE;
+}
+
+struct ptlrpc_bulk_frag_ops {
+ /**
+	 * Add a page \a page to the bulk descriptor \a desc.
+	 * Data to transfer in the page starts at offset \a pageoffset and
+	 * the amount of data to transfer from the page is \a len.
+ */
+ void (*add_kiov_frag)(struct ptlrpc_bulk_desc *desc,
+ struct page *page, int pageoffset, int len);
+
+	/**
+	 * Add a fragment \a frag to the bulk descriptor \a desc.
+	 * Data to transfer in the fragment is pointed to by \a frag.
+	 * The size of the fragment is \a len.
+	 */
+ int (*add_iov_frag)(struct ptlrpc_bulk_desc *desc, void *frag, int len);
+
+	/**
+	 * Release the buffer fragments attached to bulk descriptor
+	 * \a desc.  Works on bulk descriptors from both the server
+	 * and the client side.
+	 */
+ void (*release_frags)(struct ptlrpc_bulk_desc *desc);
+};
+
+extern const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops;
+extern const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops;
+
+/*
* Definition of bulk descriptor.
 * Bulks are special "Two phase" RPCs where the initial request message
 * is sent first and it is followed by a transfer (or receiving) of a large
struct ptlrpc_bulk_desc {
/** completed with failure */
unsigned long bd_failure:1;
- /** {put,get}{source,sink} */
- unsigned long bd_type:2;
/** client side */
unsigned long bd_registered:1;
/** For serialization with callback */
spinlock_t bd_lock;
/** Import generation when request for this bulk was sent */
int bd_import_generation;
+ /** {put,get}{source,sink}{kvec,kiov} */
+ enum ptlrpc_bulk_op_type bd_type;
/** LNet portal for this bulk */
__u32 bd_portal;
/** Server side - export this bulk created for */
struct obd_import *bd_import;
/** Back pointer to the request */
struct ptlrpc_request *bd_req;
+ struct ptlrpc_bulk_frag_ops *bd_frag_ops;
wait_queue_head_t bd_waitq; /* server side only WQ */
int bd_iov_count; /* # entries in bd_iov */
int bd_max_iov; /* allocated size of bd_iov */
/** array of associated MDs */
lnet_handle_md_t bd_mds[PTLRPC_BULK_OPS_COUNT];
- /*
- * encrypt iov, size is either 0 or bd_iov_count.
- */
- lnet_kiov_t *bd_enc_iov;
+ union {
+ struct {
+ /*
+ * encrypt iov, size is either 0 or bd_iov_count.
+ */
+ lnet_kiov_t *bd_enc_vec;
+ lnet_kiov_t bd_vec[0];
+ } bd_kiov;
+
+ struct {
+ struct kvec *bd_enc_kvec;
+ struct kvec bd_kvec[0];
+ } bd_kvec;
+ } bd_u;
- lnet_kiov_t bd_iov[0];
};
+#define GET_KIOV(desc) ((desc)->bd_u.bd_kiov.bd_vec)
+#define BD_GET_KIOV(desc, i) ((desc)->bd_u.bd_kiov.bd_vec[i])
+#define GET_ENC_KIOV(desc) ((desc)->bd_u.bd_kiov.bd_enc_vec)
+#define BD_GET_ENC_KIOV(desc, i) ((desc)->bd_u.bd_kiov.bd_enc_vec[i])
+#define GET_KVEC(desc) ((desc)->bd_u.bd_kvec.bd_kvec)
+#define BD_GET_KVEC(desc, i) ((desc)->bd_u.bd_kvec.bd_kvec[i])
+#define GET_ENC_KVEC(desc) ((desc)->bd_u.bd_kvec.bd_enc_kvec)
+#define BD_GET_ENC_KVEC(desc, i) ((desc)->bd_u.bd_kvec.bd_enc_kvec[i])
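+
+/*
+ * Example (illustrative) of walking a descriptor's kiov fragments
+ * through the accessors above:
+ *
+ *	for (i = 0; i < desc->bd_iov_count; i++)
+ *		nob += BD_GET_KIOV(desc, i).kiov_len;
+ */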
+
enum {
SVC_STOPPED = 1 << 0,
SVC_STOPPING = 1 << 1,
*/
#ifdef HAVE_SERVER_SUPPORT
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp(struct ptlrpc_request *req,
- unsigned npages, unsigned max_brw,
- unsigned type, unsigned portal);
+ unsigned nfrags, unsigned max_brw,
+ unsigned int type,
+ unsigned portal,
+ const struct ptlrpc_bulk_frag_ops
+ *ops);
int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc);
void ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc);
void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request);
struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req);
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
- unsigned npages, unsigned max_brw,
- unsigned type, unsigned portal);
-void __ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk, int pin);
-static inline void ptlrpc_free_bulk_pin(struct ptlrpc_bulk_desc *bulk)
-{
- __ptlrpc_free_bulk(bulk, 1);
-}
-static inline void ptlrpc_free_bulk_nopin(struct ptlrpc_bulk_desc *bulk)
-{
- __ptlrpc_free_bulk(bulk, 0);
-}
+ unsigned nfrags, unsigned max_brw,
+ unsigned int type,
+ unsigned portal,
+ const struct ptlrpc_bulk_frag_ops
+ *ops);
+
+int ptlrpc_prep_bulk_frag(struct ptlrpc_bulk_desc *desc,
+ void *frag, int len);
void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
- struct page *page, int pageoffset, int len, int);
+ struct page *page, int pageoffset, int len,
+ int pin);
static inline void ptlrpc_prep_bulk_page_pin(struct ptlrpc_bulk_desc *desc,
struct page *page, int pageoffset,
int len)
__ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 0);
}
+void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk);
+
+static inline void ptlrpc_release_bulk_page_pin(struct ptlrpc_bulk_desc *desc)
+{
+ int i;
+
+	for (i = 0; i < desc->bd_iov_count; i++)
+ page_cache_release(BD_GET_KIOV(desc, i).kiov_page);
+}
+
+static inline void ptlrpc_release_bulk_noop(struct ptlrpc_bulk_desc *desc)
+{
+}
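+
+/*
+ * Illustrative sketch (not part of this patch): a KVEC user, such as
+ * the DNE code path mentioned in the commit message, could pair the
+ * kvec helpers above in an ops table of its own; the name below is
+ * hypothetical:
+ *
+ *	static const struct ptlrpc_bulk_frag_ops my_kvec_ops = {
+ *		.add_iov_frag	= ptlrpc_prep_bulk_frag,
+ *		.release_frags	= ptlrpc_release_bulk_noop,
+ *	};
+ */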
+
void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
struct obd_import *imp);
__u64 ptlrpc_next_xid(void);
req->rq_request_portal = MDS_READPAGE_PORTAL;
ptlrpc_at_set_req_timeout(req);
- desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK,
- MDS_BULK_PORTAL);
+ desc = ptlrpc_prep_bulk_imp(req, npages, 1,
+ PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV,
+ MDS_BULK_PORTAL,
+ &ptlrpc_bulk_kiov_pin_ops);
if (desc == NULL) {
ptlrpc_request_free(req);
RETURN(-ENOMEM);
/* NB req now owns desc and will free it when it gets freed */
for (i = 0; i < npages; i++)
- ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE);
+ desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
+ PAGE_CACHE_SIZE);
mdc_readdir_pack(req, offset, PAGE_CACHE_SIZE * npages, fid, oc);
body->mcb_units = nrpages;
/* allocate bulk transfer descriptor */
- desc = ptlrpc_prep_bulk_imp(req, nrpages, 1, BULK_PUT_SINK,
- MGS_BULK_PORTAL);
+ desc = ptlrpc_prep_bulk_imp(req, nrpages, 1,
+ PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV,
+ MGS_BULK_PORTAL,
+ &ptlrpc_bulk_kiov_pin_ops);
if (desc == NULL)
GOTO(out, rc = -ENOMEM);
for (i = 0; i < nrpages; i++)
- ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE);
+ desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
+ PAGE_CACHE_SIZE);
ptlrpc_request_set_replen(req);
rc = ptlrpc_queue_wait(req);
page_count = (bytes + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
LASSERT(page_count <= nrpages);
desc = ptlrpc_prep_bulk_exp(req, page_count, 1,
- BULK_PUT_SOURCE, MGS_BULK_PORTAL);
+ PTLRPC_BULK_PUT_SOURCE |
+ PTLRPC_BULK_BUF_KIOV,
+ MGS_BULK_PORTAL,
+ &ptlrpc_bulk_kiov_pin_ops);
if (desc == NULL)
GOTO(out, rc = -ENOMEM);
for (i = 0; i < page_count && bytes > 0; i++) {
- ptlrpc_prep_bulk_page_pin(desc, pages[i], 0,
- min_t(int, bytes, PAGE_CACHE_SIZE));
+ desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
+ min_t(int, bytes,
+ PAGE_CACHE_SIZE));
bytes -= PAGE_CACHE_SIZE;
}
rc = target_bulk_io(req->rq_export, desc, &lwi);
- ptlrpc_free_bulk_pin(desc);
+ ptlrpc_free_bulk(desc);
out:
for (i = 0; i < nrpages; i++) {
int count = 0;
int i;
+ LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
+
for (i = 0; i < page_count; i++) {
- void *pz = page_zone(desc->bd_iov[i].kiov_page);
+ void *pz = page_zone(BD_GET_KIOV(desc, i).kiov_page);
if (likely(pz == zone)) {
++count;
desc = ptlrpc_prep_bulk_imp(req, page_count,
cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
- opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
- OST_BULK_PORTAL);
+ (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
+ PTLRPC_BULK_PUT_SINK) |
+ PTLRPC_BULK_BUF_KIOV,
+ OST_BULK_PORTAL,
+ &ptlrpc_bulk_kiov_pin_ops);
if (desc == NULL)
GOTO(out, rc = -ENOMEM);
LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
(pg->flag & OBD_BRW_SRVLOCK));
- ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
+ desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
requested_nob += pg->count;
if (i > 0 && can_merge_pages(pg_prev, pg)) {
ptlrpc_at_set_req_timeout(req);
- desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK,
- MDS_BULK_PORTAL);
+ desc = ptlrpc_prep_bulk_imp(req, npages, 1,
+ PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV,
+ MDS_BULK_PORTAL,
+ &ptlrpc_bulk_kiov_pin_ops);
if (desc == NULL) {
ptlrpc_request_free(req);
RETURN(-ENOMEM);
}
for (i = 0; i < npages; i++)
- ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE);
+ desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
+ PAGE_CACHE_SIZE);
ptlrpc_request_set_replen(req);
rc = ptlrpc_queue_wait(req);
#include "ptlrpc_internal.h"
+const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops = {
+ .add_kiov_frag = ptlrpc_prep_bulk_page_pin,
+ .release_frags = ptlrpc_release_bulk_page_pin,
+};
+EXPORT_SYMBOL(ptlrpc_bulk_kiov_pin_ops);
+
+const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops = {
+ .add_kiov_frag = ptlrpc_prep_bulk_page_nopin,
+ .release_frags = ptlrpc_release_bulk_noop,
+};
+EXPORT_SYMBOL(ptlrpc_bulk_kiov_nopin_ops);
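+
+/*
+ * The pin ops take a page reference in add_kiov_frag() and drop it in
+ * release_frags(); the nopin ops leave page refcounting entirely to
+ * the caller, as in the server-side brw paths converted below.
+ */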
+
static int ptlrpc_send_new_req(struct ptlrpc_request *req);
static int ptlrpcd_check_work(struct ptlrpc_request *req);
static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async);
* Allocate and initialize new bulk descriptor on the sender.
* Returns pointer to the descriptor or NULL on error.
*/
-struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw,
- unsigned type, unsigned portal)
+struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned nfrags, unsigned max_brw,
+ enum ptlrpc_bulk_op_type type,
+ unsigned portal,
+ const struct ptlrpc_bulk_frag_ops *ops)
{
struct ptlrpc_bulk_desc *desc;
int i;
- OBD_ALLOC(desc, offsetof(struct ptlrpc_bulk_desc, bd_iov[npages]));
+	/* ensure that only one of KIOV or KVEC is set but not both */
+ LASSERT((ptlrpc_is_bulk_desc_kiov(type) &&
+ ops->add_kiov_frag != NULL) ||
+ (ptlrpc_is_bulk_desc_kvec(type) &&
+ ops->add_iov_frag != NULL));
+
+ if (type & PTLRPC_BULK_BUF_KIOV) {
+ OBD_ALLOC(desc,
+ offsetof(struct ptlrpc_bulk_desc,
+ bd_u.bd_kiov.bd_vec[nfrags]));
+ } else {
+ OBD_ALLOC(desc,
+ offsetof(struct ptlrpc_bulk_desc,
+ bd_u.bd_kvec.bd_kvec[nfrags]));
+ }
+
if (!desc)
return NULL;
spin_lock_init(&desc->bd_lock);
init_waitqueue_head(&desc->bd_waitq);
- desc->bd_max_iov = npages;
+ desc->bd_max_iov = nfrags;
desc->bd_iov_count = 0;
desc->bd_portal = portal;
desc->bd_type = type;
desc->bd_md_count = 0;
+ desc->bd_frag_ops = (struct ptlrpc_bulk_frag_ops *) ops;
LASSERT(max_brw > 0);
desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
/* PTLRPC_BULK_OPS_COUNT is the compile-time transfer limit for this
/**
* Prepare bulk descriptor for specified outgoing request \a req that
- * can fit \a npages * pages. \a type is bulk type. \a portal is where
+ * can fit \a nfrags fragments. \a type is the bulk type. \a portal is where
 * the bulk is to be sent. Used on client-side.
 * Returns pointer to newly allocated and initialized bulk descriptor or NULL
 * on error.
*/
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
- unsigned npages, unsigned max_brw,
- unsigned type, unsigned portal)
+ unsigned nfrags, unsigned max_brw,
+ unsigned int type,
+ unsigned portal,
+ const struct ptlrpc_bulk_frag_ops
+ *ops)
{
struct obd_import *imp = req->rq_import;
struct ptlrpc_bulk_desc *desc;
ENTRY;
- LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
- desc = ptlrpc_new_bulk(npages, max_brw, type, portal);
+ LASSERT(ptlrpc_is_bulk_op_passive(type));
+
+ desc = ptlrpc_new_bulk(nfrags, max_brw, type, portal, ops);
if (desc == NULL)
RETURN(NULL);
}
EXPORT_SYMBOL(ptlrpc_prep_bulk_imp);
-/*
- * Add a page \a page to the bulk descriptor \a desc.
- * Data to transfer in the page starts at offset \a pageoffset and
- * amount of data to transfer from the page is \a len
- */
void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
- struct page *page, int pageoffset, int len, int pin)
+ struct page *page, int pageoffset, int len,
+ int pin)
{
+ lnet_kiov_t *kiov;
+
LASSERT(desc->bd_iov_count < desc->bd_max_iov);
LASSERT(page != NULL);
LASSERT(pageoffset >= 0);
LASSERT(len > 0);
LASSERT(pageoffset + len <= PAGE_CACHE_SIZE);
+ LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
+
+ kiov = &BD_GET_KIOV(desc, desc->bd_iov_count);
desc->bd_nob += len;
if (pin)
page_cache_get(page);
- ptlrpc_add_bulk_page(desc, page, pageoffset, len);
+ kiov->kiov_page = page;
+ kiov->kiov_offset = pageoffset;
+ kiov->kiov_len = len;
+
+ desc->bd_iov_count++;
}
EXPORT_SYMBOL(__ptlrpc_prep_bulk_page);
-/**
- * Uninitialize and free bulk descriptor \a desc.
- * Works on bulk descriptors both from server and client side.
- */
-void __ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc, int unpin)
+int ptlrpc_prep_bulk_frag(struct ptlrpc_bulk_desc *desc,
+ void *frag, int len)
+{
+ struct kvec *iovec;
+
+ LASSERT(desc->bd_iov_count < desc->bd_max_iov);
+ LASSERT(frag != NULL);
+ LASSERT(len > 0);
+ LASSERT(ptlrpc_is_bulk_desc_kvec(desc->bd_type));
+
+ iovec = &BD_GET_KVEC(desc, desc->bd_iov_count);
+
+ desc->bd_nob += len;
+
+ iovec->iov_base = frag;
+ iovec->iov_len = len;
+
+ desc->bd_iov_count++;
+
+ return desc->bd_nob;
+}
+EXPORT_SYMBOL(ptlrpc_prep_bulk_frag);
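+
+/*
+ * Note: unlike the page helpers, ptlrpc_prep_bulk_frag() returns the
+ * running byte total (desc->bd_nob) after the fragment is appended.
+ */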
+
+void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
{
- int i;
ENTRY;
LASSERT(desc != NULL);
LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
LASSERT(desc->bd_md_count == 0); /* network hands off */
LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
+ LASSERT(desc->bd_frag_ops != NULL);
- sptlrpc_enc_pool_put_pages(desc);
+ if (ptlrpc_is_bulk_desc_kiov(desc->bd_type))
+ sptlrpc_enc_pool_put_pages(desc);
if (desc->bd_export)
class_export_put(desc->bd_export);
else
class_import_put(desc->bd_import);
- if (unpin) {
- for (i = 0; i < desc->bd_iov_count ; i++)
- page_cache_release(desc->bd_iov[i].kiov_page);
- }
+ if (desc->bd_frag_ops->release_frags != NULL)
+ desc->bd_frag_ops->release_frags(desc);
+
+ if (ptlrpc_is_bulk_desc_kiov(desc->bd_type))
+ OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
+ bd_u.bd_kiov.bd_vec[desc->bd_max_iov]));
+ else
+ OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
+ bd_u.bd_kvec.bd_kvec[desc->
+ bd_max_iov]));
- OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
- bd_iov[desc->bd_max_iov]));
EXIT;
}
-EXPORT_SYMBOL(__ptlrpc_free_bulk);
+EXPORT_SYMBOL(ptlrpc_free_bulk);
/**
* Set server timelimit for this req, i.e. how long are we willing to wait
request->rq_import = NULL;
}
if (request->rq_bulk != NULL)
- ptlrpc_free_bulk_pin(request->rq_bulk);
+ ptlrpc_free_bulk(request->rq_bulk);
if (request->rq_reqbuf != NULL || request->rq_clrbuf != NULL)
sptlrpc_cli_free_reqbuf(request);
struct ptlrpc_request *req;
ENTRY;
- LASSERT ((desc->bd_type == BULK_PUT_SINK &&
- ev->type == LNET_EVENT_PUT) ||
- (desc->bd_type == BULK_GET_SOURCE &&
- ev->type == LNET_EVENT_GET) ||
- ev->type == LNET_EVENT_UNLINK);
- LASSERT (ev->unlinked);
+ LASSERT((ptlrpc_is_bulk_put_sink(desc->bd_type) &&
+ ev->type == LNET_EVENT_PUT) ||
+ (ptlrpc_is_bulk_get_source(desc->bd_type) &&
+ ev->type == LNET_EVENT_GET) ||
+ ev->type == LNET_EVENT_UNLINK);
+ LASSERT(ev->unlinked);
if (CFS_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_CLIENT_BULK_CB, CFS_FAIL_ONCE))
ev->status = -EIO;
LASSERT(ev->type == LNET_EVENT_SEND ||
ev->type == LNET_EVENT_UNLINK ||
- (desc->bd_type == BULK_PUT_SOURCE &&
+ (ptlrpc_is_bulk_put_source(desc->bd_type) &&
ev->type == LNET_EVENT_ACK) ||
- (desc->bd_type == BULK_GET_SINK &&
+ (ptlrpc_is_bulk_get_sink(desc->bd_type) &&
ev->type == LNET_EVENT_REPLY));
CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
struct ptlrpc_request *req,
struct ptlrpc_bulk_desc *desc)
{
- struct gss_cli_ctx *gctx;
- struct lustre_msg *msg;
- struct ptlrpc_bulk_sec_desc *bsd;
- rawobj_t token;
- __u32 maj;
- int offset;
- int rc;
- ENTRY;
-
- LASSERT(req->rq_pack_bulk);
- LASSERT(req->rq_bulk_read || req->rq_bulk_write);
-
- gctx = container_of(ctx, struct gss_cli_ctx, gc_base);
- LASSERT(gctx->gc_mechctx);
-
- switch (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
- case SPTLRPC_SVC_NULL:
- LASSERT(req->rq_reqbuf->lm_bufcount >= 3);
- msg = req->rq_reqbuf;
- offset = msg->lm_bufcount - 1;
- break;
- case SPTLRPC_SVC_AUTH:
- case SPTLRPC_SVC_INTG:
- LASSERT(req->rq_reqbuf->lm_bufcount >= 4);
- msg = req->rq_reqbuf;
- offset = msg->lm_bufcount - 2;
- break;
- case SPTLRPC_SVC_PRIV:
- LASSERT(req->rq_clrbuf->lm_bufcount >= 2);
- msg = req->rq_clrbuf;
- offset = msg->lm_bufcount - 1;
- break;
- default:
- LBUG();
- }
-
- bsd = lustre_msg_buf(msg, offset, sizeof(*bsd));
- bsd->bsd_version = 0;
- bsd->bsd_flags = 0;
- bsd->bsd_type = SPTLRPC_BULK_DEFAULT;
- bsd->bsd_svc = SPTLRPC_FLVR_BULK_SVC(req->rq_flvr.sf_rpc);
-
- if (bsd->bsd_svc == SPTLRPC_BULK_SVC_NULL)
- RETURN(0);
-
- LASSERT(bsd->bsd_svc == SPTLRPC_BULK_SVC_INTG ||
- bsd->bsd_svc == SPTLRPC_BULK_SVC_PRIV);
-
- if (req->rq_bulk_read) {
- /*
- * bulk read: prepare receiving pages only for privacy mode.
- */
- if (bsd->bsd_svc == SPTLRPC_BULK_SVC_PRIV)
- return gss_cli_prep_bulk(req, desc);
- } else {
- /*
- * bulk write: sign or encrypt bulk pages.
- */
- bsd->bsd_nob = desc->bd_nob;
-
- if (bsd->bsd_svc == SPTLRPC_BULK_SVC_INTG) {
- /* integrity mode */
- token.data = bsd->bsd_data;
- token.len = lustre_msg_buflen(msg, offset) -
- sizeof(*bsd);
-
- maj = lgss_get_mic(gctx->gc_mechctx, 0, NULL,
- desc->bd_iov_count, desc->bd_iov,
- &token);
- if (maj != GSS_S_COMPLETE) {
- CWARN("failed to sign bulk data: %x\n", maj);
- RETURN(-EACCES);
- }
- } else {
- /* privacy mode */
- if (desc->bd_iov_count == 0)
- RETURN(0);
-
- rc = sptlrpc_enc_pool_get_pages(desc);
- if (rc) {
- CERROR("bulk write: failed to allocate "
- "encryption pages: %d\n", rc);
- RETURN(rc);
- }
-
- token.data = bsd->bsd_data;
- token.len = lustre_msg_buflen(msg, offset) -
- sizeof(*bsd);
-
- maj = lgss_wrap_bulk(gctx->gc_mechctx, desc, &token, 0);
- if (maj != GSS_S_COMPLETE) {
- CWARN("fail to encrypt bulk data: %x\n", maj);
- RETURN(-EACCES);
- }
- }
- }
-
- RETURN(0);
+ struct gss_cli_ctx *gctx;
+ struct lustre_msg *msg;
+ struct ptlrpc_bulk_sec_desc *bsd;
+ rawobj_t token;
+ __u32 maj;
+ int offset;
+ int rc;
+ ENTRY;
+
+ LASSERT(req->rq_pack_bulk);
+ LASSERT(req->rq_bulk_read || req->rq_bulk_write);
+ LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
+
+ gctx = container_of(ctx, struct gss_cli_ctx, gc_base);
+ LASSERT(gctx->gc_mechctx);
+
+ switch (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
+ case SPTLRPC_SVC_NULL:
+ LASSERT(req->rq_reqbuf->lm_bufcount >= 3);
+ msg = req->rq_reqbuf;
+ offset = msg->lm_bufcount - 1;
+ break;
+ case SPTLRPC_SVC_AUTH:
+ case SPTLRPC_SVC_INTG:
+ LASSERT(req->rq_reqbuf->lm_bufcount >= 4);
+ msg = req->rq_reqbuf;
+ offset = msg->lm_bufcount - 2;
+ break;
+ case SPTLRPC_SVC_PRIV:
+ LASSERT(req->rq_clrbuf->lm_bufcount >= 2);
+ msg = req->rq_clrbuf;
+ offset = msg->lm_bufcount - 1;
+ break;
+ default:
+ LBUG();
+ }
+
+ bsd = lustre_msg_buf(msg, offset, sizeof(*bsd));
+ bsd->bsd_version = 0;
+ bsd->bsd_flags = 0;
+ bsd->bsd_type = SPTLRPC_BULK_DEFAULT;
+ bsd->bsd_svc = SPTLRPC_FLVR_BULK_SVC(req->rq_flvr.sf_rpc);
+
+ if (bsd->bsd_svc == SPTLRPC_BULK_SVC_NULL)
+ RETURN(0);
+
+ LASSERT(bsd->bsd_svc == SPTLRPC_BULK_SVC_INTG ||
+ bsd->bsd_svc == SPTLRPC_BULK_SVC_PRIV);
+
+ if (req->rq_bulk_read) {
+ /*
+ * bulk read: prepare receiving pages only for privacy mode.
+ */
+ if (bsd->bsd_svc == SPTLRPC_BULK_SVC_PRIV)
+ return gss_cli_prep_bulk(req, desc);
+ } else {
+ /*
+ * bulk write: sign or encrypt bulk pages.
+ */
+ bsd->bsd_nob = desc->bd_nob;
+
+ if (bsd->bsd_svc == SPTLRPC_BULK_SVC_INTG) {
+ /* integrity mode */
+ token.data = bsd->bsd_data;
+ token.len = lustre_msg_buflen(msg, offset) -
+ sizeof(*bsd);
+
+ maj = lgss_get_mic(gctx->gc_mechctx, 0, NULL,
+ desc->bd_iov_count,
+ GET_KIOV(desc),
+ &token);
+ if (maj != GSS_S_COMPLETE) {
+ CWARN("failed to sign bulk data: %x\n", maj);
+ RETURN(-EACCES);
+ }
+ } else {
+ /* privacy mode */
+ if (desc->bd_iov_count == 0)
+ RETURN(0);
+
+ rc = sptlrpc_enc_pool_get_pages(desc);
+ if (rc) {
+ CERROR("bulk write: failed to allocate "
+ "encryption pages: %d\n", rc);
+ RETURN(rc);
+ }
+
+ token.data = bsd->bsd_data;
+ token.len = lustre_msg_buflen(msg, offset) -
+ sizeof(*bsd);
+
+ maj = lgss_wrap_bulk(gctx->gc_mechctx, desc, &token, 0);
+ if (maj != GSS_S_COMPLETE) {
+ CWARN("fail to encrypt bulk data: %x\n", maj);
+ RETURN(-EACCES);
+ }
+ }
+ }
+
+ RETURN(0);
}
int gss_cli_ctx_unwrap_bulk(struct ptlrpc_cli_ctx *ctx,
LASSERT(req->rq_pack_bulk);
LASSERT(req->rq_bulk_read || req->rq_bulk_write);
+ LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
switch (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
case SPTLRPC_SVC_NULL:
gctx = container_of(ctx, struct gss_cli_ctx, gc_base);
LASSERT(gctx->gc_mechctx);
- if (bsdv->bsd_svc == SPTLRPC_BULK_SVC_INTG) {
- int i, nob;
-
- /* fix the actual data size */
- for (i = 0, nob = 0; i < desc->bd_iov_count; i++) {
- if (desc->bd_iov[i].kiov_len + nob >
- desc->bd_nob_transferred) {
- desc->bd_iov[i].kiov_len =
- desc->bd_nob_transferred - nob;
- }
- nob += desc->bd_iov[i].kiov_len;
- }
-
- token.data = bsdv->bsd_data;
- token.len = lustre_msg_buflen(vmsg, voff) -
- sizeof(*bsdv);
-
- maj = lgss_verify_mic(gctx->gc_mechctx, 0, NULL,
- desc->bd_iov_count, desc->bd_iov,
- &token);
+ if (bsdv->bsd_svc == SPTLRPC_BULK_SVC_INTG) {
+ int i, nob;
+
+ /* fix the actual data size */
+ for (i = 0, nob = 0; i < desc->bd_iov_count; i++) {
+ if (BD_GET_KIOV(desc, i).kiov_len + nob >
+ desc->bd_nob_transferred) {
+ BD_GET_KIOV(desc, i).kiov_len =
+ desc->bd_nob_transferred - nob;
+ }
+ nob += BD_GET_KIOV(desc, i).kiov_len;
+ }
+
+ token.data = bsdv->bsd_data;
+ token.len = lustre_msg_buflen(vmsg, voff) -
+ sizeof(*bsdv);
+
+ maj = lgss_verify_mic(gctx->gc_mechctx, 0, NULL,
+ desc->bd_iov_count,
+ GET_KIOV(desc),
+ &token);
if (maj != GSS_S_COMPLETE) {
CERROR("failed to verify bulk read: %x\n", maj);
RETURN(-EACCES);
LASSERT(req->rq_svc_ctx);
LASSERT(req->rq_pack_bulk);
LASSERT(req->rq_bulk_write);
+ LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
grctx = gss_svc_ctx2reqctx(req->rq_svc_ctx);
token.data = bsdr->bsd_data;
token.len = grctx->src_reqbsd_size - sizeof(*bsdr);
- maj = lgss_verify_mic(grctx->src_ctx->gsc_mechctx, 0, NULL,
- desc->bd_iov_count, desc->bd_iov, &token);
+ maj = lgss_verify_mic(grctx->src_ctx->gsc_mechctx, 0, NULL,
+ desc->bd_iov_count,
+ GET_KIOV(desc), &token);
if (maj != GSS_S_COMPLETE) {
bsdv->bsd_flags |= BSD_FL_ERR;
CERROR("failed to verify bulk signature: %x\n", maj);
LASSERT(req->rq_svc_ctx);
LASSERT(req->rq_pack_bulk);
LASSERT(req->rq_bulk_read);
+ LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
grctx = gss_svc_ctx2reqctx(req->rq_svc_ctx);
token.data = bsdv->bsd_data;
token.len = grctx->src_repbsd_size - sizeof(*bsdv);
- maj = lgss_get_mic(grctx->src_ctx->gsc_mechctx, 0, NULL,
- desc->bd_iov_count, desc->bd_iov, &token);
- if (maj != GSS_S_COMPLETE) {
+ maj = lgss_get_mic(grctx->src_ctx->gsc_mechctx, 0, NULL,
+ desc->bd_iov_count,
+ GET_KIOV(desc), &token);
+ if (maj != GSS_S_COMPLETE) {
bsdv->bsd_flags |= BSD_FL_ERR;
CERROR("failed to sign bulk data: %x\n", maj);
RETURN(-EACCES);
struct scatterlist src, dst;
int blocksize, i, rc, nob = 0;
+ LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
LASSERT(desc->bd_iov_count);
- LASSERT(desc->bd_enc_iov);
+ LASSERT(GET_ENC_KIOV(desc));
blocksize = crypto_blkcipher_blocksize(tfm);
LASSERT(blocksize > 1);
/* encrypt clear pages */
for (i = 0; i < desc->bd_iov_count; i++) {
sg_init_table(&src, 1);
- sg_set_page(&src, desc->bd_iov[i].kiov_page,
- (desc->bd_iov[i].kiov_len + blocksize - 1) &
+ sg_set_page(&src, BD_GET_KIOV(desc, i).kiov_page,
+ (BD_GET_KIOV(desc, i).kiov_len +
+ blocksize - 1) &
(~(blocksize - 1)),
- desc->bd_iov[i].kiov_offset);
+ BD_GET_KIOV(desc, i).kiov_offset);
if (adj_nob)
nob += src.length;
sg_init_table(&dst, 1);
- sg_set_page(&dst, desc->bd_enc_iov[i].kiov_page, src.length,
- src.offset);
+ sg_set_page(&dst, BD_GET_ENC_KIOV(desc, i).kiov_page,
+ src.length, src.offset);
- desc->bd_enc_iov[i].kiov_offset = dst.offset;
- desc->bd_enc_iov[i].kiov_len = dst.length;
+ BD_GET_ENC_KIOV(desc, i).kiov_offset = dst.offset;
+ BD_GET_ENC_KIOV(desc, i).kiov_len = dst.length;
rc = crypto_blkcipher_encrypt_iv(&ciph_desc, &dst, &src,
src.length);
* plain text size.
* - for client read: we don't know data size for each page, so
* bd_iov[]->kiov_len is set to PAGE_SIZE, but actual data received might
- * be smaller, so we need to adjust it according to bd_enc_iov[]->kiov_len.
+ * be smaller, so we need to adjust it according to
+ * bd_u.bd_kiov.bd_enc_vec[]->kiov_len.
* this means we DO NOT support the situation that server send an odd size
* data in a page which is not the last one.
* - for server write: we knows exactly data size for each page being expected,
* thus kiov_len is accurate already, so we should not adjust it at all.
- * and bd_enc_iov[]->kiov_len should be round_up(bd_iov[]->kiov_len) which
+ * and bd_u.bd_kiov.bd_enc_vec[]->kiov_len should be
+ * round_up(bd_iov[]->kiov_len) which
* should have been done by prep_bulk().
*/
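+/*
+ * Worked example (illustrative): a client read that transfers 5000
+ * bytes into two 4096-byte kiov fragments arrives with kiov_len
+ * preset to 4096/4096; with adj_nob the second fragment is trimmed
+ * to 5000 - 4096 = 904 bytes to match what was actually received.
+ */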
static
int ct_nob = 0, pt_nob = 0;
int blocksize, i, rc;
+ LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
LASSERT(desc->bd_iov_count);
- LASSERT(desc->bd_enc_iov);
+ LASSERT(GET_ENC_KIOV(desc));
LASSERT(desc->bd_nob_transferred);
blocksize = crypto_blkcipher_blocksize(tfm);
return rc;
}
- for (i = 0; i < desc->bd_iov_count && ct_nob < desc->bd_nob_transferred;
- i++) {
- if (desc->bd_enc_iov[i].kiov_offset % blocksize != 0 ||
- desc->bd_enc_iov[i].kiov_len % blocksize != 0) {
- CERROR("page %d: odd offset %u len %u, blocksize %d\n",
- i, desc->bd_enc_iov[i].kiov_offset,
- desc->bd_enc_iov[i].kiov_len, blocksize);
- return -EFAULT;
- }
-
- if (adj_nob) {
- if (ct_nob + desc->bd_enc_iov[i].kiov_len >
- desc->bd_nob_transferred)
- desc->bd_enc_iov[i].kiov_len =
- desc->bd_nob_transferred - ct_nob;
+ for (i = 0; i < desc->bd_iov_count && ct_nob < desc->bd_nob_transferred;
+ i++) {
+		if (BD_GET_ENC_KIOV(desc, i).kiov_offset % blocksize != 0 ||
+		    BD_GET_ENC_KIOV(desc, i).kiov_len % blocksize != 0) {
+ CERROR("page %d: odd offset %u len %u, blocksize %d\n",
+ i, BD_GET_ENC_KIOV(desc, i).kiov_offset,
+ BD_GET_ENC_KIOV(desc, i).kiov_len,
+ blocksize);
+ return -EFAULT;
+ }
- desc->bd_iov[i].kiov_len = desc->bd_enc_iov[i].kiov_len;
- if (pt_nob + desc->bd_enc_iov[i].kiov_len >desc->bd_nob)
- desc->bd_iov[i].kiov_len = desc->bd_nob -pt_nob;
- } else {
- /* this should be guaranteed by LNET */
- LASSERT(ct_nob + desc->bd_enc_iov[i].kiov_len <=
- desc->bd_nob_transferred);
- LASSERT(desc->bd_iov[i].kiov_len <=
- desc->bd_enc_iov[i].kiov_len);
- }
+ if (adj_nob) {
+ if (ct_nob + BD_GET_ENC_KIOV(desc, i).kiov_len >
+ desc->bd_nob_transferred)
+ BD_GET_ENC_KIOV(desc, i).kiov_len =
+ desc->bd_nob_transferred - ct_nob;
+
+ BD_GET_KIOV(desc, i).kiov_len =
+ BD_GET_ENC_KIOV(desc, i).kiov_len;
+ if (pt_nob + BD_GET_ENC_KIOV(desc, i).kiov_len >
+ desc->bd_nob)
+ BD_GET_KIOV(desc, i).kiov_len =
+ desc->bd_nob - pt_nob;
+ } else {
+ /* this should be guaranteed by LNET */
+			LASSERT(ct_nob + BD_GET_ENC_KIOV(desc, i).kiov_len <=
+				desc->bd_nob_transferred);
+ LASSERT(BD_GET_KIOV(desc, i).kiov_len <=
+ BD_GET_ENC_KIOV(desc, i).kiov_len);
+ }
- if (desc->bd_enc_iov[i].kiov_len == 0)
- continue;
+ if (BD_GET_ENC_KIOV(desc, i).kiov_len == 0)
+ continue;
sg_init_table(&src, 1);
- sg_set_page(&src, desc->bd_enc_iov[i].kiov_page,
- desc->bd_enc_iov[i].kiov_len,
- desc->bd_enc_iov[i].kiov_offset);
+ sg_set_page(&src, BD_GET_ENC_KIOV(desc, i).kiov_page,
+ BD_GET_ENC_KIOV(desc, i).kiov_len,
+ BD_GET_ENC_KIOV(desc, i).kiov_offset);
dst = src;
- if (desc->bd_iov[i].kiov_len % blocksize == 0)
- sg_assign_page(&dst, desc->bd_iov[i].kiov_page);
+ if (BD_GET_KIOV(desc, i).kiov_len % blocksize == 0)
+ sg_assign_page(&dst,
+ BD_GET_KIOV(desc, i).kiov_page);
rc = crypto_blkcipher_decrypt_iv(&ciph_desc, &dst, &src,
src.length);
return rc;
}
- if (desc->bd_iov[i].kiov_len % blocksize != 0) {
- memcpy(page_address(desc->bd_iov[i].kiov_page) +
- desc->bd_iov[i].kiov_offset,
- page_address(desc->bd_enc_iov[i].kiov_page) +
- desc->bd_iov[i].kiov_offset,
- desc->bd_iov[i].kiov_len);
- }
+ if (BD_GET_KIOV(desc, i).kiov_len % blocksize != 0) {
+ memcpy(page_address(BD_GET_KIOV(desc, i).kiov_page) +
+ BD_GET_KIOV(desc, i).kiov_offset,
+ page_address(BD_GET_ENC_KIOV(desc, i).
+ kiov_page) +
+ BD_GET_KIOV(desc, i).kiov_offset,
+ BD_GET_KIOV(desc, i).kiov_len);
+ }
- ct_nob += desc->bd_enc_iov[i].kiov_len;
- pt_nob += desc->bd_iov[i].kiov_len;
- }
+ ct_nob += BD_GET_ENC_KIOV(desc, i).kiov_len;
+ pt_nob += BD_GET_KIOV(desc, i).kiov_len;
+ }
if (unlikely(ct_nob != desc->bd_nob_transferred)) {
CERROR("%d cipher text transferred but only %d decrypted\n",
return -EFAULT;
}
- /* if needed, clear up the rest unused iovs */
- if (adj_nob)
- while (i < desc->bd_iov_count)
- desc->bd_iov[i++].kiov_len = 0;
+ /* if needed, clear up the rest unused iovs */
+ if (adj_nob)
+ while (i < desc->bd_iov_count)
+ BD_GET_KIOV(desc, i++).kiov_len = 0;
/* decrypt tail (krb5 header) */
buf_to_sg(&src, cipher->data + blocksize, sizeof(*khdr));
static
__u32 gss_prep_bulk_kerberos(struct gss_ctx *gctx,
- struct ptlrpc_bulk_desc *desc)
+ struct ptlrpc_bulk_desc *desc)
{
- struct krb5_ctx *kctx = gctx->internal_ctx_id;
- int blocksize, i;
+ struct krb5_ctx *kctx = gctx->internal_ctx_id;
+ int blocksize, i;
- LASSERT(desc->bd_iov_count);
- LASSERT(desc->bd_enc_iov);
- LASSERT(kctx->kc_keye.kb_tfm);
+ LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
+ LASSERT(desc->bd_iov_count);
+ LASSERT(GET_ENC_KIOV(desc));
+ LASSERT(kctx->kc_keye.kb_tfm);
blocksize = crypto_blkcipher_blocksize(kctx->kc_keye.kb_tfm);
- for (i = 0; i < desc->bd_iov_count; i++) {
- LASSERT(desc->bd_enc_iov[i].kiov_page);
- /*
- * offset should always start at page boundary of either
- * client or server side.
- */
- if (desc->bd_iov[i].kiov_offset & blocksize) {
- CERROR("odd offset %d in page %d\n",
- desc->bd_iov[i].kiov_offset, i);
- return GSS_S_FAILURE;
- }
+ for (i = 0; i < desc->bd_iov_count; i++) {
+ LASSERT(BD_GET_ENC_KIOV(desc, i).kiov_page);
+ /*
+ * offset should always start at page boundary of either
+ * client or server side.
+ */
+ if (BD_GET_KIOV(desc, i).kiov_offset & blocksize) {
+ CERROR("odd offset %d in page %d\n",
+ BD_GET_KIOV(desc, i).kiov_offset, i);
+ return GSS_S_FAILURE;
+ }
- desc->bd_enc_iov[i].kiov_offset = desc->bd_iov[i].kiov_offset;
- desc->bd_enc_iov[i].kiov_len = (desc->bd_iov[i].kiov_len +
- blocksize - 1) & (~(blocksize - 1));
- }
+ BD_GET_ENC_KIOV(desc, i).kiov_offset =
+ BD_GET_KIOV(desc, i).kiov_offset;
+ BD_GET_ENC_KIOV(desc, i).kiov_len =
+ (BD_GET_KIOV(desc, i).kiov_len +
+ blocksize - 1) & (~(blocksize - 1));
+ }
- return GSS_S_COMPLETE;
+ return GSS_S_COMPLETE;
}
static
__u8 conf[GSS_MAX_CIPHER_BLOCK];
int rc = 0;
+ LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
LASSERT(ke);
LASSERT(ke->ke_conf_size <= GSS_MAX_CIPHER_BLOCK);
data_desc[0].data = conf;
data_desc[0].len = ke->ke_conf_size;
- /* compute checksum */
- if (krb5_make_checksum(kctx->kc_enctype, &kctx->kc_keyi,
- khdr, 1, data_desc,
- desc->bd_iov_count, desc->bd_iov,
- &cksum))
- return GSS_S_FAILURE;
- LASSERT(cksum.len >= ke->ke_hash_size);
+ /* compute checksum */
+ if (krb5_make_checksum(kctx->kc_enctype, &kctx->kc_keyi,
+ khdr, 1, data_desc,
+ desc->bd_iov_count, GET_KIOV(desc),
+ &cksum))
+ return GSS_S_FAILURE;
+ LASSERT(cksum.len >= ke->ke_hash_size);
/*
* clear text layout for encryption:
int rc;
__u32 major;
+ LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
LASSERT(ke);
if (token->len < sizeof(*khdr)) {
data_desc[0].data = plain.data;
data_desc[0].len = blocksize;
- if (krb5_make_checksum(kctx->kc_enctype, &kctx->kc_keyi,
- khdr, 1, data_desc,
- desc->bd_iov_count, desc->bd_iov,
- &cksum))
- return GSS_S_FAILURE;
- LASSERT(cksum.len >= ke->ke_hash_size);
+ if (krb5_make_checksum(kctx->kc_enctype, &kctx->kc_keyi,
+ khdr, 1, data_desc,
+ desc->bd_iov_count,
+ GET_KIOV(desc),
+ &cksum))
+ return GSS_S_FAILURE;
+ LASSERT(cksum.len >= ke->ke_hash_size);
if (memcmp(plain.data + blocksize + sizeof(*khdr),
cksum.data + cksum.len - ke->ke_hash_size,
#ifdef HAVE_SERVER_SUPPORT
/**
* Prepare bulk descriptor for specified incoming request \a req that
- * can fit \a npages * pages. \a type is bulk type. \a portal is where
+ * can fit \a nfrags fragments. \a type is the bulk type. \a portal is where
 * the bulk is to be sent. Used on server-side after the request was
 * received.
 * Returns pointer to newly allocated and initialized bulk descriptor or NULL
 * on error.
*/
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp(struct ptlrpc_request *req,
- unsigned npages, unsigned max_brw,
- unsigned type, unsigned portal)
+ unsigned nfrags, unsigned max_brw,
+ unsigned int type,
+ unsigned portal,
+ const struct ptlrpc_bulk_frag_ops
+ *ops)
{
struct obd_export *exp = req->rq_export;
struct ptlrpc_bulk_desc *desc;
ENTRY;
- LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
+ LASSERT(ptlrpc_is_bulk_op_active(type));
- desc = ptlrpc_new_bulk(npages, max_brw, type, portal);
+ desc = ptlrpc_new_bulk(nfrags, max_brw, type, portal, ops);
if (desc == NULL)
RETURN(NULL);
/* NB no locking required until desc is on the network */
LASSERT(desc->bd_md_count == 0);
- LASSERT(desc->bd_type == BULK_PUT_SOURCE ||
- desc->bd_type == BULK_GET_SINK);
+ LASSERT(ptlrpc_is_bulk_op_active(desc->bd_type));
LASSERT(desc->bd_cbid.cbid_fn == server_bulk_callback);
LASSERT(desc->bd_cbid.cbid_arg == desc);
}
/* Network is about to get at the memory */
- if (desc->bd_type == BULK_PUT_SOURCE)
+ if (ptlrpc_is_bulk_put_source(desc->bd_type))
rc = LNetPut(conn->c_self, desc->bd_mds[posted_md],
LNET_ACK_REQ, conn->c_peer,
desc->bd_portal, xid, 0, 0);
LASSERT(desc->bd_md_max_brw <= PTLRPC_BULK_OPS_COUNT);
LASSERT(desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES);
LASSERT(desc->bd_req != NULL);
- LASSERT(desc->bd_type == BULK_PUT_SINK ||
- desc->bd_type == BULK_GET_SOURCE);
+ LASSERT(ptlrpc_is_bulk_op_passive(desc->bd_type));
/* cleanup the state of the bulk for it will be reused */
if (req->rq_resend || req->rq_send_state == LUSTRE_IMP_REPLAY)
for (posted_md = 0; posted_md < total_md; posted_md++, xid++) {
md.options = PTLRPC_MD_OPTIONS |
- ((desc->bd_type == BULK_GET_SOURCE) ?
+ (ptlrpc_is_bulk_op_get(desc->bd_type) ?
LNET_MD_OP_GET : LNET_MD_OP_PUT);
ptlrpc_fill_bulk_md(&md, desc, posted_md);
CDEBUG(D_NET, "Setup %u bulk %s buffers: %u pages %u bytes, "
"xid x"LPX64"-"LPX64", portal %u\n", desc->bd_md_count,
- desc->bd_type == BULK_GET_SOURCE ? "get-source" : "put-sink",
+ ptlrpc_is_bulk_op_get(desc->bd_type) ? "get-source" : "put-sink",
desc->bd_iov_count, desc->bd_nob,
desc->bd_last_xid, req->rq_xid, desc->bd_portal);
LASSERT(!(md->options & (LNET_MD_IOVEC | LNET_MD_KIOV |
LNET_MD_PHYS)));
- md->options |= LNET_MD_KIOV;
md->length = max(0, desc->bd_iov_count - mdidx * LNET_MAX_IOV);
md->length = min_t(unsigned int, LNET_MAX_IOV, md->length);
- if (desc->bd_enc_iov)
- md->start = &desc->bd_enc_iov[mdidx * LNET_MAX_IOV];
- else
- md->start = &desc->bd_iov[mdidx * LNET_MAX_IOV];
-}
-
-void ptlrpc_add_bulk_page(struct ptlrpc_bulk_desc *desc, struct page *page,
- int pageoffset, int len)
-{
- lnet_kiov_t *kiov = &desc->bd_iov[desc->bd_iov_count];
- kiov->kiov_page = page;
- kiov->kiov_offset = pageoffset;
- kiov->kiov_len = len;
-
- desc->bd_iov_count++;
+ if (ptlrpc_is_bulk_desc_kiov(desc->bd_type)) {
+ md->options |= LNET_MD_KIOV;
+ if (GET_ENC_KIOV(desc))
+ md->start = &BD_GET_ENC_KIOV(desc, mdidx *
+ LNET_MAX_IOV);
+ else
+ md->start = &BD_GET_KIOV(desc, mdidx * LNET_MAX_IOV);
+ } else if (ptlrpc_is_bulk_desc_kvec(desc->bd_type)) {
+ md->options |= LNET_MD_IOVEC;
+ if (GET_ENC_KVEC(desc))
+ md->start = &BD_GET_ENC_KVEC(desc, mdidx *
+ LNET_MAX_IOV);
+ else
+ md->start = &BD_GET_KVEC(desc, mdidx * LNET_MAX_IOV);
+ }
}
+
void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
unsigned int service_time);
struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw,
- unsigned type, unsigned portal);
+ enum ptlrpc_bulk_op_type type,
+ unsigned portal,
+ const struct ptlrpc_bulk_frag_ops
+ *ops);
int ptlrpc_request_cache_init(void);
void ptlrpc_request_cache_fini(void);
struct ptlrpc_request *ptlrpc_request_cache_alloc(gfp_t flags);
/* pers.c */
void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc,
int mdcnt);
-void ptlrpc_add_bulk_page(struct ptlrpc_bulk_desc *desc, struct page *page,
- int pageoffset, int len);
/* pack_generic.c */
struct ptlrpc_reply_state *
int p_idx, g_idx;
int i;
+ LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
LASSERT(desc->bd_iov_count > 0);
LASSERT(desc->bd_iov_count <= page_pools.epp_max_pages);
/* resent bulk, enc iov might have been allocated previously */
- if (desc->bd_enc_iov != NULL)
+ if (GET_ENC_KIOV(desc) != NULL)
return 0;
- OBD_ALLOC(desc->bd_enc_iov,
- desc->bd_iov_count * sizeof(*desc->bd_enc_iov));
- if (desc->bd_enc_iov == NULL)
+ OBD_ALLOC(GET_ENC_KIOV(desc),
+ desc->bd_iov_count * sizeof(*GET_ENC_KIOV(desc)));
+ if (GET_ENC_KIOV(desc) == NULL)
return -ENOMEM;
spin_lock(&page_pools.epp_lock);
p_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
g_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
- for (i = 0; i < desc->bd_iov_count; i++) {
- LASSERT(page_pools.epp_pools[p_idx][g_idx] != NULL);
- desc->bd_enc_iov[i].kiov_page =
- page_pools.epp_pools[p_idx][g_idx];
- page_pools.epp_pools[p_idx][g_idx] = NULL;
-
- if (++g_idx == PAGES_PER_POOL) {
- p_idx++;
- g_idx = 0;
- }
- }
+ for (i = 0; i < desc->bd_iov_count; i++) {
+ LASSERT(page_pools.epp_pools[p_idx][g_idx] != NULL);
+ BD_GET_ENC_KIOV(desc, i).kiov_page =
+ page_pools.epp_pools[p_idx][g_idx];
+ page_pools.epp_pools[p_idx][g_idx] = NULL;
+
+ if (++g_idx == PAGES_PER_POOL) {
+ p_idx++;
+ g_idx = 0;
+ }
+ }
if (page_pools.epp_free_pages < page_pools.epp_st_lowfree)
page_pools.epp_st_lowfree = page_pools.epp_free_pages;
void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc)
{
- int p_idx, g_idx;
- int i;
+ int p_idx, g_idx;
+ int i;
- if (desc->bd_enc_iov == NULL)
- return;
+ LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
- LASSERT(desc->bd_iov_count > 0);
+ if (GET_ENC_KIOV(desc) == NULL)
+ return;
+
+ LASSERT(desc->bd_iov_count > 0);
spin_lock(&page_pools.epp_lock);
- p_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
- g_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
+ p_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
+ g_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
- LASSERT(page_pools.epp_free_pages + desc->bd_iov_count <=
- page_pools.epp_total_pages);
- LASSERT(page_pools.epp_pools[p_idx]);
+ LASSERT(page_pools.epp_free_pages + desc->bd_iov_count <=
+ page_pools.epp_total_pages);
+ LASSERT(page_pools.epp_pools[p_idx]);
- for (i = 0; i < desc->bd_iov_count; i++) {
- LASSERT(desc->bd_enc_iov[i].kiov_page != NULL);
- LASSERT(g_idx != 0 || page_pools.epp_pools[p_idx]);
- LASSERT(page_pools.epp_pools[p_idx][g_idx] == NULL);
+ for (i = 0; i < desc->bd_iov_count; i++) {
+ LASSERT(BD_GET_ENC_KIOV(desc, i).kiov_page != NULL);
+ LASSERT(g_idx != 0 || page_pools.epp_pools[p_idx]);
+ LASSERT(page_pools.epp_pools[p_idx][g_idx] == NULL);
- page_pools.epp_pools[p_idx][g_idx] =
- desc->bd_enc_iov[i].kiov_page;
+ page_pools.epp_pools[p_idx][g_idx] =
+ BD_GET_ENC_KIOV(desc, i).kiov_page;
- if (++g_idx == PAGES_PER_POOL) {
- p_idx++;
- g_idx = 0;
- }
- }
+ if (++g_idx == PAGES_PER_POOL) {
+ p_idx++;
+ g_idx = 0;
+ }
+ }
- page_pools.epp_free_pages += desc->bd_iov_count;
+ page_pools.epp_free_pages += desc->bd_iov_count;
- enc_pools_wakeup();
+ enc_pools_wakeup();
spin_unlock(&page_pools.epp_lock);
- OBD_FREE(desc->bd_enc_iov,
- desc->bd_iov_count * sizeof(*desc->bd_enc_iov));
- desc->bd_enc_iov = NULL;
+ OBD_FREE(GET_ENC_KIOV(desc),
+ desc->bd_iov_count * sizeof(*GET_ENC_KIOV(desc)));
+ GET_ENC_KIOV(desc) = NULL;
}
/*
unsigned int bufsize;
int i, err;
+ LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
LASSERT(alg > BULK_HASH_ALG_NULL && alg < BULK_HASH_ALG_MAX);
LASSERT(buflen >= 4);
hashsize = cfs_crypto_hash_digestsize(cfs_hash_alg_id[alg]);
for (i = 0; i < desc->bd_iov_count; i++) {
- cfs_crypto_hash_update_page(hdesc, desc->bd_iov[i].kiov_page,
- desc->bd_iov[i].kiov_offset & ~PAGE_MASK,
- desc->bd_iov[i].kiov_len);
+ cfs_crypto_hash_update_page(hdesc,
+ BD_GET_KIOV(desc, i).kiov_page,
+ BD_GET_KIOV(desc, i).kiov_offset &
+ ~PAGE_MASK,
+ BD_GET_KIOV(desc, i).kiov_len);
}
if (hashsize > buflen) {
char *ptr;
unsigned int off, i;
+ LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
+
for (i = 0; i < desc->bd_iov_count; i++) {
- if (desc->bd_iov[i].kiov_len == 0)
+ if (BD_GET_KIOV(desc, i).kiov_len == 0)
continue;
- ptr = kmap(desc->bd_iov[i].kiov_page);
- off = desc->bd_iov[i].kiov_offset & ~PAGE_MASK;
+ ptr = kmap(BD_GET_KIOV(desc, i).kiov_page);
+ off = BD_GET_KIOV(desc, i).kiov_offset & ~PAGE_MASK;
ptr[off] ^= 0x1;
- kunmap(desc->bd_iov[i].kiov_page);
+ kunmap(BD_GET_KIOV(desc, i).kiov_page);
return;
}
}
int rc;
int i, nob;
+ LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
LASSERT(req->rq_pack_bulk);
LASSERT(req->rq_reqbuf->lm_bufcount == PLAIN_PACK_SEGMENTS);
LASSERT(req->rq_repdata->lm_bufcount == PLAIN_PACK_SEGMENTS);
return 0;
}
- /* fix the actual data size */
- for (i = 0, nob = 0; i < desc->bd_iov_count; i++) {
- if (desc->bd_iov[i].kiov_len + nob > desc->bd_nob_transferred) {
- desc->bd_iov[i].kiov_len =
- desc->bd_nob_transferred - nob;
- }
- nob += desc->bd_iov[i].kiov_len;
- }
+ /* fix the actual data size */
+ for (i = 0, nob = 0; i < desc->bd_iov_count; i++) {
+ if (BD_GET_KIOV(desc, i).kiov_len +
+ nob > desc->bd_nob_transferred) {
+ BD_GET_KIOV(desc, i).kiov_len =
+ desc->bd_nob_transferred - nob;
+ }
+ nob += BD_GET_KIOV(desc, i).kiov_len;
+ }
rc = plain_verify_bulk_csum(desc, req->rq_flvr.u_bulk.hash.hash_alg,
tokenv);
ptlrpc_at_set_req_timeout(req);
/* allocate bulk descriptor */
- desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK,
- MDS_BULK_PORTAL);
+ desc = ptlrpc_prep_bulk_imp(req, npages, 1,
+ PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV,
+ MDS_BULK_PORTAL,
+ &ptlrpc_bulk_kiov_pin_ops);
if (desc == NULL) {
ptlrpc_request_free(req);
RETURN(-ENOMEM);
/* req now owns desc and will free it when it gets freed */
for (i = 0; i < npages; i++)
- ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE);
+ desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
+ PAGE_CACHE_SIZE);
/* pack index information in request */
req_ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO);
ENTRY;
- desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, 1, BULK_PUT_SOURCE,
- MDS_BULK_PORTAL);
+ desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, 1,
+ PTLRPC_BULK_PUT_SOURCE |
+ PTLRPC_BULK_BUF_KIOV,
+ MDS_BULK_PORTAL,
+ &ptlrpc_bulk_kiov_pin_ops);
if (desc == NULL)
RETURN(-ENOMEM);
for (i = 0, tmpcount = nob; i < rdpg->rp_npages && tmpcount > 0;
i++, tmpcount -= tmpsize) {
tmpsize = min_t(int, tmpcount, PAGE_CACHE_SIZE);
- ptlrpc_prep_bulk_page_pin(desc, rdpg->rp_pages[i], 0, tmpsize);
+ desc->bd_frag_ops->add_kiov_frag(desc, rdpg->rp_pages[i], 0,
+ tmpsize);
}
LASSERT(desc->bd_nob == nob);
rc = target_bulk_io(exp, desc, lwi);
- ptlrpc_free_bulk_pin(desc);
+ ptlrpc_free_bulk(desc);
RETURN(rc);
}
EXPORT_SYMBOL(tgt_sendpage);
unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
__u32 cksum;
+ LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
+
hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
if (IS_ERR(hdesc)) {
CERROR("%s: unable to initialize checksum hash %s\n",
* simulate a client->OST data error */
if (i == 0 && opc == OST_WRITE &&
OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_RECEIVE)) {
- int off = desc->bd_iov[i].kiov_offset & ~PAGE_MASK;
- int len = desc->bd_iov[i].kiov_len;
+ int off = BD_GET_KIOV(desc, i).kiov_offset &
+ ~PAGE_MASK;
+ int len = BD_GET_KIOV(desc, i).kiov_len;
struct page *np = tgt_page_to_corrupt;
- char *ptr = kmap(desc->bd_iov[i].kiov_page) + off;
+ char *ptr = kmap(BD_GET_KIOV(desc, i).kiov_page) + off;
if (np) {
char *ptr2 = kmap(np) + off;
memcpy(ptr2, ptr, len);
memcpy(ptr2, "bad3", min(4, len));
kunmap(np);
- desc->bd_iov[i].kiov_page = np;
+ BD_GET_KIOV(desc, i).kiov_page = np;
} else {
CERROR("%s: can't alloc page for corruption\n",
tgt_name(tgt));
}
}
- cfs_crypto_hash_update_page(hdesc, desc->bd_iov[i].kiov_page,
- desc->bd_iov[i].kiov_offset & ~PAGE_MASK,
- desc->bd_iov[i].kiov_len);
+ cfs_crypto_hash_update_page(hdesc,
+ BD_GET_KIOV(desc, i).kiov_page,
+ BD_GET_KIOV(desc, i).kiov_offset &
+ ~PAGE_MASK,
+ BD_GET_KIOV(desc, i).kiov_len);
/* corrupt the data after we compute the checksum, to
* simulate an OST->client data error */
if (i == 0 && opc == OST_READ &&
OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) {
- int off = desc->bd_iov[i].kiov_offset & ~PAGE_MASK;
- int len = desc->bd_iov[i].kiov_len;
+			int off = BD_GET_KIOV(desc, i).kiov_offset &
+				  ~PAGE_MASK;
+ int len = BD_GET_KIOV(desc, i).kiov_len;
struct page *np = tgt_page_to_corrupt;
- char *ptr = kmap(desc->bd_iov[i].kiov_page) + off;
+ char *ptr =
+ kmap(BD_GET_KIOV(desc, i).kiov_page) + off;
if (np) {
char *ptr2 = kmap(np) + off;
memcpy(ptr2, ptr, len);
memcpy(ptr2, "bad4", min(4, len));
kunmap(np);
- desc->bd_iov[i].kiov_page = np;
+ BD_GET_KIOV(desc, i).kiov_page = np;
} else {
CERROR("%s: can't alloc page for corruption\n",
tgt_name(tgt));
GOTO(out_lock, rc);
desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
- BULK_PUT_SOURCE, OST_BULK_PORTAL);
+ PTLRPC_BULK_PUT_SOURCE |
+ PTLRPC_BULK_BUF_KIOV,
+ OST_BULK_PORTAL,
+ &ptlrpc_bulk_kiov_nopin_ops);
if (desc == NULL)
GOTO(out_commitrw, rc = -ENOMEM);
nob += page_rc;
if (page_rc != 0) { /* some data! */
LASSERT(local_nb[i].lnb_page != NULL);
- ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].lnb_page,
- local_nb[i].lnb_page_offset,
- page_rc);
+ desc->bd_frag_ops->add_kiov_frag
+ (desc, local_nb[i].lnb_page,
+ local_nb[i].lnb_page_offset,
+ page_rc);
}
if (page_rc != local_nb[i].lnb_len) { /* short read */
tgt_brw_unlock(ioo, remote_nb, &lockh, LCK_PR);
if (desc && !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))
- ptlrpc_free_bulk_nopin(desc);
+ ptlrpc_free_bulk(desc);
LASSERT(rc <= 0);
if (rc == 0) {
lwi1 = LWI_TIMEOUT_INTR(cfs_time_seconds(3), NULL, NULL, NULL);
l_wait_event(waitq, 0, &lwi1);
target_bulk_io(exp, desc, &lwi);
- ptlrpc_free_bulk_nopin(desc);
+ ptlrpc_free_bulk(desc);
}
RETURN(rc);
GOTO(out_lock, rc);
desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
- BULK_GET_SINK, OST_BULK_PORTAL);
+ PTLRPC_BULK_GET_SINK | PTLRPC_BULK_BUF_KIOV,
+ OST_BULK_PORTAL,
+ &ptlrpc_bulk_kiov_nopin_ops);
if (desc == NULL)
GOTO(skip_transfer, rc = -ENOMEM);
/* NB Having prepped, we must commit... */
for (i = 0; i < npages; i++)
- ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].lnb_page,
- local_nb[i].lnb_page_offset,
- local_nb[i].lnb_len);
+ desc->bd_frag_ops->add_kiov_frag(desc,
+ local_nb[i].lnb_page,
+ local_nb[i].lnb_page_offset,
+ local_nb[i].lnb_len);
rc = sptlrpc_svc_prep_bulk(req, desc);
if (rc != 0)
out_lock:
tgt_brw_unlock(ioo, remote_nb, &lockh, LCK_PW);
if (desc)
- ptlrpc_free_bulk_nopin(desc);
+ ptlrpc_free_bulk(desc);
out:
if (no_reply) {
req->rq_no_reply = 1;