#define PTLRPC_MD_OPTIONS 0
/**
- * Define maxima for bulk I/O
- * CAVEAT EMPTOR, with multinet (i.e. routers forwarding between networks)
- * these limits are system wide and not interface-local. */
-#define PTLRPC_MAX_BRW_BITS LNET_MTU_BITS
-#define PTLRPC_MAX_BRW_SIZE (1 << LNET_MTU_BITS)
-#define PTLRPC_MAX_BRW_PAGES (PTLRPC_MAX_BRW_SIZE >> CFS_PAGE_SHIFT)
+ * Max # of bulk operations in one request.
+ * In order for the client and server to properly negotiate the maximum
+ * possible transfer size, PTLRPC_BULK_OPS_COUNT must be a power-of-two
+ * value. The client is free to limit the actual RPC size for any bulk
+ * transfer via cl_max_pages_per_rpc to some non-power-of-two value. */
+#define PTLRPC_BULK_OPS_BITS 2
+#define PTLRPC_BULK_OPS_COUNT (1U << PTLRPC_BULK_OPS_BITS)
+/**
+ * PTLRPC_BULK_OPS_MASK is for the convenience of the client only, and
+ * should not be used on the server at all. Otherwise, it imposes a
+ * protocol limitation on the maximum RPC size that can be used by any
+ * RPC sent to that server in the future. Instead, the server should
+ * use the negotiated per-client ocd_brw_size to determine the bulk
+ * RPC count. */
+#define PTLRPC_BULK_OPS_MASK (~((__u64)PTLRPC_BULK_OPS_COUNT - 1))
+
+/**
+ * Define maxima for bulk I/O.
+ *
+ * A single PTLRPC BRW request is sent via up to PTLRPC_BULK_OPS_COUNT
+ * of LNET_MTU sized RDMA transfers. Clients and servers negotiate the
+ * currently supported maximum between peers at connect via ocd_brw_size.
+ */
+#define PTLRPC_MAX_BRW_BITS (LNET_MTU_BITS + PTLRPC_BULK_OPS_BITS)
+#define PTLRPC_MAX_BRW_SIZE (1 << PTLRPC_MAX_BRW_BITS)
+#define PTLRPC_MAX_BRW_PAGES (PTLRPC_MAX_BRW_SIZE >> CFS_PAGE_SHIFT)
#define ONE_MB_BRW_SIZE (1 << LNET_MTU_BITS)
#define MD_MAX_BRW_SIZE (1 << LNET_MTU_BITS)
#define MD_MAX_BRW_PAGES (MD_MAX_BRW_SIZE >> CFS_PAGE_SHIFT)
-#define DT_MAX_BRW_SIZE (1 << LNET_MTU_BITS)
+#define DT_MAX_BRW_SIZE PTLRPC_MAX_BRW_SIZE
+#define DT_MAX_BRW_PAGES (DT_MAX_BRW_SIZE >> CFS_PAGE_SHIFT)
#define OFD_MAX_BRW_SIZE (1 << LNET_MTU_BITS)
/* When PAGE_SIZE is a constant, we can check our arithmetic here with cpp! */
# if (PTLRPC_MAX_BRW_SIZE != (PTLRPC_MAX_BRW_PAGES * CFS_PAGE_SIZE))
# error "PTLRPC_MAX_BRW_SIZE isn't PTLRPC_MAX_BRW_PAGES * CFS_PAGE_SIZE"
# endif
-# if (PTLRPC_MAX_BRW_SIZE > LNET_MTU)
+# if (PTLRPC_MAX_BRW_SIZE > LNET_MTU * PTLRPC_BULK_OPS_COUNT)
# error "PTLRPC_MAX_BRW_SIZE too big"
# endif
-# if (PTLRPC_MAX_BRW_PAGES > LNET_MAX_IOV)
+# if (PTLRPC_MAX_BRW_PAGES > LNET_MAX_IOV * PTLRPC_BULK_OPS_COUNT)
# error "PTLRPC_MAX_BRW_PAGES too big"
# endif
#endif /* __KERNEL__ */
#define OSS_CR_NTHRS_BASE 8
#define OSS_CR_NTHRS_MAX 64
-#define OST_NBUFS (64 * cfs_num_online_cpus())
-#define OST_BUFSIZE (8 * 1024)
-
/**
- * OST_MAXREQSIZE ~= 4768 bytes =
- * lustre_msg + obdo + 16 * obd_ioobj + 256 * niobuf_remote
+ * OST_MAXREQSIZE ~=
+ * lustre_msg + obdo + obd_ioobj + DT_MAX_BRW_PAGES * niobuf_remote
*
* - single object with 16 pages is 512 bytes
* - OST_MAXREQSIZE must be at least 1 page of cookies plus some spillover
+ * - Must be a multiple of 1024
*/
-#define OST_MAXREQSIZE (5 * 1024)
+#define _OST_MAXREQSIZE_SUM (sizeof(struct lustre_msg) + sizeof(struct obdo) + \
+ sizeof(struct obd_ioobj) + DT_MAX_BRW_PAGES * \
+ sizeof(struct niobuf_remote))
+#define OST_MAXREQSIZE (((_OST_MAXREQSIZE_SUM - 1) | (1024 - 1)) + 1)
+
#define OST_MAXREPSIZE (9 * 1024)
+#define OST_NBUFS (64 * cfs_num_online_cpus())
+#define OST_BUFSIZE (OST_MAXREQSIZE + 1024)
+
/* Macro to hide a typecast. */
#define ptlrpc_req_async_args(req) ((void *)&req->rq_async_args)
#define BULK_PUT_SOURCE 3
/**
- * Definition of buk descriptor.
+ * Definition of bulk descriptor.
* Bulks are special "Two phase" RPCs where initial request message
* is sent first and it is followed bt a transfer (o receiving) of a large
* amount of data to be settled into pages referenced from the bulk descriptors.
* Another user is readpage for MDT.
*/
struct ptlrpc_bulk_desc {
- /** completed successfully */
- unsigned long bd_success:1;
- /** accessible to the network (network io potentially in progress) */
- unsigned long bd_network_rw:1;
- /** {put,get}{source,sink} */
- unsigned long bd_type:2;
- /** client side */
- unsigned long bd_registered:1;
- /** For serialization with callback */
+ /** completed with failure */
+ unsigned long bd_failure:1;
+ /** {put,get}{source,sink} */
+ unsigned long bd_type:2;
+ /** client side */
+ unsigned long bd_registered:1;
+ /** For serialization with callback */
spinlock_t bd_lock;
- /** Import generation when request for this bulk was sent */
- int bd_import_generation;
- /** Server side - export this bulk created for */
- struct obd_export *bd_export;
- /** Client side - import this bulk was sent on */
- struct obd_import *bd_import;
- /** LNet portal for this bulk */
- __u32 bd_portal;
- /** Back pointer to the request */
- struct ptlrpc_request *bd_req;
- cfs_waitq_t bd_waitq; /* server side only WQ */
- int bd_iov_count; /* # entries in bd_iov */
- int bd_max_iov; /* allocated size of bd_iov */
- int bd_nob; /* # bytes covered */
- int bd_nob_transferred; /* # bytes GOT/PUT */
-
- __u64 bd_last_xid;
-
- struct ptlrpc_cb_id bd_cbid; /* network callback info */
- lnet_handle_md_t bd_md_h; /* associated MD */
- lnet_nid_t bd_sender; /* stash event::sender */
+ /** Import generation when request for this bulk was sent */
+ int bd_import_generation;
+ /** LNet portal for this bulk */
+ __u32 bd_portal;
+ /** Server side - export this bulk created for */
+ struct obd_export *bd_export;
+ /** Client side - import this bulk was sent on */
+ struct obd_import *bd_import;
+ /** Back pointer to the request */
+ struct ptlrpc_request *bd_req;
+ cfs_waitq_t bd_waitq; /* server side only WQ */
+ int bd_iov_count; /* # entries in bd_iov */
+ int bd_max_iov; /* allocated size of bd_iov */
+ int bd_nob; /* # bytes covered */
+ int bd_nob_transferred; /* # bytes GOT/PUT */
+
+ __u64 bd_last_xid;
+
+ struct ptlrpc_cb_id bd_cbid; /* network callback info */
+ lnet_nid_t bd_sender; /* stash event::sender */
+ int bd_md_count; /* # valid entries in bd_mds */
+ int bd_md_max_brw; /* max entries in bd_mds */
+ /** array of associated MDs */
+ lnet_handle_md_t bd_mds[PTLRPC_BULK_OPS_COUNT];
#if defined(__KERNEL__)
- /*
- * encrypt iov, size is either 0 or bd_iov_count.
- */
- lnet_kiov_t *bd_enc_iov;
+ /*
+ * encrypt iov, size is either 0 or bd_iov_count.
+ */
+ lnet_kiov_t *bd_enc_iov;
- lnet_kiov_t bd_iov[0];
+ lnet_kiov_t bd_iov[0];
#else
- lnet_md_iovec_t bd_iov[0];
+ lnet_md_iovec_t bd_iov[0];
#endif
};
*/
#ifdef HAVE_SERVER_SUPPORT
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp(struct ptlrpc_request *req,
- int npages, int type, int portal);
+ unsigned npages, unsigned max_brw,
+ unsigned type, unsigned portal);
int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc);
void ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc);
LASSERT(desc != NULL);
spin_lock(&desc->bd_lock);
- rc = desc->bd_network_rw;
+ rc = desc->bd_md_count;
spin_unlock(&desc->bd_lock);
return rc;
}
return 0;
spin_lock(&desc->bd_lock);
- rc = desc->bd_network_rw;
+ rc = desc->bd_md_count;
spin_unlock(&desc->bd_lock);
return rc;
}
void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request);
struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req);
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
- int npages, int type, int portal);
+ unsigned npages, unsigned max_brw,
+ unsigned type, unsigned portal);
void __ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk, int pin);
static inline void ptlrpc_free_bulk_pin(struct ptlrpc_bulk_desc *bulk)
{