EXPORT_SYMBOL(ptlrpc_uuid_to_connection);
/**
- * Allocate and initialize new bulk descriptor
+ * Allocate and initialize new bulk descriptor on the sender.
* Returns pointer to the descriptor or NULL on error.
*/
-struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
+struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw,
+ unsigned type, unsigned portal)
{
- struct ptlrpc_bulk_desc *desc;
+ struct ptlrpc_bulk_desc *desc;
+ int i;
- OBD_ALLOC(desc, offsetof (struct ptlrpc_bulk_desc, bd_iov[npages]));
- if (!desc)
- return NULL;
+ OBD_ALLOC(desc, offsetof(struct ptlrpc_bulk_desc, bd_iov[npages]));
+ if (!desc)
+ return NULL;
spin_lock_init(&desc->bd_lock);
- cfs_waitq_init(&desc->bd_waitq);
- desc->bd_max_iov = npages;
- desc->bd_iov_count = 0;
- LNetInvalidateHandle(&desc->bd_md_h);
- desc->bd_portal = portal;
- desc->bd_type = type;
-
- return desc;
+ cfs_waitq_init(&desc->bd_waitq);
+ desc->bd_max_iov = npages;
+ desc->bd_iov_count = 0;
+ desc->bd_portal = portal;
+ desc->bd_type = type;
+ desc->bd_md_count = 0;
+ LASSERT(max_brw > 0);
+ desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
+ /* PTLRPC_BULK_OPS_COUNT is the compile-time transfer limit for this
+ * node. Negotiated ocd_brw_size will always be <= this number. */
+ for (i = 0; i < PTLRPC_BULK_OPS_COUNT; i++)
+ LNetInvalidateHandle(&desc->bd_mds[i]);
+
+ return desc;
}
/**
* error.
*/
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
- int npages, int type, int portal)
+ unsigned npages, unsigned max_brw,
+ unsigned type, unsigned portal)
{
- struct obd_import *imp = req->rq_import;
- struct ptlrpc_bulk_desc *desc;
+ struct obd_import *imp = req->rq_import;
+ struct ptlrpc_bulk_desc *desc;
- ENTRY;
- LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
- desc = new_bulk(npages, type, portal);
- if (desc == NULL)
- RETURN(NULL);
+ ENTRY;
+ LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
+ desc = ptlrpc_new_bulk(npages, max_brw, type, portal);
+ if (desc == NULL)
+ RETURN(NULL);
desc->bd_import_generation = req->rq_import_generation;
desc->bd_import = class_import_get(imp);
*/
void __ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc, int unpin)
{
- int i;
- ENTRY;
+ int i;
+ ENTRY;
- LASSERT(desc != NULL);
- LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
- LASSERT(!desc->bd_network_rw); /* network hands off or */
- LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
+ LASSERT(desc != NULL);
+ LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
+ LASSERT(desc->bd_md_count == 0); /* network hands off */
+ LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
- sptlrpc_enc_pool_put_pages(desc);
+ sptlrpc_enc_pool_put_pages(desc);
- if (desc->bd_export)
- class_export_put(desc->bd_export);
- else
- class_import_put(desc->bd_import);
+ if (desc->bd_export)
+ class_export_put(desc->bd_export);
+ else
+ class_import_put(desc->bd_import);
if (unpin) {
for (i = 0; i < desc->bd_iov_count ; i++)
cfs_page_unpin(desc->bd_iov[i].kiov_page);
}
- OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
- bd_iov[desc->bd_max_iov]));
- EXIT;
+ OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
+ bd_iov[desc->bd_max_iov]));
+ EXIT;
}
EXPORT_SYMBOL(__ptlrpc_free_bulk);
if (ptlrpc_client_bulk_active(req))
continue;
- if (!req->rq_bulk->bd_success) {
- /* The RPC reply arrived OK, but the bulk screwed
- * up! Dead weird since the server told us the RPC
- * was good after getting the REPLY for her GET or
- * the ACK for her PUT. */
- DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
- req->rq_status = -EIO;
- }
+ if (req->rq_bulk->bd_failure) {
+ /* The RPC reply arrived OK, but the bulk screwed
+ * up! Dead weird since the server told us the RPC
+ * was good after getting the REPLY for her GET or
+ * the ACK for her PUT. */
+ DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
+ req->rq_status = -EIO;
+ }
ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
#define YEAR_2004 (1ULL << 30)
void ptlrpc_init_xid(void)
{
- time_t now = cfs_time_current_sec();
+ time_t now = cfs_time_current_sec();
spin_lock_init(&ptlrpc_last_xid_lock);
- if (now < YEAR_2004) {
- cfs_get_random_bytes(&ptlrpc_last_xid, sizeof(ptlrpc_last_xid));
- ptlrpc_last_xid >>= 2;
- ptlrpc_last_xid |= (1ULL << 61);
- } else {
- ptlrpc_last_xid = (__u64)now << 20;
- }
+ if (now < YEAR_2004) {
+ cfs_get_random_bytes(&ptlrpc_last_xid, sizeof(ptlrpc_last_xid));
+ ptlrpc_last_xid >>= 2;
+ ptlrpc_last_xid |= (1ULL << 61);
+ } else {
+ ptlrpc_last_xid = (__u64)now << 20;
+ }
+
+ /* Need to always be aligned to a power-of-two for mutli-bulk BRW */
+ CLASSERT((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) == 0);
+ ptlrpc_last_xid &= PTLRPC_BULK_OPS_MASK;
}
/**
- * Increase xid and returns resultng new value to the caller.
+ * Increase xid and returns resulting new value to the caller.
+ *
+ * Multi-bulk BRW RPCs consume multiple XIDs for each bulk transfer, starting
+ * at the returned xid, up to xid + PTLRPC_BULK_OPS_COUNT - 1. The BRW RPC
+ * itself uses the last bulk xid needed, so the server can determine the
+ * the number of bulk transfers from the RPC XID and a bitmask. The starting
+ * xid must align to a power-of-two value.
+ *
+ * This is assumed to be true due to the initial ptlrpc_last_xid
+ * value also being initialized to a power-of-two value. LU-1431
*/
__u64 ptlrpc_next_xid(void)
{
- __u64 tmp;
+ __u64 next;
+
spin_lock(&ptlrpc_last_xid_lock);
- tmp = ++ptlrpc_last_xid;
+ next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
+ ptlrpc_last_xid = next;
spin_unlock(&ptlrpc_last_xid_lock);
- return tmp;
+
+ return next;
}
EXPORT_SYMBOL(ptlrpc_next_xid);
{
#if BITS_PER_LONG == 32
/* need to avoid possible word tearing on 32-bit systems */
- __u64 tmp;
+ __u64 next;
+
spin_lock(&ptlrpc_last_xid_lock);
- tmp = ptlrpc_last_xid + 1;
+ next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
spin_unlock(&ptlrpc_last_xid_lock);
- return tmp;
+
+ return next;
#else
/* No need to lock, since returned value is racy anyways */
- return ptlrpc_last_xid + 1;
+ return ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
#endif
}
EXPORT_SYMBOL(ptlrpc_sample_next_xid);