lnet_handle_eq_t eq_handle;
} lnet_md_t;
-/* Max Transfer Unit (minimum supported everywhere) */
-#define LNET_MTU_BITS 20
-#define LNET_MTU (1<<LNET_MTU_BITS)
+/* Max Transfer Unit (minimum supported everywhere).
+ * CAVEAT EMPTOR, with multinet (i.e. routers forwarding between networks)
+ * these limits are system wide and not interface-local. */
+#define LNET_MTU_BITS 20
+#define LNET_MTU (1 << LNET_MTU_BITS)
/** limit on the number of fragments in discontiguous MDs */
#define LNET_MAX_IOV 256
static int hf_lustre_lustre_msg_v2_lm_magic = -1;
static int hf_lustre_lov_mds_md_v1_lmm_object_id = -1;
static int hf_lustre_ptlrpc_body_pb_last_seen = -1;
-static int hf_lustre_obd_ioobj_ioo_type = -1;
+static int hf_lustre_obd_ioobj_ioo_max_brw = -1;
static int hf_lustre_ptlrpc_body_pb_last_xid = -1;
static int hf_lustre_ptlrpc_body_pb_status = -1;
static int hf_lustre_niobuf_remote_flags = -1;
/* IDL: struct obd_ioobj { */
/* IDL: uint64 ioo_id; */
/* IDL: uint64 ioo_seq; */
-/* IDL: uint32 ioo_type; */
+/* IDL: uint32 ioo_max_brw; */
/* IDL: uint32 ioo_bufcnt; */
/* IDL: } */
}
static int
-lustre_dissect_element_obd_ioobj_ioo_type(tvbuff_t *tvb _U_, int offset _U_, packet_info *pinfo _U_, proto_tree *tree _U_)
+lustre_dissect_element_obd_ioobj_ioo_max_brw(tvbuff_t *tvb _U_, int offset _U_, packet_info *pinfo _U_, proto_tree *tree _U_)
{
- offset=dissect_uint32(tvb, offset, pinfo, tree, hf_lustre_obd_ioobj_ioo_type);
+ offset=dissect_uint32(tvb, offset, pinfo, tree, hf_lustre_obd_ioobj_ioo_max_brw);
return offset;
}
offset=lustre_dissect_element_obd_ioobj_ioo_seq(tvb, offset, pinfo, tree);
- offset=lustre_dissect_element_obd_ioobj_ioo_type(tvb, offset, pinfo, tree);
+ offset=lustre_dissect_element_obd_ioobj_ioo_max_brw(tvb, offset, pinfo, tree);
offset=lustre_dissect_element_obd_ioobj_ioo_bufcnt(tvb, offset, pinfo, tree);
{ "Lmm Object Id", "lustre.lov_mds_md_v1.lmm_object_id", FT_UINT64, BASE_DEC, NULL, 0, "", HFILL }},
{ &hf_lustre_ptlrpc_body_pb_last_seen,
{ "Pb Last Seen", "lustre.ptlrpc_body.pb_last_seen", FT_UINT64, BASE_DEC, NULL, 0, "", HFILL }},
- { &hf_lustre_obd_ioobj_ioo_type, /* TODO : create the
+ { &hf_lustre_obd_ioobj_ioo_max_brw, /* TODO : create the
corresponding value_string */
- { "Ioo Type", "lustre.obd_ioobj.ioo_type", FT_UINT32, BASE_HEX, NULL, 0, "", HFILL } },
+ { "Ioo Max BRW Size", "lustre.obd_ioobj.ioo_max_brw", FT_UINT32, BASE_HEX, NULL, 0, "", HFILL } },
{ &hf_lustre_ptlrpc_body_pb_last_xid,
{ "Pb Last Xid", "lustre.ptlrpc_body.pb_last_xid", FT_UINT64, BASE_DEC, NULL, 0, "", HFILL }},
{ &hf_lustre_ptlrpc_body_pb_status,
* If we eventually have separate connect data for different types, which we
* almost certainly will, then perhaps we stick a union in here. */
struct obd_connect_data_v1 {
- __u64 ocd_connect_flags; /* OBD_CONNECT_* per above */
- __u32 ocd_version; /* lustre release version number */
- __u32 ocd_grant; /* initial cache grant amount (bytes) */
- __u32 ocd_index; /* LOV index to connect to */
- __u32 ocd_brw_size; /* Maximum BRW size in bytes */
+ __u64 ocd_connect_flags; /* OBD_CONNECT_* per above */
+ __u32 ocd_version; /* lustre release version number */
+ __u32 ocd_grant; /* initial cache grant amount (bytes) */
+ __u32 ocd_index; /* LOV index to connect to */
+ __u32 ocd_brw_size; /* Maximum BRW size in bytes, must be 2^n */
__u64 ocd_ibits_known; /* inode bits this client understands */
__u8 ocd_blocksize; /* log2 of the backend filesystem blocksize */
__u8 ocd_inodespace; /* log2 of the per-inode space consumption */
};
struct obd_connect_data {
- __u64 ocd_connect_flags; /* OBD_CONNECT_* per above */
- __u32 ocd_version; /* lustre release version number */
- __u32 ocd_grant; /* initial cache grant amount (bytes) */
- __u32 ocd_index; /* LOV index to connect to */
- __u32 ocd_brw_size; /* Maximum BRW size in bytes */
+ __u64 ocd_connect_flags; /* OBD_CONNECT_* per above */
+ __u32 ocd_version; /* lustre release version number */
+ __u32 ocd_grant; /* initial cache grant amount (bytes) */
+ __u32 ocd_index; /* LOV index to connect to */
+ __u32 ocd_brw_size; /* Maximum BRW size in bytes */
__u64 ocd_ibits_known; /* inode bits this client understands */
__u8 ocd_blocksize; /* log2 of the backend filesystem blocksize */
__u8 ocd_inodespace; /* log2 of the per-inode space consumption */
#define OST_MAX_PRECREATE 20000
struct obd_ioobj {
- struct ost_id ioo_oid;
- __u32 ioo_type;
- __u32 ioo_bufcnt;
-};
+ struct ost_id ioo_oid; /* object ID, if multi-obj BRW */
+ __u32 ioo_max_brw; /* low 16 bits were o_mode before 2.4,
+ * now (PTLRPC_BULK_OPS_COUNT - 1) in
+ * high 16 bits in 2.4 and later */
+ __u32 ioo_bufcnt; /* number of niobufs for this object */
+};
+
+#define IOOBJ_MAX_BRW_BITS 16
+#define IOOBJ_TYPE_MASK ((1U << IOOBJ_MAX_BRW_BITS) - 1)
+#define ioobj_max_brw_get(ioo) (((ioo)->ioo_max_brw >> IOOBJ_MAX_BRW_BITS) + 1)
+#define ioobj_max_brw_set(ioo, num) \
+do { (ioo)->ioo_max_brw = ((num) - 1) << IOOBJ_MAX_BRW_BITS; } while (0)
#define ioo_id ioo_oid.oi_id
#define ioo_seq ioo_oid.oi_seq
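To illustrate the encoding, here is a minimal userspace sketch (a hypothetical test harness, not part of the patch) of how ioo_max_brw stores "count - 1" in the high IOOBJ_MAX_BRW_BITS bits; the ex_* macros mirror the ones above, and struct toy_ioobj/main() exist only for this example:

#include <assert.h>

#define EX_MAX_BRW_BITS		16	/* mirrors IOOBJ_MAX_BRW_BITS */
#define ex_max_brw_get(ioo)	(((ioo)->ioo_max_brw >> EX_MAX_BRW_BITS) + 1)
#define ex_max_brw_set(ioo, num) \
do { (ioo)->ioo_max_brw = ((num) - 1) << EX_MAX_BRW_BITS; } while (0)

struct toy_ioobj { unsigned int ioo_max_brw; };

int main(void)
{
	struct toy_ioobj ioo = { 0 };

	ex_max_brw_set(&ioo, 1);		/* single bulk encodes as 0,   */
	assert(ioo.ioo_max_brw == 0);		/* compatible with old clients */
	assert(ex_max_brw_get(&ioo) == 1);

	ex_max_brw_set(&ioo, 4);		/* PTLRPC_BULK_OPS_COUNT bulks */
	assert(ioo.ioo_max_brw == (3u << 16));
	assert(ex_max_brw_get(&ioo) == 4);
	return 0;
}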
return *exp_connect_flags_ptr(exp);
}
-static inline int exp_brw_size(struct obd_export *exp)
+static inline int exp_max_brw_size(struct obd_export *exp)
{
LASSERT(exp != NULL);
if (exp_connect_flags(exp) & OBD_CONNECT_BRW_SIZE)
return ONE_MB_BRW_SIZE;
}
+static inline int exp_connect_multibulk(struct obd_export *exp)
+{
+ return exp_max_brw_size(exp) > ONE_MB_BRW_SIZE;
+}
+
static inline int exp_expired(struct obd_export *exp, cfs_duration_t age)
{
LASSERT(exp->exp_delayed);
#define PTLRPC_MD_OPTIONS 0
/**
- * Define maxima for bulk I/O
- * CAVEAT EMPTOR, with multinet (i.e. routers forwarding between networks)
- * these limits are system wide and not interface-local. */
-#define PTLRPC_MAX_BRW_BITS LNET_MTU_BITS
-#define PTLRPC_MAX_BRW_SIZE (1 << LNET_MTU_BITS)
-#define PTLRPC_MAX_BRW_PAGES (PTLRPC_MAX_BRW_SIZE >> CFS_PAGE_SHIFT)
+ * Max # of bulk operations in one request.
+ * In order for the client and server to properly negotiate the maximum
+ * possible transfer size, PTLRPC_BULK_OPS_COUNT must be a power-of-two
+ * value. The client is free to limit the actual RPC size for any bulk
+ * transfer via cl_max_pages_per_rpc to some non-power-of-two value. */
+#define PTLRPC_BULK_OPS_BITS 2
+#define PTLRPC_BULK_OPS_COUNT (1U << PTLRPC_BULK_OPS_BITS)
+/**
+ * PTLRPC_BULK_OPS_MASK is for the convenience of the client only, and
+ * should not be used on the server at all. Otherwise, it imposes a
+ * protocol limitation on the maximum RPC size that can be used by any
+ * RPC sent to that server in the future. Instead, the server should
+ * use the negotiated per-client ocd_brw_size to determine the bulk
+ * RPC count. */
+#define PTLRPC_BULK_OPS_MASK (~((__u64)PTLRPC_BULK_OPS_COUNT - 1))
+
+/**
+ * Define maxima for bulk I/O.
+ *
+ * A single PTLRPC BRW request is sent via up to PTLRPC_BULK_OPS_COUNT
+ * of LNET_MTU sized RDMA transfers. Clients and servers negotiate the
+ * currently supported maximum between peers at connect via ocd_brw_size.
+ */
+#define PTLRPC_MAX_BRW_BITS (LNET_MTU_BITS + PTLRPC_BULK_OPS_BITS)
+#define PTLRPC_MAX_BRW_SIZE (1 << PTLRPC_MAX_BRW_BITS)
+#define PTLRPC_MAX_BRW_PAGES (PTLRPC_MAX_BRW_SIZE >> CFS_PAGE_SHIFT)
#define ONE_MB_BRW_SIZE (1 << LNET_MTU_BITS)
#define MD_MAX_BRW_SIZE (1 << LNET_MTU_BITS)
#define MD_MAX_BRW_PAGES (MD_MAX_BRW_SIZE >> CFS_PAGE_SHIFT)
-#define DT_MAX_BRW_SIZE (1 << LNET_MTU_BITS)
+#define DT_MAX_BRW_SIZE PTLRPC_MAX_BRW_SIZE
+#define DT_MAX_BRW_PAGES (DT_MAX_BRW_SIZE >> CFS_PAGE_SHIFT)
#define OFD_MAX_BRW_SIZE (1 << LNET_MTU_BITS)
/* When PAGE_SIZE is a constant, we can check our arithmetic here with cpp! */
# if (PTLRPC_MAX_BRW_SIZE != (PTLRPC_MAX_BRW_PAGES * CFS_PAGE_SIZE))
# error "PTLRPC_MAX_BRW_SIZE isn't PTLRPC_MAX_BRW_PAGES * CFS_PAGE_SIZE"
# endif
-# if (PTLRPC_MAX_BRW_SIZE > LNET_MTU)
+# if (PTLRPC_MAX_BRW_SIZE > LNET_MTU * PTLRPC_BULK_OPS_COUNT)
# error "PTLRPC_MAX_BRW_SIZE too big"
# endif
-# if (PTLRPC_MAX_BRW_PAGES > LNET_MAX_IOV)
+# if (PTLRPC_MAX_BRW_PAGES > LNET_MAX_IOV * PTLRPC_BULK_OPS_COUNT)
# error "PTLRPC_MAX_BRW_PAGES too big"
# endif
#endif /* __KERNEL__ */
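For concreteness, a standalone sketch of the arithmetic above, assuming 4 KiB pages (CFS_PAGE_SHIFT == 12); the EX_* constants are re-derived locally for illustration rather than taken from the Lustre headers:

/* EX_* values re-derived locally; a sketch, not the kernel headers. */
enum {
	EX_MTU_BITS	 = 20,					/* LNET_MTU_BITS        */
	EX_OPS_BITS	 = 2,					/* PTLRPC_BULK_OPS_BITS */
	EX_PAGE_SHIFT	 = 12,					/* 4 KiB pages          */
	EX_MAX_BRW_SIZE	 = 1 << (EX_MTU_BITS + EX_OPS_BITS),	/* 4 MiB                */
	EX_MAX_BRW_PAGES = EX_MAX_BRW_SIZE >> EX_PAGE_SHIFT,	/* 1024 pages           */
};

/* 4 MiB is exactly 4 LNET_MTU-sized transfers, and 1024 pages is exactly
 * PTLRPC_BULK_OPS_COUNT * LNET_MAX_IOV = 4 * 256 fragments. */
typedef char ex_check_size[EX_MAX_BRW_SIZE == 4 * (1 << EX_MTU_BITS) ? 1 : -1];
typedef char ex_check_pages[EX_MAX_BRW_PAGES == 4 * 256 ? 1 : -1];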
#define OSS_CR_NTHRS_BASE 8
#define OSS_CR_NTHRS_MAX 64
-#define OST_NBUFS (64 * cfs_num_online_cpus())
-#define OST_BUFSIZE (8 * 1024)
-
/**
- * OST_MAXREQSIZE ~= 4768 bytes =
- * lustre_msg + obdo + 16 * obd_ioobj + 256 * niobuf_remote
+ * OST_MAXREQSIZE ~=
+ * lustre_msg + obdo + obd_ioobj + DT_MAX_BRW_PAGES * niobuf_remote
*
* - single object with 16 pages is 512 bytes
* - OST_MAXREQSIZE must be at least 1 page of cookies plus some spillover
+ * - Must be a multiple of 1024
*/
-#define OST_MAXREQSIZE (5 * 1024)
+#define _OST_MAXREQSIZE_SUM (sizeof(struct lustre_msg) + sizeof(struct obdo) + \
+ sizeof(struct obd_ioobj) + DT_MAX_BRW_PAGES * \
+ sizeof(struct niobuf_remote))
+#define OST_MAXREQSIZE (((_OST_MAXREQSIZE_SUM - 1) | (1024 - 1)) + 1)
+
#define OST_MAXREPSIZE (9 * 1024)
+#define OST_NBUFS (64 * cfs_num_online_cpus())
+#define OST_BUFSIZE (OST_MAXREQSIZE + 1024)
+
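As an aside, the ((x - 1) | (1024 - 1)) + 1 expression used by OST_MAXREQSIZE rounds its operand up to the next multiple of 1024; a small hypothetical check (ex_round_up_1k is an illustrative name only):

#include <assert.h>

static unsigned int ex_round_up_1k(unsigned int x)
{
	/* same trick as OST_MAXREQSIZE: next multiple of 1024 that is >= x */
	return ((x - 1) | (1024 - 1)) + 1;
}

int main(void)
{
	assert(ex_round_up_1k(1)    == 1024);
	assert(ex_round_up_1k(1024) == 1024);	/* already aligned: unchanged */
	assert(ex_round_up_1k(4769) == 5120);
	return 0;
}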
/* Macro to hide a typecast. */
#define ptlrpc_req_async_args(req) ((void *)&req->rq_async_args)
#define BULK_PUT_SOURCE 3
/**
- * Definition of buk descriptor.
+ * Definition of bulk descriptor.
 * Bulks are special "Two phase" RPCs where the initial request message
 * is sent first and is followed by a transfer (or receipt) of a large
* amount of data to be settled into pages referenced from the bulk descriptors.
* Another user is readpage for MDT.
*/
struct ptlrpc_bulk_desc {
- /** completed successfully */
- unsigned long bd_success:1;
- /** accessible to the network (network io potentially in progress) */
- unsigned long bd_network_rw:1;
- /** {put,get}{source,sink} */
- unsigned long bd_type:2;
- /** client side */
- unsigned long bd_registered:1;
- /** For serialization with callback */
+ /** completed with failure */
+ unsigned long bd_failure:1;
+ /** {put,get}{source,sink} */
+ unsigned long bd_type:2;
+ /** client side */
+ unsigned long bd_registered:1;
+ /** For serialization with callback */
spinlock_t bd_lock;
- /** Import generation when request for this bulk was sent */
- int bd_import_generation;
- /** Server side - export this bulk created for */
- struct obd_export *bd_export;
- /** Client side - import this bulk was sent on */
- struct obd_import *bd_import;
- /** LNet portal for this bulk */
- __u32 bd_portal;
- /** Back pointer to the request */
- struct ptlrpc_request *bd_req;
- cfs_waitq_t bd_waitq; /* server side only WQ */
- int bd_iov_count; /* # entries in bd_iov */
- int bd_max_iov; /* allocated size of bd_iov */
- int bd_nob; /* # bytes covered */
- int bd_nob_transferred; /* # bytes GOT/PUT */
-
- __u64 bd_last_xid;
-
- struct ptlrpc_cb_id bd_cbid; /* network callback info */
- lnet_handle_md_t bd_md_h; /* associated MD */
- lnet_nid_t bd_sender; /* stash event::sender */
+ /** Import generation when request for this bulk was sent */
+ int bd_import_generation;
+ /** LNet portal for this bulk */
+ __u32 bd_portal;
+ /** Server side - export this bulk created for */
+ struct obd_export *bd_export;
+ /** Client side - import this bulk was sent on */
+ struct obd_import *bd_import;
+ /** Back pointer to the request */
+ struct ptlrpc_request *bd_req;
+ cfs_waitq_t bd_waitq; /* server side only WQ */
+ int bd_iov_count; /* # entries in bd_iov */
+ int bd_max_iov; /* allocated size of bd_iov */
+ int bd_nob; /* # bytes covered */
+ int bd_nob_transferred; /* # bytes GOT/PUT */
+
+ __u64 bd_last_xid;
+
+ struct ptlrpc_cb_id bd_cbid; /* network callback info */
+ lnet_nid_t bd_sender; /* stash event::sender */
+ int bd_md_count; /* # valid entries in bd_mds */
+ int bd_md_max_brw; /* max entries in bd_mds */
+ /** array of associated MDs */
+ lnet_handle_md_t bd_mds[PTLRPC_BULK_OPS_COUNT];
#if defined(__KERNEL__)
- /*
- * encrypt iov, size is either 0 or bd_iov_count.
- */
- lnet_kiov_t *bd_enc_iov;
+ /*
+ * encrypt iov, size is either 0 or bd_iov_count.
+ */
+ lnet_kiov_t *bd_enc_iov;
- lnet_kiov_t bd_iov[0];
+ lnet_kiov_t bd_iov[0];
#else
- lnet_md_iovec_t bd_iov[0];
+ lnet_md_iovec_t bd_iov[0];
#endif
};
*/
#ifdef HAVE_SERVER_SUPPORT
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp(struct ptlrpc_request *req,
- int npages, int type, int portal);
+ unsigned npages, unsigned max_brw,
+ unsigned type, unsigned portal);
int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc);
void ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc);
LASSERT(desc != NULL);
spin_lock(&desc->bd_lock);
- rc = desc->bd_network_rw;
+ rc = desc->bd_md_count;
spin_unlock(&desc->bd_lock);
return rc;
}
return 0;
spin_lock(&desc->bd_lock);
- rc = desc->bd_network_rw;
+ rc = desc->bd_md_count;
spin_unlock(&desc->bd_lock);
return rc;
}
void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request);
struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req);
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
- int npages, int type, int portal);
+ unsigned npages, unsigned max_brw,
+ unsigned type, unsigned portal);
void __ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk, int pin);
static inline void ptlrpc_free_bulk_pin(struct ptlrpc_bulk_desc *bulk)
{
/* just a sum of the loi/lop pending numbers to be exported by /proc */
cfs_atomic_t cl_pending_w_pages;
cfs_atomic_t cl_pending_r_pages;
- int cl_max_pages_per_rpc;
+ __u32 cl_max_pages_per_rpc;
int cl_max_rpcs_in_flight;
struct obd_histogram cl_read_rpc_hist;
struct obd_histogram cl_write_rpc_hist;
return false;
}
+static inline int cli_brw_size(struct obd_device *obd)
+{
+ LASSERT(obd != NULL);
+ return obd->u.cli.cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
+}
+
#endif /* __OBD_H */
#endif
cfs_atomic_set(&cli->cl_resends, OSC_DEFAULT_RESENDS);
- /* This value may be changed at connect time in
- ptlrpc_connect_interpret. */
- cli->cl_max_pages_per_rpc = min((int)PTLRPC_MAX_BRW_PAGES,
- (int)(LNET_MTU >> CFS_PAGE_SHIFT));
+ /* This value may be reduced at connect time in
+	 * ptlrpc_connect_interpret(). We initialize it to only
+ * 1MB until we know what the performance looks like.
+ * In the future this should likely be increased. LU-1431 */
+ cli->cl_max_pages_per_rpc = min_t(int, PTLRPC_MAX_BRW_PAGES,
+ LNET_MTU >> CFS_PAGE_SHIFT);
if (!strcmp(name, LUSTRE_MDC_NAME)) {
cli->cl_max_rpcs_in_flight = MDC_MAX_RIF_DEFAULT;
/* We don't reply anyway. */
rc = -ETIMEDOUT;
ptlrpc_abort_bulk(desc);
- } else if (!desc->bd_success ||
- desc->bd_nob_transferred != desc->bd_nob) {
- DEBUG_REQ(D_ERROR, req, "%s bulk %s %d(%d)",
- desc->bd_success ?
- "truncated" : "network error on",
- bulk2type(desc),
- desc->bd_nob_transferred,
- desc->bd_nob);
+ } else if (desc->bd_failure ||
+ desc->bd_nob_transferred != desc->bd_nob) {
+ DEBUG_REQ(D_ERROR, req, "%s bulk %s %d(%d)",
+ desc->bd_failure ?
+ "network error on" : "truncated",
+ bulk2type(desc),
+ desc->bd_nob_transferred,
+ desc->bd_nob);
/* XXX Should this be a different errno? */
- rc = -ETIMEDOUT;
+ rc = -ETIMEDOUT;
} else if (desc->bd_type == BULK_GET_SINK) {
rc = sptlrpc_svc_unwrap_bulk(req, desc);
}
}
data->ocd_connect_flags = OBD_CONNECT_GRANT | OBD_CONNECT_VERSION |
- OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE |
+ OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE |
OBD_CONNECT_CANCELSET | OBD_CONNECT_FID |
OBD_CONNECT_SRVLOCK | OBD_CONNECT_TRUNCLOCK|
OBD_CONNECT_AT | OBD_CONNECT_RMT_CLIENT |
ria->ria_start, ria->ria_end, ria->ria_stoff, ria->ria_length,\
ria->ria_pages)
-#define RAS_INCREASE_STEP PTLRPC_MAX_BRW_PAGES
+/* Limit this to the blocksize instead of PTLRPC_MAX_BRW_SIZE, since we don't
+ * know what the actual RPC size is. If this needs to change, it makes more
+ * sense to tune the i_blkbits value for the file based on the OSTs it is
+ * striped over, rather than having a constant value for all files here. */
+#define RAS_INCREASE_STEP(inode) (1UL << (inode)->i_blkbits)
static inline int stride_io_mode(struct ll_readahead_state *ras)
{
RETURN(ret);
}
-static void ras_set_start(struct ll_readahead_state *ras, unsigned long index)
+static void ras_set_start(struct inode *inode, struct ll_readahead_state *ras,
+ unsigned long index)
{
- ras->ras_window_start = index & (~(RAS_INCREASE_STEP - 1));
+ ras->ras_window_start = index & (~(RAS_INCREASE_STEP(inode) - 1));
}
/* called with the ras_lock held or from places where it doesn't matter */
-static void ras_reset(struct ll_readahead_state *ras, unsigned long index)
+static void ras_reset(struct inode *inode, struct ll_readahead_state *ras,
+ unsigned long index)
{
- ras->ras_last_readpage = index;
- ras->ras_consecutive_requests = 0;
- ras->ras_consecutive_pages = 0;
- ras->ras_window_len = 0;
- ras_set_start(ras, index);
- ras->ras_next_readahead = max(ras->ras_window_start, index);
+ ras->ras_last_readpage = index;
+ ras->ras_consecutive_requests = 0;
+ ras->ras_consecutive_pages = 0;
+ ras->ras_window_len = 0;
+ ras_set_start(inode, ras, index);
+ ras->ras_next_readahead = max(ras->ras_window_start, index);
- RAS_CDEBUG(ras);
+ RAS_CDEBUG(ras);
}
/* called with the ras_lock held or from places where it doesn't matter */
void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras)
{
spin_lock_init(&ras->ras_lock);
- ras_reset(ras, 0);
+ ras_reset(inode, ras, 0);
ras->ras_requests = 0;
CFS_INIT_LIST_HEAD(&ras->ras_read_beads);
}
* Check whether the read request is in the stride window.
* If it is in the stride window, return 1, otherwise return 0.
*/
-static int index_in_stride_window(unsigned long index,
- struct ll_readahead_state *ras,
- struct inode *inode)
+static int index_in_stride_window(struct ll_readahead_state *ras,
+ unsigned long index)
{
- unsigned long stride_gap = index - ras->ras_last_readpage - 1;
+ unsigned long stride_gap;
- if (ras->ras_stride_length == 0 || ras->ras_stride_pages == 0 ||
- ras->ras_stride_pages == ras->ras_stride_length)
- return 0;
+ if (ras->ras_stride_length == 0 || ras->ras_stride_pages == 0 ||
+ ras->ras_stride_pages == ras->ras_stride_length)
+ return 0;
- /* If it is contiguous read */
- if (stride_gap == 0)
- return ras->ras_consecutive_pages + 1 <= ras->ras_stride_pages;
+ stride_gap = index - ras->ras_last_readpage - 1;
- /*Otherwise check the stride by itself */
- return (ras->ras_stride_length - ras->ras_stride_pages) == stride_gap &&
- ras->ras_consecutive_pages == ras->ras_stride_pages;
+ /* If it is contiguous read */
+ if (stride_gap == 0)
+ return ras->ras_consecutive_pages + 1 <= ras->ras_stride_pages;
+
+ /* Otherwise check the stride by itself */
+ return (ras->ras_stride_length - ras->ras_stride_pages) == stride_gap &&
+ ras->ras_consecutive_pages == ras->ras_stride_pages;
}
static void ras_update_stride_detector(struct ll_readahead_state *ras,
RAS_CDEBUG(ras);
}
-static void ras_increase_window(struct ll_readahead_state *ras,
- struct ll_ra_info *ra, struct inode *inode)
+static void ras_increase_window(struct inode *inode,
+ struct ll_readahead_state *ras,
+ struct ll_ra_info *ra)
{
- /* The stretch of ra-window should be aligned with max rpc_size
- * but current clio architecture does not support retrieve such
- * information from lower layer. FIXME later
- */
- if (stride_io_mode(ras))
- ras_stride_increase_window(ras, ra, RAS_INCREASE_STEP);
- else
- ras->ras_window_len = min(ras->ras_window_len +
- RAS_INCREASE_STEP,
- ra->ra_max_pages_per_file);
+ /* The stretch of ra-window should be aligned with max rpc_size
+ * but current clio architecture does not support retrieve such
+ * information from lower layer. FIXME later
+ */
+ if (stride_io_mode(ras))
+ ras_stride_increase_window(ras, ra, RAS_INCREASE_STEP(inode));
+ else
+ ras->ras_window_len = min(ras->ras_window_len +
+ RAS_INCREASE_STEP(inode),
+ ra->ra_max_pages_per_file);
}
void ras_update(struct ll_sb_info *sbi, struct inode *inode,
GOTO(out_unlock, 0);
}
}
- if (zero) {
- /* check whether it is in stride I/O mode*/
- if (!index_in_stride_window(index, ras, inode)) {
- if (ras->ras_consecutive_stride_requests == 0 &&
- ras->ras_request_index == 0) {
- ras_update_stride_detector(ras, index);
- ras->ras_consecutive_stride_requests ++;
- } else {
- ras_stride_reset(ras);
- }
- ras_reset(ras, index);
- ras->ras_consecutive_pages++;
- GOTO(out_unlock, 0);
- } else {
- ras->ras_consecutive_pages = 0;
- ras->ras_consecutive_requests = 0;
- if (++ras->ras_consecutive_stride_requests > 1)
- stride_detect = 1;
- RAS_CDEBUG(ras);
- }
- } else {
- if (ra_miss) {
- if (index_in_stride_window(index, ras, inode) &&
- stride_io_mode(ras)) {
- /*If stride-RA hit cache miss, the stride dector
- *will not be reset to avoid the overhead of
- *redetecting read-ahead mode */
- if (index != ras->ras_last_readpage + 1)
- ras->ras_consecutive_pages = 0;
- ras_reset(ras, index);
- RAS_CDEBUG(ras);
- } else {
- /* Reset both stride window and normal RA
- * window */
- ras_reset(ras, index);
- ras->ras_consecutive_pages++;
- ras_stride_reset(ras);
- GOTO(out_unlock, 0);
- }
- } else if (stride_io_mode(ras)) {
- /* If this is contiguous read but in stride I/O mode
- * currently, check whether stride step still is valid,
- * if invalid, it will reset the stride ra window*/
- if (!index_in_stride_window(index, ras, inode)) {
- /* Shrink stride read-ahead window to be zero */
- ras_stride_reset(ras);
- ras->ras_window_len = 0;
- ras->ras_next_readahead = index;
- }
- }
- }
- ras->ras_consecutive_pages++;
- ras->ras_last_readpage = index;
- ras_set_start(ras, index);
-
- if (stride_io_mode(ras))
- /* Since stride readahead is sentivite to the offset
- * of read-ahead, so we use original offset here,
- * instead of ras_window_start, which is 1M aligned*/
- ras->ras_next_readahead = max(index,
- ras->ras_next_readahead);
- else
- ras->ras_next_readahead = max(ras->ras_window_start,
- ras->ras_next_readahead);
- RAS_CDEBUG(ras);
+ if (zero) {
+		/* check whether it is in stride I/O mode */
+ if (!index_in_stride_window(ras, index)) {
+ if (ras->ras_consecutive_stride_requests == 0 &&
+ ras->ras_request_index == 0) {
+ ras_update_stride_detector(ras, index);
+ ras->ras_consecutive_stride_requests++;
+ } else {
+ ras_stride_reset(ras);
+ }
+ ras_reset(inode, ras, index);
+ ras->ras_consecutive_pages++;
+ GOTO(out_unlock, 0);
+ } else {
+ ras->ras_consecutive_pages = 0;
+ ras->ras_consecutive_requests = 0;
+ if (++ras->ras_consecutive_stride_requests > 1)
+ stride_detect = 1;
+ RAS_CDEBUG(ras);
+ }
+ } else {
+ if (ra_miss) {
+ if (index_in_stride_window(ras, index) &&
+ stride_io_mode(ras)) {
+				/* If stride-RA hits a cache miss, the stride
+				 * detector will not be reset, to avoid the
+				 * overhead of redetecting read-ahead mode */
+ if (index != ras->ras_last_readpage + 1)
+ ras->ras_consecutive_pages = 0;
+ ras_reset(inode, ras, index);
+ RAS_CDEBUG(ras);
+ } else {
+ /* Reset both stride window and normal RA
+ * window */
+ ras_reset(inode, ras, index);
+ ras->ras_consecutive_pages++;
+ ras_stride_reset(ras);
+ GOTO(out_unlock, 0);
+ }
+ } else if (stride_io_mode(ras)) {
+			/* If this is a contiguous read but we are currently
+			 * in stride I/O mode, check whether the stride step
+			 * is still valid; if not, reset the stride RA window */
+ if (!index_in_stride_window(ras, index)) {
+ /* Shrink stride read-ahead window to be zero */
+ ras_stride_reset(ras);
+ ras->ras_window_len = 0;
+ ras->ras_next_readahead = index;
+ }
+ }
+ }
+ ras->ras_consecutive_pages++;
+ ras->ras_last_readpage = index;
+ ras_set_start(inode, ras, index);
+
+ if (stride_io_mode(ras))
+		/* Since stride readahead is sensitive to the read-ahead
+		 * offset, use the original offset here instead of
+		 * ras_window_start, which is RPC aligned */
+ ras->ras_next_readahead = max(index, ras->ras_next_readahead);
+ else
+ ras->ras_next_readahead = max(ras->ras_window_start,
+ ras->ras_next_readahead);
+ RAS_CDEBUG(ras);
- /* Trigger RA in the mmap case where ras_consecutive_requests
- * is not incremented and thus can't be used to trigger RA */
- if (!ras->ras_window_len && ras->ras_consecutive_pages == 4) {
- ras->ras_window_len = RAS_INCREASE_STEP;
- GOTO(out_unlock, 0);
- }
+ /* Trigger RA in the mmap case where ras_consecutive_requests
+ * is not incremented and thus can't be used to trigger RA */
+ if (!ras->ras_window_len && ras->ras_consecutive_pages == 4) {
+ ras->ras_window_len = RAS_INCREASE_STEP(inode);
+ GOTO(out_unlock, 0);
+ }
- /* Initially reset the stride window offset to next_readahead*/
- if (ras->ras_consecutive_stride_requests == 2 && stride_detect) {
- /**
- * Once stride IO mode is detected, next_readahead should be
- * reset to make sure next_readahead > stride offset
- */
- ras->ras_next_readahead = max(index, ras->ras_next_readahead);
- ras->ras_stride_offset = index;
- ras->ras_window_len = RAS_INCREASE_STEP;
- }
+	/* Initially reset the stride window offset to next_readahead */
+ if (ras->ras_consecutive_stride_requests == 2 && stride_detect) {
+ /**
+ * Once stride IO mode is detected, next_readahead should be
+ * reset to make sure next_readahead > stride offset
+ */
+ ras->ras_next_readahead = max(index, ras->ras_next_readahead);
+ ras->ras_stride_offset = index;
+ ras->ras_window_len = RAS_INCREASE_STEP(inode);
+ }
- /* The initial ras_window_len is set to the request size. To avoid
- * uselessly reading and discarding pages for random IO the window is
- * only increased once per consecutive request received. */
- if ((ras->ras_consecutive_requests > 1 || stride_detect) &&
- !ras->ras_request_index)
- ras_increase_window(ras, ra, inode);
- EXIT;
+ /* The initial ras_window_len is set to the request size. To avoid
+ * uselessly reading and discarding pages for random IO the window is
+ * only increased once per consecutive request received. */
+ if ((ras->ras_consecutive_requests > 1 || stride_detect) &&
+ !ras->ras_request_index)
+ ras_increase_window(inode, ras, ra);
+ EXIT;
out_unlock:
RAS_CDEBUG(ras);
ras->ras_request_index++;
req->rq_request_portal = MDS_READPAGE_PORTAL;
ptlrpc_at_set_req_timeout(req);
- desc = ptlrpc_prep_bulk_imp(req, 1, BULK_GET_SOURCE, MDS_BULK_PORTAL);
- if (desc == NULL)
- GOTO(out, rc = -ENOMEM);
+	desc = ptlrpc_prep_bulk_imp(req, 1, 1, BULK_GET_SOURCE,
+				    MDS_BULK_PORTAL);
+ if (desc == NULL)
+ GOTO(out, rc = -ENOMEM);
/* NB req now owns desc and will free it when it gets freed. */
ptlrpc_prep_bulk_page(desc, (struct page *)page, 0, offset);
req->rq_request_portal = MDS_READPAGE_PORTAL;
ptlrpc_at_set_req_timeout(req);
- desc = ptlrpc_prep_bulk_imp(req, op_data->op_npages, BULK_PUT_SINK,
- MDS_BULK_PORTAL);
+ desc = ptlrpc_prep_bulk_imp(req, op_data->op_npages, 1, BULK_PUT_SINK,
+ MDS_BULK_PORTAL);
if (desc == NULL) {
ptlrpc_request_free(req);
RETURN(-ENOMEM);
int rc;
ENTRY;
- desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, BULK_PUT_SOURCE,
- MDS_BULK_PORTAL);
- if (desc == NULL)
- RETURN(-ENOMEM);
+ desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, 1, BULK_PUT_SOURCE,
+ MDS_BULK_PORTAL);
+ if (desc == NULL)
+ RETURN(-ENOMEM);
if (!(exp_connect_flags(exp) & OBD_CONNECT_BRW_SIZE))
 		/* old client requires reply size in its PAGE_SIZE,
if (exp_connect_flags(info->mti_exp) & OBD_CONNECT_64BITHASH)
rdpg->rp_attrs |= LUDA_64BITHASH;
rdpg->rp_count = min_t(unsigned int, reqbody->nlink,
- exp_brw_size(info->mti_exp));
+ exp_max_brw_size(info->mti_exp));
rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1) >>
CFS_PAGE_SHIFT;
OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
if (req_ii->ii_count <= 0)
GOTO(out, rc = -EFAULT);
rdpg->rp_count = min_t(unsigned int, req_ii->ii_count << LU_PAGE_SHIFT,
- exp_brw_size(info->mti_exp));
+ exp_max_brw_size(info->mti_exp));
rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE -1) >> CFS_PAGE_SHIFT;
/* allocate pages to store the containers */
body->mcb_bits = CFS_PAGE_SHIFT;
body->mcb_units = nrpages;
- /* allocate bulk transfer descriptor */
- desc = ptlrpc_prep_bulk_imp(req, nrpages, BULK_PUT_SINK,
- MGS_BULK_PORTAL);
- if (desc == NULL)
- GOTO(out, rc = -ENOMEM);
+ /* allocate bulk transfer descriptor */
+ desc = ptlrpc_prep_bulk_imp(req, nrpages, 1, BULK_PUT_SINK,
+ MGS_BULK_PORTAL);
+ if (desc == NULL)
+ GOTO(out, rc = -ENOMEM);
- for (i = 0; i < nrpages; i++)
+ for (i = 0; i < nrpages; i++)
ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, CFS_PAGE_SIZE);
ptlrpc_request_set_replen(req);
unit_size = min_t(int, 1 << body->mcb_bits, CFS_PAGE_SIZE);
bytes = mgs_nidtbl_read(req->rq_export, &fsdb->fsdb_nidtbl, res,
pages, nrpages, bufsize / unit_size, unit_size);
- if (bytes < 0)
- GOTO(out, rc = bytes);
-
- /* start bulk transfer */
- page_count = (bytes + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
- LASSERT(page_count <= nrpages);
- desc = ptlrpc_prep_bulk_exp(req, page_count,
- BULK_PUT_SOURCE, MGS_BULK_PORTAL);
- if (desc == NULL)
- GOTO(out, rc = -ENOMEM);
-
- for (i = 0; i < page_count && bytes > 0; i++) {
+ if (bytes < 0)
+ GOTO(out, rc = bytes);
+
+ /* start bulk transfer */
+ page_count = (bytes + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
+ LASSERT(page_count <= nrpages);
+ desc = ptlrpc_prep_bulk_exp(req, page_count, 1,
+ BULK_PUT_SOURCE, MGS_BULK_PORTAL);
+ if (desc == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ for (i = 0; i < page_count && bytes > 0; i++) {
ptlrpc_prep_bulk_page_pin(desc, pages[i], 0,
min_t(int, bytes, CFS_PAGE_SIZE));
bytes -= CFS_PAGE_SIZE;
void obdo_to_ioobj(struct obdo *oa, struct obd_ioobj *ioobj)
{
- ioobj->ioo_id = oa->o_id;
- if (oa->o_valid & OBD_MD_FLGROUP)
- ioobj->ioo_seq = oa->o_seq;
- else
- ioobj->ioo_seq = 0;
- ioobj->ioo_type = oa->o_mode;
+ ioobj->ioo_id = oa->o_id;
+ if (oa->o_valid & OBD_MD_FLGROUP)
+ ioobj->ioo_seq = oa->o_seq;
+ else
+ ioobj->ioo_seq = 0;
+ /* Since 2.4 this does not contain o_mode in the low 16 bits.
+ * Instead, it holds (bd_md_max_brw - 1) for multi-bulk BRW RPCs */
+ ioobj->ioo_max_brw = 0;
}
EXPORT_SYMBOL(obdo_to_ioobj);
}
ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
+ OBD_CONNECT_BRW_SIZE |
OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE;
+ ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
ocd->ocd_version = LUSTRE_VERSION_CODE;
ocd->ocd_group = FID_SEQ_ECHO;
#include "ofd_internal.h"
-#define OFD_GRANT_CHUNK (2ULL * DT_MAX_BRW_SIZE)
-#define OFD_GRANT_CHUNK_EXP(rexp) (2ULL * exp_brw_size((rexp)))
-#define OFD_GRANT_SHRINK_LIMIT(rexp) (16ULL * OFD_GRANT_CHUNK_EXP((rexp)))
+/* At least enough to send a couple of 1MB RPCs, even if not max sized */
+#define OFD_GRANT_CHUNK (2ULL * DT_MAX_BRW_SIZE)
+
+/* Clients typically hold 2x their max_rpcs_in_flight of grant space */
+#define OFD_GRANT_SHRINK_LIMIT(exp) (2ULL * 8 * exp_max_brw_size(exp))
static inline obd_size ofd_grant_from_cli(struct obd_export *exp,
struct ofd_device *ofd, obd_size val)
static inline obd_size ofd_grant_chunk(struct obd_export *exp,
struct ofd_device *ofd)
{
- if (exp && ofd_obd(ofd)->obd_self_export == exp)
+ if (ofd_obd(ofd)->obd_self_export == exp)
/* Grant enough space to handle a big precreate request */
return OST_MAX_PRECREATE * ofd->ofd_dt_conf.ddp_inodespace;
- if (exp && ofd_grant_compat(exp, ofd))
+ if (ofd_grant_compat(exp, ofd))
/* Try to grant enough space to send a full-size RPC */
- return exp_brw_size(exp) <<
+ return exp_max_brw_size(exp) <<
(ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT);
- return OFD_GRANT_CHUNK;
+
+ /* Try to return enough to send two full RPCs, if needed */
+ return exp_max_brw_size(exp) * 2;
}
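As a rough worked example of the sizing above (hypothetical numbers): for a 2.4 client that negotiated a 4 MiB ocd_brw_size, exp_max_brw_size() is 4 MiB, so ofd_grant_chunk() returns 2 * 4 MiB = 8 MiB (two full-size RPCs), and OFD_GRANT_SHRINK_LIMIT() comes to 2 * 8 * 4 MiB = 64 MiB, with the factor of 8 presumably standing in for a typical max_rpcs_in_flight.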
/**
if (!grant)
RETURN(0);
- /* Allow >OFD_GRANT_CHUNK_EXP size when clients reconnect due to a
- * server reboot. */
+ /* Limit to ofd_grant_chunk() if client is not reconnecting */
if ((grant > grant_chunk) && (!obd->obd_recovering))
grant = grant_chunk;
/* When close to free space exhaustion, trigger a sync to force
* writeback cache to consume required space immediately and release as
* much space as possible. */
- if (!obd->obd_recovering && force != 2 &&
- left < ofd_grant_chunk(NULL, ofd)) {
+ if (!obd->obd_recovering && force != 2 && left < OFD_GRANT_CHUNK) {
bool from_grant = true;
int i;
struct obd_device *dev = data;
struct client_obd *cli = &dev->u.cli;
struct obd_connect_data *ocd = &cli->cl_import->imp_connect_data;
- int chunk_mask, val, rc;
+ int chunk_mask, rc;
+ __u64 val;
- rc = lprocfs_write_helper(buffer, count, &val);
+ rc = lprocfs_write_u64_helper(buffer, count, &val);
if (rc)
return rc;
+ /* if the max_pages is specified in bytes, convert to pages */
+ if (val >= ONE_MB_BRW_SIZE)
+ val >>= CFS_PAGE_SHIFT;
+
LPROCFS_CLIMP_CHECK(dev);
chunk_mask = ~((1 << (cli->cl_chunkbits - CFS_PAGE_SHIFT)) - 1);
CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
- /* This should really be sent by the OST */
- oinfo->oi_oa->o_blksize = exp_brw_size(exp);
+ oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
EXIT;
lustre_get_wire_obdo(oa, &body->oa);
- /* This should really be sent by the OST */
- oa->o_blksize = exp_brw_size(exp);
+ oa->o_blksize = cli_brw_size(exp->exp_obd);
oa->o_valid |= OBD_MD_FLBLKSZ;
/* XXX LOV STACKING: the lsm that is passed to us from LOV does not
return 0;
if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
- int brw_size = exp_brw_size(
- client->cl_import->imp_obd->obd_self_export);
+ /* Get the current RPC size directly, instead of going via:
+ * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
+ * Keep comment here so that it can be found by searching. */
+ int brw_size = client->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
client->cl_avail_grant > brw_size)
* retry logic */
req->rq_no_retry_einprogress = 1;
- if (opc == OST_WRITE)
- desc = ptlrpc_prep_bulk_imp(req, page_count,
- BULK_GET_SOURCE, OST_BULK_PORTAL);
- else
- desc = ptlrpc_prep_bulk_imp(req, page_count,
- BULK_PUT_SINK, OST_BULK_PORTAL);
+ desc = ptlrpc_prep_bulk_imp(req, page_count,
+ cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
+ opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
+ OST_BULK_PORTAL);
if (desc == NULL)
GOTO(out, rc = -ENOMEM);
lustre_set_wire_obdo(&body->oa, oa);
- obdo_to_ioobj(oa, ioobj);
- ioobj->ioo_bufcnt = niocount;
- osc_pack_capa(req, body, ocapa);
- LASSERT (page_count > 0);
- pg_prev = pga[0];
+ obdo_to_ioobj(oa, ioobj);
+ ioobj->ioo_bufcnt = niocount;
+	/* The high bits of ioo_max_brw tell the server the _maximum_ number
+	 * of bulks that might be sent for this request. The actual number is
+	 * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
+	 * sends "max - 1" for compatibility with old clients that send "0",
+	 * and also so that the actual maximum is a power-of-two number, not
+	 * one less. LU-1431 */
+ ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
+ osc_pack_capa(req, body, ocapa);
+ LASSERT(page_count > 0);
+ pg_prev = pga[0];
for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
struct brw_page *pg = pga[i];
int poff = pg->off & ~CFS_PAGE_MASK;
client_obd_list_lock(&cli->cl_loi_list_lock);
data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
- 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
+ 2 * cli_brw_size(obd);
lost_grant = cli->cl_lost_grant;
cli->cl_lost_grant = 0;
client_obd_list_unlock(&cli->cl_loi_list_lock);
osd_submit_bio(iobuf->dr_rw, bio);
}
- /* allocate new bio, limited by max BIO size, b=9945 */
- bio = bio_alloc(GFP_NOIO, max(BIO_MAX_PAGES,
- (npages - page_idx) *
- blocks_per_page));
+ /* allocate new bio */
+ bio = bio_alloc(GFP_NOIO, min(BIO_MAX_PAGES,
+ (npages - page_idx) *
+ blocks_per_page));
if (bio == NULL) {
CERROR("Can't allocate bio %u*%u = %u pages\n",
(npages - page_idx), blocks_per_page,
* b. for CMD, seq = FID_SEQ_OST_MDT0, FID_SEQ_OST_MDT1 - FID_SEQ_OST_MAX
*/
static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa,
- struct obd_ioobj *ioobj)
+ struct obd_ioobj *ioobj)
{
- if (oa != NULL && !(oa->o_valid & OBD_MD_FLGROUP)) {
- oa->o_seq = FID_SEQ_OST_MDT0;
- if (ioobj)
- ioobj->ioo_seq = FID_SEQ_OST_MDT0;
- /* remove fid_seq_is_rsvd() after FID-on-OST allows SEQ > 9 */
- } else if (oa == NULL ||
- !(fid_seq_is_norm(oa->o_seq) || fid_seq_is_mdt(oa->o_seq) ||
- fid_seq_is_echo(oa->o_seq))) {
- CERROR("%s: client %s sent invalid object "POSTID"\n",
- exp->exp_obd->obd_name, obd_export_nid2str(exp),
- oa ? oa->o_id : -1, oa ? oa->o_seq : -1);
- return -EPROTO;
- }
- obdo_from_ostid(oa, &oa->o_oi);
- if (ioobj)
+ if (unlikely(oa != NULL && !(oa->o_valid & OBD_MD_FLGROUP))) {
+ oa->o_seq = FID_SEQ_OST_MDT0;
+ if (ioobj)
+ ioobj->ioo_seq = FID_SEQ_OST_MDT0;
+ } else if (unlikely(oa == NULL || !(fid_seq_is_idif(oa->o_seq) ||
+ fid_seq_is_mdt(oa->o_seq) ||
+ fid_seq_is_echo(oa->o_seq)))) {
+ CERROR("%s: client %s sent bad object "POSTID": rc = -EPROTO\n",
+ exp->exp_obd->obd_name, obd_export_nid2str(exp),
+ oa ? oa->o_id : -1, oa ? oa->o_seq : -1);
+ return -EPROTO;
+ }
+
+ obdo_from_ostid(oa, &oa->o_oi);
+ if (ioobj != NULL) {
+ unsigned max_brw = ioobj_max_brw_get(ioobj);
+
+ if (unlikely((max_brw & (max_brw - 1)) != 0)) {
+ CERROR("%s: client %s sent bad ioobj max %u for "POSTID
+ ": rc = -EPROTO\n", exp->exp_obd->obd_name,
+ obd_export_nid2str(exp), max_brw,
+ oa->o_id, oa->o_seq);
+ return -EPROTO;
+ }
ioobj_from_obdo(ioobj, oa);
- return 0;
+ }
+ return 0;
}
void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
if (rc != 0)
GOTO(out_lock, rc);
- desc = ptlrpc_prep_bulk_exp(req, npages,
- BULK_PUT_SOURCE, OST_BULK_PORTAL);
- if (desc == NULL)
- GOTO(out_commitrw, rc = -ENOMEM);
+ desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
+ BULK_PUT_SOURCE, OST_BULK_PORTAL);
+ if (desc == NULL)
+ GOTO(out_commitrw, rc = -ENOMEM);
nob = 0;
for (i = 0; i < npages; i++) {
if (rc != 0)
GOTO(out_lock, rc);
- desc = ptlrpc_prep_bulk_exp(req, npages,
- BULK_GET_SINK, OST_BULK_PORTAL);
- if (desc == NULL)
- GOTO(skip_transfer, rc = -ENOMEM);
-
- /* NB Having prepped, we must commit... */
+ desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
+ BULK_GET_SINK, OST_BULK_PORTAL);
+ if (desc == NULL)
+ GOTO(skip_transfer, rc = -ENOMEM);
- for (i = 0; i < npages; i++)
+ /* NB Having prepped, we must commit... */
+ for (i = 0; i < npages; i++)
ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].page,
local_nb[i].lnb_page_offset,
local_nb[i].len);
EXPORT_SYMBOL(ptlrpc_uuid_to_connection);
/**
- * Allocate and initialize new bulk descriptor
+ * Allocate and initialize a new bulk descriptor on the sender.
* Returns pointer to the descriptor or NULL on error.
*/
-struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
+struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw,
+ unsigned type, unsigned portal)
{
- struct ptlrpc_bulk_desc *desc;
+ struct ptlrpc_bulk_desc *desc;
+ int i;
- OBD_ALLOC(desc, offsetof (struct ptlrpc_bulk_desc, bd_iov[npages]));
- if (!desc)
- return NULL;
+ OBD_ALLOC(desc, offsetof(struct ptlrpc_bulk_desc, bd_iov[npages]));
+ if (!desc)
+ return NULL;
spin_lock_init(&desc->bd_lock);
- cfs_waitq_init(&desc->bd_waitq);
- desc->bd_max_iov = npages;
- desc->bd_iov_count = 0;
- LNetInvalidateHandle(&desc->bd_md_h);
- desc->bd_portal = portal;
- desc->bd_type = type;
-
- return desc;
+ cfs_waitq_init(&desc->bd_waitq);
+ desc->bd_max_iov = npages;
+ desc->bd_iov_count = 0;
+ desc->bd_portal = portal;
+ desc->bd_type = type;
+ desc->bd_md_count = 0;
+ LASSERT(max_brw > 0);
+ desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
+ /* PTLRPC_BULK_OPS_COUNT is the compile-time transfer limit for this
+ * node. Negotiated ocd_brw_size will always be <= this number. */
+ for (i = 0; i < PTLRPC_BULK_OPS_COUNT; i++)
+ LNetInvalidateHandle(&desc->bd_mds[i]);
+
+ return desc;
}
/**
* error.
*/
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
- int npages, int type, int portal)
+ unsigned npages, unsigned max_brw,
+ unsigned type, unsigned portal)
{
- struct obd_import *imp = req->rq_import;
- struct ptlrpc_bulk_desc *desc;
+ struct obd_import *imp = req->rq_import;
+ struct ptlrpc_bulk_desc *desc;
- ENTRY;
- LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
- desc = new_bulk(npages, type, portal);
- if (desc == NULL)
- RETURN(NULL);
+ ENTRY;
+ LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
+ desc = ptlrpc_new_bulk(npages, max_brw, type, portal);
+ if (desc == NULL)
+ RETURN(NULL);
desc->bd_import_generation = req->rq_import_generation;
desc->bd_import = class_import_get(imp);
*/
void __ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc, int unpin)
{
- int i;
- ENTRY;
+ int i;
+ ENTRY;
- LASSERT(desc != NULL);
- LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
- LASSERT(!desc->bd_network_rw); /* network hands off or */
- LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
+ LASSERT(desc != NULL);
+ LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
+ LASSERT(desc->bd_md_count == 0); /* network hands off */
+ LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
- sptlrpc_enc_pool_put_pages(desc);
+ sptlrpc_enc_pool_put_pages(desc);
- if (desc->bd_export)
- class_export_put(desc->bd_export);
- else
- class_import_put(desc->bd_import);
+ if (desc->bd_export)
+ class_export_put(desc->bd_export);
+ else
+ class_import_put(desc->bd_import);
if (unpin) {
for (i = 0; i < desc->bd_iov_count ; i++)
cfs_page_unpin(desc->bd_iov[i].kiov_page);
}
- OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
- bd_iov[desc->bd_max_iov]));
- EXIT;
+ OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
+ bd_iov[desc->bd_max_iov]));
+ EXIT;
}
EXPORT_SYMBOL(__ptlrpc_free_bulk);
if (ptlrpc_client_bulk_active(req))
continue;
- if (!req->rq_bulk->bd_success) {
- /* The RPC reply arrived OK, but the bulk screwed
- * up! Dead weird since the server told us the RPC
- * was good after getting the REPLY for her GET or
- * the ACK for her PUT. */
- DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
- req->rq_status = -EIO;
- }
+ if (req->rq_bulk->bd_failure) {
+ /* The RPC reply arrived OK, but the bulk screwed
+ * up! Dead weird since the server told us the RPC
+ * was good after getting the REPLY for her GET or
+ * the ACK for her PUT. */
+ DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
+ req->rq_status = -EIO;
+ }
ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
#define YEAR_2004 (1ULL << 30)
void ptlrpc_init_xid(void)
{
- time_t now = cfs_time_current_sec();
+ time_t now = cfs_time_current_sec();
spin_lock_init(&ptlrpc_last_xid_lock);
- if (now < YEAR_2004) {
- cfs_get_random_bytes(&ptlrpc_last_xid, sizeof(ptlrpc_last_xid));
- ptlrpc_last_xid >>= 2;
- ptlrpc_last_xid |= (1ULL << 61);
- } else {
- ptlrpc_last_xid = (__u64)now << 20;
- }
+ if (now < YEAR_2004) {
+ cfs_get_random_bytes(&ptlrpc_last_xid, sizeof(ptlrpc_last_xid));
+ ptlrpc_last_xid >>= 2;
+ ptlrpc_last_xid |= (1ULL << 61);
+ } else {
+ ptlrpc_last_xid = (__u64)now << 20;
+ }
+
+	/* Need to always be aligned to a power-of-two for multi-bulk BRW */
+ CLASSERT((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) == 0);
+ ptlrpc_last_xid &= PTLRPC_BULK_OPS_MASK;
}
/**
- * Increase xid and returns resultng new value to the caller.
+ * Increases xid and returns the resulting new value to the caller.
+ *
+ * Multi-bulk BRW RPCs consume multiple XIDs for each bulk transfer, starting
+ * at the returned xid, up to xid + PTLRPC_BULK_OPS_COUNT - 1. The BRW RPC
+ * itself uses the last bulk xid needed, so the server can determine
+ * the number of bulk transfers from the RPC XID and a bitmask. The starting
+ * xid must align to a power-of-two value.
+ *
+ * This is assumed to be true due to the initial ptlrpc_last_xid
+ * value also being initialized to a power-of-two value. LU-1431
*/
__u64 ptlrpc_next_xid(void)
{
- __u64 tmp;
+ __u64 next;
+
spin_lock(&ptlrpc_last_xid_lock);
- tmp = ++ptlrpc_last_xid;
+ next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
+ ptlrpc_last_xid = next;
spin_unlock(&ptlrpc_last_xid_lock);
- return tmp;
+
+ return next;
}
EXPORT_SYMBOL(ptlrpc_next_xid);
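A minimal standalone sketch of the XID convention described above (LU-1431), assuming PTLRPC_BULK_OPS_COUNT == 4; names are illustrative, and the server-side mask here uses the full count for simplicity (the real code masks with the client's advertised max_brw, which gives the same result because XID blocks are aligned):

#include <assert.h>
#include <stdint.h>

#define EX_BULK_OPS_COUNT	4ULL	/* mirrors PTLRPC_BULK_OPS_COUNT */

int main(void)
{
	uint64_t first	    = 0x1000;	/* aligned block from ptlrpc_next_xid() */
	uint64_t bulks_used = 3;	/* bulks this particular RPC needs */

	/* client: the RPC itself carries the last bulk XID in use */
	uint64_t rq_xid = first + bulks_used - 1;

	/* server: recovers both the first bulk XID and the bulk count
	 * from rq_xid alone, as ptlrpc_start_bulk_transfer() does below */
	uint64_t srv_first = rq_xid & ~(EX_BULK_OPS_COUNT - 1);
	uint64_t srv_count = rq_xid - srv_first + 1;

	assert(srv_first == first);
	assert(srv_count == bulks_used);
	return 0;
}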
{
#if BITS_PER_LONG == 32
/* need to avoid possible word tearing on 32-bit systems */
- __u64 tmp;
+ __u64 next;
+
spin_lock(&ptlrpc_last_xid_lock);
- tmp = ptlrpc_last_xid + 1;
+ next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
spin_unlock(&ptlrpc_last_xid_lock);
- return tmp;
+
+ return next;
#else
/* No need to lock, since returned value is racy anyways */
- return ptlrpc_last_xid + 1;
+ return ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
#endif
}
EXPORT_SYMBOL(ptlrpc_sample_next_xid);
ev->type, ev->status, desc);
spin_lock(&desc->bd_lock);
- req = desc->bd_req;
- LASSERT(desc->bd_network_rw);
- desc->bd_network_rw = 0;
-
- if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) {
- desc->bd_success = 1;
- desc->bd_nob_transferred = ev->mlength;
- desc->bd_sender = ev->sender;
- } else {
- /* start reconnect and resend if network error hit */
+ req = desc->bd_req;
+ LASSERT(desc->bd_md_count > 0);
+ desc->bd_md_count--;
+
+ if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) {
+ desc->bd_nob_transferred += ev->mlength;
+ desc->bd_sender = ev->sender;
+ } else {
+ /* start reconnect and resend if network error hit */
spin_lock(&req->rq_lock);
req->rq_net_err = 1;
spin_unlock(&req->rq_lock);
- }
+ }
- /* release the encrypted pages for write */
- if (desc->bd_req->rq_bulk_write)
- sptlrpc_enc_pool_put_pages(desc);
+ if (ev->status != 0)
+ desc->bd_failure = 1;
- /* NB don't unlock till after wakeup; desc can disappear under us
- * otherwise */
- ptlrpc_client_wake_req(req);
+ /* NB don't unlock till after wakeup; desc can disappear under us
+ * otherwise */
+ if (desc->bd_md_count == 0)
+ ptlrpc_client_wake_req(desc->bd_req);
spin_unlock(&desc->bd_lock);
EXIT;
*/
void server_bulk_callback (lnet_event_t *ev)
{
- struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
- struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
- ENTRY;
+ struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
+ struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
+ ENTRY;
- LASSERT (ev->type == LNET_EVENT_SEND ||
- ev->type == LNET_EVENT_UNLINK ||
- (desc->bd_type == BULK_PUT_SOURCE &&
- ev->type == LNET_EVENT_ACK) ||
- (desc->bd_type == BULK_GET_SINK &&
- ev->type == LNET_EVENT_REPLY));
+ LASSERT(ev->type == LNET_EVENT_SEND ||
+ ev->type == LNET_EVENT_UNLINK ||
+ (desc->bd_type == BULK_PUT_SOURCE &&
+ ev->type == LNET_EVENT_ACK) ||
+ (desc->bd_type == BULK_GET_SINK &&
+ ev->type == LNET_EVENT_REPLY));
CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
"event type %d, status %d, desc %p\n",
spin_lock(&desc->bd_lock);
- if ((ev->type == LNET_EVENT_ACK ||
- ev->type == LNET_EVENT_REPLY) &&
- ev->status == 0) {
- /* We heard back from the peer, so even if we get this
- * before the SENT event (oh yes we can), we know we
- * read/wrote the peer buffer and how much... */
- desc->bd_success = 1;
- desc->bd_nob_transferred = ev->mlength;
- desc->bd_sender = ev->sender;
- }
+ LASSERT(desc->bd_md_count > 0);
- if (ev->unlinked) {
- /* This is the last callback no matter what... */
- desc->bd_network_rw = 0;
- cfs_waitq_signal(&desc->bd_waitq);
- }
+ if ((ev->type == LNET_EVENT_ACK ||
+ ev->type == LNET_EVENT_REPLY) &&
+ ev->status == 0) {
+ /* We heard back from the peer, so even if we get this
+ * before the SENT event (oh yes we can), we know we
+ * read/wrote the peer buffer and how much... */
+ desc->bd_nob_transferred += ev->mlength;
+ desc->bd_sender = ev->sender;
+ }
+
+ if (ev->status != 0)
+ desc->bd_failure = 1;
+
+ if (ev->unlinked) {
+ desc->bd_md_count--;
+ /* This is the last callback no matter what... */
+ if (desc->bd_md_count == 0)
+ cfs_waitq_signal(&desc->bd_waitq);
+ }
spin_unlock(&desc->bd_lock);
EXIT;
* Enforce ADLER for backward compatibility*/
cli->cl_supp_cksum_types = OBD_CKSUM_ADLER;
}
- cli->cl_cksum_type =cksum_type_select(cli->cl_supp_cksum_types);
-
- if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE)
- cli->cl_max_pages_per_rpc =
- ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
- else if (imp->imp_connect_op == MDS_CONNECT ||
- imp->imp_connect_op == MGS_CONNECT)
- cli->cl_max_pages_per_rpc = 1;
+	cli->cl_cksum_type = cksum_type_select(cli->cl_supp_cksum_types);
+
+ if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE)
+ cli->cl_max_pages_per_rpc =
+ min(ocd->ocd_brw_size >> CFS_PAGE_SHIFT,
+ cli->cl_max_pages_per_rpc);
+ else if (imp->imp_connect_op == MDS_CONNECT ||
+ imp->imp_connect_op == MGS_CONNECT)
+ cli->cl_max_pages_per_rpc = 1;
/* Reset ns_connect_flags only for initial connect. It might be
* changed in while using FS and if we reset it in reconnect
RETURN (0);
}
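Putting the negotiation above together with the client_obd_setup() default shown earlier, a hypothetical 4 KiB-page client starts at cl_max_pages_per_rpc = LNET_MTU >> CFS_PAGE_SHIFT = 256 pages (1 MiB). Even against a server advertising ocd_brw_size = 4 MiB, the min() keeps RPCs at 1 MiB until the max_pages_per_rpc tunable is raised; against an older 1 MiB server, the min() clamps any larger tuned value back down at connect time.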
+static void mdunlink_iterate_helper(lnet_handle_md_t *bd_mds, int count)
+{
+ int i;
+
+ for (i = 0; i < count; i++)
+ LNetMDUnlink(bd_mds[i]);
+}
+
#ifdef HAVE_SERVER_SUPPORT
/**
* Prepare bulk descriptor for specified incoming request \a req that
* error.
*/
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp(struct ptlrpc_request *req,
- int npages, int type, int portal)
+ unsigned npages, unsigned max_brw,
+ unsigned type, unsigned portal)
{
- struct obd_export *exp = req->rq_export;
- struct ptlrpc_bulk_desc *desc;
+ struct obd_export *exp = req->rq_export;
+ struct ptlrpc_bulk_desc *desc;
- ENTRY;
- LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
+ ENTRY;
+ LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
- desc = new_bulk(npages, type, portal);
- if (desc == NULL)
- RETURN(NULL);
+ desc = ptlrpc_new_bulk(npages, max_brw, type, portal);
+ if (desc == NULL)
+ RETURN(NULL);
desc->bd_export = class_export_get(exp);
desc->bd_req = req;
EXPORT_SYMBOL(ptlrpc_prep_bulk_exp);
/**
- * Starts bulk transfer for descriptor \a desc
+ * Starts bulk transfer for descriptor \a desc on the server.
* Returns 0 on success or error code.
*/
int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
{
- struct ptlrpc_connection *conn = desc->bd_export->exp_connection;
- int rc;
- int rc2;
- lnet_md_t md;
- __u64 xid;
- ENTRY;
-
- if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_BULK_PUT_NET))
- RETURN(0);
-
- /* NB no locking required until desc is on the network */
- LASSERT (!desc->bd_network_rw);
- LASSERT (desc->bd_type == BULK_PUT_SOURCE ||
- desc->bd_type == BULK_GET_SINK);
- desc->bd_success = 0;
-
- md.user_ptr = &desc->bd_cbid;
- md.eq_handle = ptlrpc_eq_h;
- md.threshold = 2; /* SENT and ACK/REPLY */
- md.options = PTLRPC_MD_OPTIONS;
- ptlrpc_fill_bulk_md(&md, desc);
-
- LASSERT (desc->bd_cbid.cbid_fn == server_bulk_callback);
- LASSERT (desc->bd_cbid.cbid_arg == desc);
-
- /* NB total length may be 0 for a read past EOF, so we send a 0
- * length bulk, since the client expects a bulk event. */
-
- rc = LNetMDBind(md, LNET_UNLINK, &desc->bd_md_h);
- if (rc != 0) {
- CERROR("LNetMDBind failed: %d\n", rc);
- LASSERT (rc == -ENOMEM);
- RETURN(-ENOMEM);
- }
-
- /* Client's bulk and reply matchbits are the same */
- xid = desc->bd_req->rq_xid;
- CDEBUG(D_NET, "Transferring %u pages %u bytes via portal %d "
- "id %s xid "LPX64"\n", desc->bd_iov_count,
- desc->bd_nob, desc->bd_portal,
- libcfs_id2str(conn->c_peer), xid);
-
- /* Network is about to get at the memory */
- desc->bd_network_rw = 1;
-
- if (desc->bd_type == BULK_PUT_SOURCE)
- rc = LNetPut (conn->c_self, desc->bd_md_h, LNET_ACK_REQ,
- conn->c_peer, desc->bd_portal, xid, 0, 0);
- else
- rc = LNetGet (conn->c_self, desc->bd_md_h,
- conn->c_peer, desc->bd_portal, xid, 0);
-
- if (rc != 0) {
- /* Can't send, so we unlink the MD bound above. The UNLINK
- * event this creates will signal completion with failure,
- * so we return SUCCESS here! */
- CERROR("Transfer(%s, %d, "LPX64") failed: %d\n",
- libcfs_id2str(conn->c_peer), desc->bd_portal, xid, rc);
- rc2 = LNetMDUnlink(desc->bd_md_h);
- LASSERT (rc2 == 0);
- }
-
- RETURN(0);
+ struct obd_export *exp = desc->bd_export;
+ struct ptlrpc_connection *conn = exp->exp_connection;
+ int rc = 0;
+ __u64 xid;
+ int posted_md;
+ int total_md;
+ lnet_md_t md;
+ ENTRY;
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_BULK_PUT_NET))
+ RETURN(0);
+
+ /* NB no locking required until desc is on the network */
+ LASSERT(desc->bd_md_count == 0);
+ LASSERT(desc->bd_type == BULK_PUT_SOURCE ||
+ desc->bd_type == BULK_GET_SINK);
+
+ LASSERT(desc->bd_cbid.cbid_fn == server_bulk_callback);
+ LASSERT(desc->bd_cbid.cbid_arg == desc);
+
+ /* NB total length may be 0 for a read past EOF, so we send 0
+ * length bulks, since the client expects bulk events.
+ *
+ * The client may not need all of the bulk XIDs for the RPC. The RPC
+	 * uses the highest bulk XID needed, and the server masks off the
+	 * high bits to get the bulk count for this RPC. LU-1431 */
+ xid = desc->bd_req->rq_xid & ~((__u64)desc->bd_md_max_brw - 1);
+ total_md = desc->bd_req->rq_xid - xid + 1;
+
+ desc->bd_md_count = total_md;
+ desc->bd_failure = 0;
+
+ md.user_ptr = &desc->bd_cbid;
+ md.eq_handle = ptlrpc_eq_h;
+ md.threshold = 2; /* SENT and ACK/REPLY */
+
+ for (posted_md = 0; posted_md < total_md; xid++) {
+ md.options = PTLRPC_MD_OPTIONS;
+
+ /* NB it's assumed that source and sink buffer frags are
+ * page-aligned. Otherwise we'd have to send client bulk
+ * sizes over and split server buffer accordingly */
+ ptlrpc_fill_bulk_md(&md, desc, posted_md);
+ rc = LNetMDBind(md, LNET_UNLINK, &desc->bd_mds[posted_md]);
+ if (rc != 0) {
+ CERROR("%s: LNetMDBind failed for MD %u: rc = %d\n",
+ exp->exp_obd->obd_name, posted_md, rc);
+ LASSERT(rc == -ENOMEM);
+ if (posted_md == 0) {
+ desc->bd_md_count = 0;
+ RETURN(-ENOMEM);
+ }
+ break;
+ }
+ /* Network is about to get at the memory */
+ if (desc->bd_type == BULK_PUT_SOURCE)
+ rc = LNetPut(conn->c_self, desc->bd_mds[posted_md],
+ LNET_ACK_REQ, conn->c_peer,
+ desc->bd_portal, xid, 0, 0);
+ else
+ rc = LNetGet(conn->c_self, desc->bd_mds[posted_md],
+ conn->c_peer, desc->bd_portal, xid, 0);
+
+ posted_md++;
+ if (rc != 0) {
+ CERROR("%s: failed bulk transfer with %s:%u x"LPU64": "
+ "rc = %d\n", exp->exp_obd->obd_name,
+ libcfs_id2str(conn->c_peer), desc->bd_portal,
+ xid, rc);
+ break;
+ }
+ }
+
+ if (rc != 0) {
+ /* Can't send, so we unlink the MD bound above. The UNLINK
+ * event this creates will signal completion with failure,
+ * so we return SUCCESS here! */
+ spin_lock(&desc->bd_lock);
+ desc->bd_md_count -= total_md - posted_md;
+ spin_unlock(&desc->bd_lock);
+ LASSERT(desc->bd_md_count >= 0);
+
+ mdunlink_iterate_helper(desc->bd_mds, posted_md);
+ RETURN(0);
+ }
+
+ CDEBUG(D_NET, "Transferring %u pages %u bytes via portal %d "
+ "id %s xid "LPX64"-"LPX64"\n", desc->bd_iov_count,
+ desc->bd_nob, desc->bd_portal, libcfs_id2str(conn->c_peer),
+ xid - posted_md, xid - 1);
+
+ RETURN(0);
}
EXPORT_SYMBOL(ptlrpc_start_bulk_transfer);
* one. If it fails, it must be because completion just happened,
* but we must still l_wait_event() in this case, to give liblustre
 * a chance to run server_bulk_callback() */
-
- LNetMDUnlink(desc->bd_md_h);
+ mdunlink_iterate_helper(desc->bd_mds, desc->bd_md_count);
for (;;) {
/* Network access will complete in finite time but the HUGE
#endif /* HAVE_SERVER_SUPPORT */
/**
- * Register bulk for later transfer
+ * Register bulk at the sender for later transfer.
* Returns 0 on success or error code.
*/
int ptlrpc_register_bulk(struct ptlrpc_request *req)
{
- struct ptlrpc_bulk_desc *desc = req->rq_bulk;
- lnet_process_id_t peer;
- int rc;
- int rc2;
- lnet_handle_me_t me_h;
- lnet_md_t md;
- ENTRY;
+ struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+ lnet_process_id_t peer;
+ int rc = 0;
+ int rc2;
+ int posted_md;
+ int total_md;
+ __u64 xid;
+ lnet_handle_me_t me_h;
+ lnet_md_t md;
+ ENTRY;
if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_BULK_GET_NET))
RETURN(0);
- /* NB no locking required until desc is on the network */
- LASSERT (desc->bd_nob > 0);
- LASSERT (!desc->bd_network_rw);
- LASSERT (desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES);
- LASSERT (desc->bd_req != NULL);
- LASSERT (desc->bd_type == BULK_PUT_SINK ||
- desc->bd_type == BULK_GET_SOURCE);
+ /* NB no locking required until desc is on the network */
+ LASSERT(desc->bd_nob > 0);
+ LASSERT(desc->bd_md_count == 0);
+ LASSERT(desc->bd_md_max_brw <= PTLRPC_BULK_OPS_COUNT);
+ LASSERT(desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES);
+ LASSERT(desc->bd_req != NULL);
+ LASSERT(desc->bd_type == BULK_PUT_SINK ||
+ desc->bd_type == BULK_GET_SOURCE);
- desc->bd_success = 0;
+ desc->bd_failure = 0;
- peer = desc->bd_import->imp_connection->c_peer;
+ peer = desc->bd_import->imp_connection->c_peer;
- md.user_ptr = &desc->bd_cbid;
- md.eq_handle = ptlrpc_eq_h;
- md.threshold = 1; /* PUT or GET */
- md.options = PTLRPC_MD_OPTIONS |
- ((desc->bd_type == BULK_GET_SOURCE) ?
- LNET_MD_OP_GET : LNET_MD_OP_PUT);
- ptlrpc_fill_bulk_md(&md, desc);
-
- LASSERT (desc->bd_cbid.cbid_fn == client_bulk_callback);
- LASSERT (desc->bd_cbid.cbid_arg == desc);
-
- /* XXX Registering the same xid on retried bulk makes my head
- * explode trying to understand how the original request's bulk
- * might interfere with the retried request -eeb
- * On the other hand replaying with the same xid is fine, since
- * we are guaranteed old request have completed. -green */
- LASSERTF(!(desc->bd_registered &&
- req->rq_send_state != LUSTRE_IMP_REPLAY) ||
- req->rq_xid != desc->bd_last_xid,
- "registered: %d rq_xid: "LPU64" bd_last_xid: "LPU64"\n",
- desc->bd_registered, req->rq_xid, desc->bd_last_xid);
- desc->bd_registered = 1;
- desc->bd_last_xid = req->rq_xid;
-
- rc = LNetMEAttach(desc->bd_portal, peer,
- req->rq_xid, 0, LNET_UNLINK, LNET_INS_AFTER, &me_h);
- if (rc != 0) {
- CERROR("LNetMEAttach failed: %d\n", rc);
- LASSERT (rc == -ENOMEM);
- RETURN (-ENOMEM);
- }
+ LASSERT(desc->bd_cbid.cbid_fn == client_bulk_callback);
+ LASSERT(desc->bd_cbid.cbid_arg == desc);
- /* About to let the network at it... */
- desc->bd_network_rw = 1;
- rc = LNetMDAttach(me_h, md, LNET_UNLINK, &desc->bd_md_h);
- if (rc != 0) {
- CERROR("LNetMDAttach failed: %d\n", rc);
- LASSERT (rc == -ENOMEM);
- desc->bd_network_rw = 0;
- rc2 = LNetMEUnlink (me_h);
- LASSERT (rc2 == 0);
- RETURN (-ENOMEM);
- }
+ /* An XID is only used for a single request from the client.
+ * For retried bulk transfers, a new XID will be allocated in
+ * ptlrpc_check_set() if it needs to be resent, so it is not
+ * using the same RDMA match bits after an error.
+ *
+ * For multi-bulk RPCs, rq_xid is the last XID needed for bulks. The
+ * first bulk XID is power-of-two aligned before rq_xid. LU-1431 */
+ xid = req->rq_xid & ~((__u64)desc->bd_md_max_brw - 1);
+ LASSERTF(!(desc->bd_registered &&
+ req->rq_send_state != LUSTRE_IMP_REPLAY) ||
+ xid != desc->bd_last_xid,
+ "registered: %d rq_xid: "LPU64" bd_last_xid: "LPU64"\n",
+ desc->bd_registered, xid, desc->bd_last_xid);
+
+ total_md = (desc->bd_iov_count + LNET_MAX_IOV - 1) / LNET_MAX_IOV;
+ desc->bd_registered = 1;
+ desc->bd_last_xid = xid;
+ desc->bd_md_count = total_md;
+ md.user_ptr = &desc->bd_cbid;
+ md.eq_handle = ptlrpc_eq_h;
+ md.threshold = 1; /* PUT or GET */
+
+ for (posted_md = 0; posted_md < total_md; posted_md++, xid++) {
+ md.options = PTLRPC_MD_OPTIONS |
+ ((desc->bd_type == BULK_GET_SOURCE) ?
+ LNET_MD_OP_GET : LNET_MD_OP_PUT);
+ ptlrpc_fill_bulk_md(&md, desc, posted_md);
+
+ rc = LNetMEAttach(desc->bd_portal, peer, xid, 0,
+ LNET_UNLINK, LNET_INS_AFTER, &me_h);
+ if (rc != 0) {
+ CERROR("%s: LNetMEAttach failed x"LPU64"/%d: rc = %d\n",
+ desc->bd_import->imp_obd->obd_name, xid,
+ posted_md, rc);
+ break;
+ }
- CDEBUG(D_NET, "Setup bulk %s buffers: %u pages %u bytes, xid "LPU64", "
- "portal %u\n",
- desc->bd_type == BULK_GET_SOURCE ? "get-source" : "put-sink",
- desc->bd_iov_count, desc->bd_nob,
- req->rq_xid, desc->bd_portal);
- RETURN(0);
+ /* About to let the network at it... */
+ rc = LNetMDAttach(me_h, md, LNET_UNLINK,
+ &desc->bd_mds[posted_md]);
+ if (rc != 0) {
+ CERROR("%s: LNetMDAttach failed x"LPU64"/%d: rc = %d\n",
+ desc->bd_import->imp_obd->obd_name, xid,
+ posted_md, rc);
+ rc2 = LNetMEUnlink(me_h);
+ LASSERT(rc2 == 0);
+ break;
+ }
+ }
+
+ if (rc != 0) {
+ LASSERT(rc == -ENOMEM);
+ spin_lock(&desc->bd_lock);
+ desc->bd_md_count -= total_md - posted_md;
+ spin_unlock(&desc->bd_lock);
+ LASSERT(desc->bd_md_count >= 0);
+ mdunlink_iterate_helper(desc->bd_mds, desc->bd_md_max_brw);
+ req->rq_status = -ENOMEM;
+ RETURN(-ENOMEM);
+ }
+
+ /* Set rq_xid to the matchbits of the final bulk so that the server
+ * can infer the number of bulks that were prepared */
+ req->rq_xid = --xid;
+ LASSERTF(desc->bd_last_xid == (req->rq_xid & PTLRPC_BULK_OPS_MASK),
+ "bd_last_xid = x"LPU64", rq_xid = x"LPU64"\n",
+ desc->bd_last_xid, req->rq_xid);
+
+ spin_lock(&desc->bd_lock);
+ /* Holler if peer manages to touch buffers before he knows the xid */
+ if (desc->bd_md_count != total_md)
+ CWARN("%s: Peer %s touched %d buffers while I registered\n",
+ desc->bd_import->imp_obd->obd_name, libcfs_id2str(peer),
+ total_md - desc->bd_md_count);
+ spin_unlock(&desc->bd_lock);
+
+ CDEBUG(D_NET, "Setup %u bulk %s buffers: %u pages %u bytes, "
+ "xid x"LPX64"-"LPX64", portal %u\n", desc->bd_md_count,
+ desc->bd_type == BULK_GET_SOURCE ? "get-source" : "put-sink",
+ desc->bd_iov_count, desc->bd_nob,
+ desc->bd_last_xid, req->rq_xid, desc->bd_portal);
+
+ RETURN(0);
}
EXPORT_SYMBOL(ptlrpc_register_bulk);
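
The comment block in ptlrpc_register_bulk() above explains that the first bulk XID is the power-of-two aligned value at or below rq_xid and that one MD is posted per LNET_MAX_IOV fragments. The same arithmetic, pulled out into two stand-alone helpers purely for illustration (the names are hypothetical and not part of the patch; bd_md_max_brw must be a power of two for the mask to be valid, and LNET_MAX_IOV comes from the lnet headers):

/* Illustration only: first matchbits reserved for a bulk transfer,
 * assuming md_max_brw is a power of two. */
static inline __u64 bulk_first_xid(__u64 rq_xid, unsigned int md_max_brw)
{
	return rq_xid & ~((__u64)md_max_brw - 1);
}

/* Illustration only: number of MDs needed to cover iov_count fragments,
 * one MD per LNET_MAX_IOV fragments, rounded up. */
static inline int bulk_md_count(int iov_count)
{
	return (iov_count + LNET_MAX_IOV - 1) / LNET_MAX_IOV;
}

The registration loop then attaches one ME/MD pair at each consecutive matchbits value starting from the aligned XID, and rq_xid ends up equal to the last matchbits actually used, which is what lets the server infer how many bulks were prepared.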
async && req->rq_bulk_deadline == 0)
req->rq_bulk_deadline = cfs_time_current_sec() + LONG_UNLINK;
- if (!ptlrpc_client_bulk_active(req)) /* completed or */
- RETURN(1); /* never registered */
-
- LASSERT(desc->bd_req == req); /* bd_req NULL until registered */
+ if (ptlrpc_client_bulk_active(req) == 0) /* completed or */
+ RETURN(1); /* never registered */
- /* the unlink ensures the callback happens ASAP and is the last
- * one. If it fails, it must be because completion just happened,
- * but we must still l_wait_event() in this case to give liblustre
- * a chance to run client_bulk_callback() */
+ LASSERT(desc->bd_req == req); /* bd_req NULL until registered */
- LNetMDUnlink(desc->bd_md_h);
+ /* the unlink ensures the callback happens ASAP and is the last
+ * one. If it fails, it must be because completion just happened,
+ * but we must still l_wait_event() in this case to give liblustre
+ * a chance to run client_bulk_callback() */
+ mdunlink_iterate_helper(desc->bd_mds, desc->bd_md_max_brw);
- if (!ptlrpc_client_bulk_active(req)) /* completed or */
- RETURN(1); /* never registered */
+ if (ptlrpc_client_bulk_active(req) == 0) /* completed or */
+ RETURN(1); /* never registered */
/* Move to "Unregistering" phase as bulk was not unlinked yet. */
ptlrpc_rqphase_move(req, RQ_PHASE_UNREGISTERING);
}
EXPORT_SYMBOL(lustre_swab_obd_statfs);
-void lustre_swab_obd_ioobj (struct obd_ioobj *ioo)
+void lustre_swab_obd_ioobj(struct obd_ioobj *ioo)
{
- __swab64s (&ioo->ioo_id);
- __swab64s (&ioo->ioo_seq);
- __swab32s (&ioo->ioo_type);
- __swab32s (&ioo->ioo_bufcnt);
+ __swab64s(&ioo->ioo_id);
+ __swab64s(&ioo->ioo_seq);
+ __swab32s(&ioo->ioo_max_brw);
+ __swab32s(&ioo->ioo_bufcnt);
}
EXPORT_SYMBOL(lustre_swab_obd_ioobj);
/* Dump functions */
void dump_ioo(struct obd_ioobj *ioo)
{
- CDEBUG(D_RPCTRACE,
- "obd_ioobj: ioo_id="LPD64", ioo_seq="LPD64", ioo_type=%d, "
- "ioo_bufct=%d\n", ioo->ioo_id, ioo->ioo_seq, ioo->ioo_type,
- ioo->ioo_bufcnt);
+ CDEBUG(D_RPCTRACE,
+ "obd_ioobj: ioo_id="LPD64", ioo_seq="LPD64", ioo_max_brw=%#x, "
+ "ioo_bufct=%d\n", ioo->ioo_id, ioo->ioo_seq, ioo->ioo_max_brw,
+ ioo->ioo_bufcnt);
}
EXPORT_SYMBOL(dump_ioo);
#ifdef __KERNEL__
-void ptlrpc_fill_bulk_md (lnet_md_t *md, struct ptlrpc_bulk_desc *desc)
+void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc,
+ int mdidx)
{
- LASSERT (desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES);
- LASSERT (!(md->options & (LNET_MD_IOVEC | LNET_MD_KIOV | LNET_MD_PHYS)));
-
- md->options |= LNET_MD_KIOV;
- md->length = desc->bd_iov_count;
- if (desc->bd_enc_iov)
- md->start = desc->bd_enc_iov;
- else
- md->start = desc->bd_iov;
+ CLASSERT(PTLRPC_MAX_BRW_PAGES < LI_POISON);
+
+ LASSERT(mdidx < desc->bd_md_max_brw);
+ LASSERT(desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES);
+ LASSERT(!(md->options & (LNET_MD_IOVEC | LNET_MD_KIOV |
+ LNET_MD_PHYS)));
+
+ md->options |= LNET_MD_KIOV;
+ md->length = max(0, desc->bd_iov_count - mdidx * LNET_MAX_IOV);
+ md->length = min_t(unsigned int, LNET_MAX_IOV, md->length);
+ if (desc->bd_enc_iov)
+ md->start = &desc->bd_enc_iov[mdidx * LNET_MAX_IOV];
+ else
+ md->start = &desc->bd_iov[mdidx * LNET_MAX_IOV];
}
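
The kernel version of ptlrpc_fill_bulk_md() above clamps each MD to at most LNET_MAX_IOV kiov fragments, with the final MD picking up the remainder. A small stand-alone sketch of that clamping (the helper name is hypothetical and mirrors the max()/min_t() pair in the function, under the usual lnet headers):

/* Illustration only: fragments covered by MD number mdidx. */
static inline unsigned int bulk_md_nfrags(int iov_count, int mdidx)
{
	int left = iov_count - mdidx * LNET_MAX_IOV;

	if (left <= 0)
		return 0;
	return left < LNET_MAX_IOV ? (unsigned int)left : LNET_MAX_IOV;
}

For example, with LNET_MAX_IOV = 256 and bd_iov_count = 600, MDs 0 and 1 each cover 256 fragments and MD 2 covers the remaining 88, matching total_md = 3 as computed in ptlrpc_register_bulk().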
void ptlrpc_add_bulk_page(struct ptlrpc_bulk_desc *desc, cfs_page_t *page,
#else /* !__KERNEL__ */
-void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc)
+void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc,
+ int mdidx)
{
- LASSERT (!(md->options & (LNET_MD_IOVEC | LNET_MD_KIOV | LNET_MD_PHYS)));
- if (desc->bd_iov_count == 1) {
- md->start = desc->bd_iov[0].iov_base;
- md->length = desc->bd_iov[0].iov_len;
- return;
- }
-
- md->options |= LNET_MD_IOVEC;
- md->start = &desc->bd_iov[0];
- md->length = desc->bd_iov_count;
+ LASSERT(mdidx < desc->bd_md_max_brw);
+ LASSERT(desc->bd_iov_count > mdidx * LNET_MAX_IOV);
+ LASSERT(!(md->options & (LNET_MD_IOVEC | LNET_MD_KIOV | LNET_MD_PHYS)));
+
+ if (desc->bd_iov_count == 1) {
+ md->start = desc->bd_iov[0].iov_base;
+ md->length = desc->bd_iov[0].iov_len;
+ return;
+ }
+
+ md->options |= LNET_MD_IOVEC;
+ md->start = &desc->bd_iov[mdidx * LNET_MAX_IOV];
+ md->length = min(LNET_MAX_IOV, desc->bd_iov_count - mdidx *
+ LNET_MAX_IOV);
}
static int can_merge_iovs(lnet_md_iovec_t *existing, lnet_md_iovec_t *candidate)
int ptlrpcd_start(int index, int max, const char *name, struct ptlrpcd_ctl *pc);
/* client.c */
-struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal);
+struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw,
+ unsigned type, unsigned portal);
void ptlrpc_init_xid(void);
/* events.c */
int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink);
/* pers.c */
-void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc);
+void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc,
+ int mdidx);
void ptlrpc_add_bulk_page(struct ptlrpc_bulk_desc *desc, cfs_page_t *page,
- int pageoffset, int len);
+ int pageoffset, int len);
/* pack_generic.c */
struct ptlrpc_reply_state *
(long long)(int)offsetof(struct obd_ioobj, ioo_oid.oi_seq));
LASSERTF((int)sizeof(((struct obd_ioobj *)0)->ioo_oid.oi_seq) == 8, "found %lld\n",
(long long)(int)sizeof(((struct obd_ioobj *)0)->ioo_oid.oi_seq));
- LASSERTF((int)offsetof(struct obd_ioobj, ioo_type) == 16, "found %lld\n",
- (long long)(int)offsetof(struct obd_ioobj, ioo_type));
- LASSERTF((int)sizeof(((struct obd_ioobj *)0)->ioo_type) == 4, "found %lld\n",
- (long long)(int)sizeof(((struct obd_ioobj *)0)->ioo_type));
+ LASSERTF((int)offsetof(struct obd_ioobj, ioo_max_brw) == 16, "found %lld\n",
+ (long long)(int)offsetof(struct obd_ioobj, ioo_max_brw));
+ LASSERTF((int)sizeof(((struct obd_ioobj *)0)->ioo_max_brw) == 4, "found %lld\n",
+ (long long)(int)sizeof(((struct obd_ioobj *)0)->ioo_max_brw));
LASSERTF((int)offsetof(struct obd_ioobj, ioo_bufcnt) == 20, "found %lld\n",
(long long)(int)offsetof(struct obd_ioobj, ioo_bufcnt));
LASSERTF((int)sizeof(((struct obd_ioobj *)0)->ioo_bufcnt) == 4, "found %lld\n",
ptlrpc_at_set_req_timeout(req);
/* allocate bulk descriptor */
- desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK,
+ desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK,
MDS_BULK_PORTAL);
if (desc == NULL) {
ptlrpc_request_free(req);
}
run_test 230b "nested remote directory should be failed"
+test_231a()
+{
+ # For simplicity this test assumes that max_pages_per_rpc
+ # is the same across all OSCs
+ local max_pages=$($LCTL get_param -n osc.*.max_pages_per_rpc | head -1)
+ local bulk_size=$((max_pages * 4096))
+
+ mkdir -p $DIR/$tdir
+
+ # clear the OSC stats
+ $LCTL set_param osc.*.stats=0 &>/dev/null
+
+ # Client writes $bulk_size - there must be 1 rpc for $max_pages.
+ dd if=/dev/zero of=$DIR/$tdir/$tfile bs=$bulk_size count=1 \
+ oflag=direct &>/dev/null || error "dd failed"
+
+ local nrpcs=$($LCTL get_param osc.*.stats |awk '/ost_write/ {print $2}')
+ if [ x$nrpcs != "x1" ]; then
+ error "found $nrpc ost_write RPCs, not 1 as expected"
+ fi
+
+ # Drop the OSC cache, otherwise we will read from it
+ cancel_lru_locks osc
+
+ # clear the OSC stats
+ $LCTL set_param osc.*.stats=0 &>/dev/null
+
+ # Client reads $bulk_size.
+ dd if=$DIR/$tdir/$tfile of=/dev/null bs=$bulk_size count=1 \
+ iflag=direct &>/dev/null || error "dd failed"
+
+ nrpcs=$($LCTL get_param osc.*.stats | awk '/ost_read/ { print $2 }')
+ if [ x$nrpcs != "x1" ]; then
+ error "found $nrpc ost_read RPCs, not 1 as expected"
+ fi
+}
+run_test 231a "checking that reading/writing of BRW RPC size results in one RPC"
+
+test_231b() {
+ mkdir -p $DIR/$tdir
+ local i
+ for i in {0..1023}; do
+ dd if=/dev/zero of=$DIR/$tdir/$tfile conv=notrunc \
+ seek=$((2 * i)) bs=4096 count=1 &>/dev/null ||
+ error "dd of=$DIR/$tdir/$tfile seek=$((2 * i)) failed"
+ done
+ sync
+}
+run_test 231b "must not assert on fully utilized OST request buffer"
+
#
# tests that do cleanup/setup should be run at the end
#
CHECK_STRUCT(obd_ioobj);
CHECK_MEMBER(obd_ioobj, ioo_id);
CHECK_MEMBER(obd_ioobj, ioo_seq);
- CHECK_MEMBER(obd_ioobj, ioo_type);
+ CHECK_MEMBER(obd_ioobj, ioo_max_brw);
CHECK_MEMBER(obd_ioobj, ioo_bufcnt);
}
(long long)(int)offsetof(struct obd_ioobj, ioo_oid.oi_seq));
LASSERTF((int)sizeof(((struct obd_ioobj *)0)->ioo_oid.oi_seq) == 8, "found %lld\n",
(long long)(int)sizeof(((struct obd_ioobj *)0)->ioo_oid.oi_seq));
- LASSERTF((int)offsetof(struct obd_ioobj, ioo_type) == 16, "found %lld\n",
- (long long)(int)offsetof(struct obd_ioobj, ioo_type));
- LASSERTF((int)sizeof(((struct obd_ioobj *)0)->ioo_type) == 4, "found %lld\n",
- (long long)(int)sizeof(((struct obd_ioobj *)0)->ioo_type));
+ LASSERTF((int)offsetof(struct obd_ioobj, ioo_max_brw) == 16, "found %lld\n",
+ (long long)(int)offsetof(struct obd_ioobj, ioo_max_brw));
+ LASSERTF((int)sizeof(((struct obd_ioobj *)0)->ioo_max_brw) == 4, "found %lld\n",
+ (long long)(int)sizeof(((struct obd_ioobj *)0)->ioo_max_brw));
LASSERTF((int)offsetof(struct obd_ioobj, ioo_bufcnt) == 20, "found %lld\n",
(long long)(int)offsetof(struct obd_ioobj, ioo_bufcnt));
LASSERTF((int)sizeof(((struct obd_ioobj *)0)->ioo_bufcnt) == 4, "found %lld\n",