From 0f8b7f951e7f43dbf389a431afea2091388a805e Mon Sep 17 00:00:00 2001 From: Sergii Glushchenko Date: Tue, 5 Feb 2013 10:17:34 +0200 Subject: [PATCH] LU-1431 ptlrpc: Support for over 1MB bulk I/O RPC Increase the maximum size of BRW RPCs between OSCs and OSTs to 4MB by increasing the number of LNet Memory Descriptors assosiated with a bulk. Each bulk transfer is still limited to LNET_MTU_SIZE, so that it can be passed through routers and does not break RDMA limits. The client and server negotiate the maximum BRW RPC size at connect time via ocd_brw_size, with the client sending the maximum size it supports, and the server returning min(client_max, server_max) back to the client. For each RPC, the number of bulk MDs that are registered depends on the actual RPC size at sending time. The (max_brw_count - 1) is sent to the OST in the high 16 bits of obd_ioobj.ioo_max_brw and forms a mask for the bulk RPC XIDs. The actual number of bulk transfers is encoded in the BRW RPC XID. The masked value of the request XID determines the starting bulk transfer match bits, and the last bulk match is the RPC XID. For older clients this means that the starting and ending bulk match bits are the RPC XID itself (as it always was) and the old ioo_type (now ioo_max_brw) field being set to zero would result in a mask of "1", which just keeps the same RPC XID as it was before. It is important to note that the client and server do NOT share the value of PTLRPC_BULK_OPS_MASK, or this would incorrectly mask the RPC XID if the PTLRPC_BULK_OPS_BITS was ever increased on the server. The actual BRW RPC size can be controlled for each client separately by changing the client's max_pages_per_rpc. For example, this will set it to 4MB: lctl set_param osc.*.max_pages_per_rpc=1024 It is also possible to specify a units suffix, so that the RPC size can be set independently of the client PAGE_SIZE: lctl set_param osc.*.max_pages_per_rpc=4M By default, the size is 1MB for for OST BRW RPCs until more testing is done. All other bulk I/O (e.g. MDC<->MDS) is left at 1MB. With PTLRPC_MAX_BRW_SIZE growing larger, it doesn't make sense for the readahead to increase the window size so dramatically at one time Instead, limit readahead growth by the current inode i_blkbits value, which currently defaults to 4MB. If we want to tune the readahead growth so that it matches the actual cl_max_pages_per_rpc value, then this can be done much more easily by changing the per-inode i_blkbits value, since this is tied to specific OSTs with specific RPC settings (which may be different for e.g. local and remote OSTs), and only needs to be checked once. Change the ras_*() functions to take the inode as an argument, and always put it first in the argument list for consistency. Signed-off-by: Andreas Dilger Signed-off-by: Sergii Glushchenko Reviewed-by: Alexey Lyashkov Reviewed-by: Andrew Perepechko Change-Id: I757b14a04e5d4cc053576e41e47864c743c35b8b Xyratex-bug-id: MRP-319 Reviewed-on: http://review.whamcloud.com/4993 Tested-by: Hudson Tested-by: Maloo Reviewed-by: Oleg Drokin Reviewed-by: Jinshan Xiong --- lnet/include/lnet/types.h | 8 +- lustre/contrib/wireshark/packet-lustre.c | 14 +- lustre/include/lustre/lustre_idl.h | 36 +-- lustre/include/lustre_export.h | 7 +- lustre/include/lustre_net.h | 141 +++++++----- lustre/include/obd.h | 8 +- lustre/ldlm/ldlm_lib.c | 28 +-- lustre/llite/llite_lib.c | 2 +- lustre/llite/rw.c | 257 +++++++++++---------- lustre/mdc/mdc_request.c | 10 +- lustre/mdt/mdt_handler.c | 12 +- lustre/mgc/mgc_request.c | 12 +- lustre/mgs/mgs_nids.c | 24 +- lustre/obdclass/obdo.c | 14 +- lustre/obdecho/echo_client.c | 2 + lustre/ofd/ofd_grant.c | 24 +- lustre/osc/lproc_osc.c | 9 +- lustre/osc/osc_request.c | 40 ++-- lustre/osd-ldiskfs/osd_io.c | 8 +- lustre/ost/ost_handler.c | 65 +++--- lustre/ptlrpc/client.c | 147 +++++++----- lustre/ptlrpc/events.c | 86 +++---- lustre/ptlrpc/import.c | 17 +- lustre/ptlrpc/niobuf.c | 376 +++++++++++++++++++------------ lustre/ptlrpc/pack_generic.c | 18 +- lustre/ptlrpc/pers.c | 53 +++-- lustre/ptlrpc/ptlrpc_internal.h | 8 +- lustre/ptlrpc/wiretest.c | 8 +- lustre/quota/qsd_request.c | 2 +- lustre/tests/sanity.sh | 50 ++++ lustre/utils/wirecheck.c | 2 +- lustre/utils/wiretest.c | 8 +- 32 files changed, 875 insertions(+), 621 deletions(-) diff --git a/lnet/include/lnet/types.h b/lnet/include/lnet/types.h index 6fcb631..2e7d269 100644 --- a/lnet/include/lnet/types.h +++ b/lnet/include/lnet/types.h @@ -269,9 +269,11 @@ typedef struct { lnet_handle_eq_t eq_handle; } lnet_md_t; -/* Max Transfer Unit (minimum supported everywhere) */ -#define LNET_MTU_BITS 20 -#define LNET_MTU (1<ioo_max_brw >> IOOBJ_MAX_BRW_BITS) + 1) +#define ioobj_max_brw_set(ioo, num) \ +do { (ioo)->ioo_max_brw = ((num) - 1) << IOOBJ_MAX_BRW_BITS; } while (0) #define ioo_id ioo_oid.oi_id #define ioo_seq ioo_oid.oi_seq diff --git a/lustre/include/lustre_export.h b/lustre/include/lustre_export.h index b2250d3..29f6167 100644 --- a/lustre/include/lustre_export.h +++ b/lustre/include/lustre_export.h @@ -281,7 +281,7 @@ static inline __u64 exp_connect_flags(struct obd_export *exp) return *exp_connect_flags_ptr(exp); } -static inline int exp_brw_size(struct obd_export *exp) +static inline int exp_max_brw_size(struct obd_export *exp) { LASSERT(exp != NULL); if (exp_connect_flags(exp) & OBD_CONNECT_BRW_SIZE) @@ -290,6 +290,11 @@ static inline int exp_brw_size(struct obd_export *exp) return ONE_MB_BRW_SIZE; } +static inline int exp_connect_multibulk(struct obd_export *exp) +{ + return exp_max_brw_size(exp) > ONE_MB_BRW_SIZE; +} + static inline int exp_expired(struct obd_export *exp, cfs_duration_t age) { LASSERT(exp->exp_delayed); diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 74983d0..dbeea10 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -83,17 +83,38 @@ #define PTLRPC_MD_OPTIONS 0 /** - * Define maxima for bulk I/O - * CAVEAT EMPTOR, with multinet (i.e. routers forwarding between networks) - * these limits are system wide and not interface-local. */ -#define PTLRPC_MAX_BRW_BITS LNET_MTU_BITS -#define PTLRPC_MAX_BRW_SIZE (1 << LNET_MTU_BITS) -#define PTLRPC_MAX_BRW_PAGES (PTLRPC_MAX_BRW_SIZE >> CFS_PAGE_SHIFT) + * Max # of bulk operations in one request. + * In order for the client and server to properly negotiate the maximum + * possible transfer size, PTLRPC_BULK_OPS_COUNT must be a power-of-two + * value. The client is free to limit the actual RPC size for any bulk + * transfer via cl_max_pages_per_rpc to some non-power-of-two value. */ +#define PTLRPC_BULK_OPS_BITS 2 +#define PTLRPC_BULK_OPS_COUNT (1U << PTLRPC_BULK_OPS_BITS) +/** + * PTLRPC_BULK_OPS_MASK is for the convenience of the client only, and + * should not be used on the server at all. Otherwise, it imposes a + * protocol limitation on the maximum RPC size that can be used by any + * RPC sent to that server in the future. Instead, the server should + * use the negotiated per-client ocd_brw_size to determine the bulk + * RPC count. */ +#define PTLRPC_BULK_OPS_MASK (~((__u64)PTLRPC_BULK_OPS_COUNT - 1)) + +/** + * Define maxima for bulk I/O. + * + * A single PTLRPC BRW request is sent via up to PTLRPC_BULK_OPS_COUNT + * of LNET_MTU sized RDMA transfers. Clients and servers negotiate the + * currently supported maximum between peers at connect via ocd_brw_size. + */ +#define PTLRPC_MAX_BRW_BITS (LNET_MTU_BITS + PTLRPC_BULK_OPS_BITS) +#define PTLRPC_MAX_BRW_SIZE (1 << PTLRPC_MAX_BRW_BITS) +#define PTLRPC_MAX_BRW_PAGES (PTLRPC_MAX_BRW_SIZE >> CFS_PAGE_SHIFT) #define ONE_MB_BRW_SIZE (1 << LNET_MTU_BITS) #define MD_MAX_BRW_SIZE (1 << LNET_MTU_BITS) #define MD_MAX_BRW_PAGES (MD_MAX_BRW_SIZE >> CFS_PAGE_SHIFT) -#define DT_MAX_BRW_SIZE (1 << LNET_MTU_BITS) +#define DT_MAX_BRW_SIZE PTLRPC_MAX_BRW_SIZE +#define DT_MAX_BRW_PAGES (DT_MAX_BRW_SIZE >> CFS_PAGE_SHIFT) #define OFD_MAX_BRW_SIZE (1 << LNET_MTU_BITS) /* When PAGE_SIZE is a constant, we can check our arithmetic here with cpp! */ @@ -104,10 +125,10 @@ # if (PTLRPC_MAX_BRW_SIZE != (PTLRPC_MAX_BRW_PAGES * CFS_PAGE_SIZE)) # error "PTLRPC_MAX_BRW_SIZE isn't PTLRPC_MAX_BRW_PAGES * CFS_PAGE_SIZE" # endif -# if (PTLRPC_MAX_BRW_SIZE > LNET_MTU) +# if (PTLRPC_MAX_BRW_SIZE > LNET_MTU * PTLRPC_BULK_OPS_COUNT) # error "PTLRPC_MAX_BRW_SIZE too big" # endif -# if (PTLRPC_MAX_BRW_PAGES > LNET_MAX_IOV) +# if (PTLRPC_MAX_BRW_PAGES > LNET_MAX_IOV * PTLRPC_BULK_OPS_COUNT) # error "PTLRPC_MAX_BRW_PAGES too big" # endif #endif /* __KERNEL__ */ @@ -377,19 +398,24 @@ #define OSS_CR_NTHRS_BASE 8 #define OSS_CR_NTHRS_MAX 64 -#define OST_NBUFS (64 * cfs_num_online_cpus()) -#define OST_BUFSIZE (8 * 1024) - /** - * OST_MAXREQSIZE ~= 4768 bytes = - * lustre_msg + obdo + 16 * obd_ioobj + 256 * niobuf_remote + * OST_MAXREQSIZE ~= + * lustre_msg + obdo + obd_ioobj + DT_MAX_BRW_PAGES * niobuf_remote * * - single object with 16 pages is 512 bytes * - OST_MAXREQSIZE must be at least 1 page of cookies plus some spillover + * - Must be a multiple of 1024 */ -#define OST_MAXREQSIZE (5 * 1024) +#define _OST_MAXREQSIZE_SUM (sizeof(struct lustre_msg) + sizeof(struct obdo) + \ + sizeof(struct obd_ioobj) + DT_MAX_BRW_PAGES * \ + sizeof(struct niobuf_remote)) +#define OST_MAXREQSIZE (((_OST_MAXREQSIZE_SUM - 1) | (1024 - 1)) + 1) + #define OST_MAXREPSIZE (9 * 1024) +#define OST_NBUFS (64 * cfs_num_online_cpus()) +#define OST_BUFSIZE (OST_MAXREQSIZE + 1024) + /* Macro to hide a typecast. */ #define ptlrpc_req_async_args(req) ((void *)&req->rq_async_args) @@ -1788,7 +1814,7 @@ struct ptlrpc_bulk_page { #define BULK_PUT_SOURCE 3 /** - * Definition of buk descriptor. + * Definition of bulk descriptor. * Bulks are special "Two phase" RPCs where initial request message * is sent first and it is followed bt a transfer (o receiving) of a large * amount of data to be settled into pages referenced from the bulk descriptors. @@ -1798,47 +1824,48 @@ struct ptlrpc_bulk_page { * Another user is readpage for MDT. */ struct ptlrpc_bulk_desc { - /** completed successfully */ - unsigned long bd_success:1; - /** accessible to the network (network io potentially in progress) */ - unsigned long bd_network_rw:1; - /** {put,get}{source,sink} */ - unsigned long bd_type:2; - /** client side */ - unsigned long bd_registered:1; - /** For serialization with callback */ + /** completed with failure */ + unsigned long bd_failure:1; + /** {put,get}{source,sink} */ + unsigned long bd_type:2; + /** client side */ + unsigned long bd_registered:1; + /** For serialization with callback */ spinlock_t bd_lock; - /** Import generation when request for this bulk was sent */ - int bd_import_generation; - /** Server side - export this bulk created for */ - struct obd_export *bd_export; - /** Client side - import this bulk was sent on */ - struct obd_import *bd_import; - /** LNet portal for this bulk */ - __u32 bd_portal; - /** Back pointer to the request */ - struct ptlrpc_request *bd_req; - cfs_waitq_t bd_waitq; /* server side only WQ */ - int bd_iov_count; /* # entries in bd_iov */ - int bd_max_iov; /* allocated size of bd_iov */ - int bd_nob; /* # bytes covered */ - int bd_nob_transferred; /* # bytes GOT/PUT */ - - __u64 bd_last_xid; - - struct ptlrpc_cb_id bd_cbid; /* network callback info */ - lnet_handle_md_t bd_md_h; /* associated MD */ - lnet_nid_t bd_sender; /* stash event::sender */ + /** Import generation when request for this bulk was sent */ + int bd_import_generation; + /** LNet portal for this bulk */ + __u32 bd_portal; + /** Server side - export this bulk created for */ + struct obd_export *bd_export; + /** Client side - import this bulk was sent on */ + struct obd_import *bd_import; + /** Back pointer to the request */ + struct ptlrpc_request *bd_req; + cfs_waitq_t bd_waitq; /* server side only WQ */ + int bd_iov_count; /* # entries in bd_iov */ + int bd_max_iov; /* allocated size of bd_iov */ + int bd_nob; /* # bytes covered */ + int bd_nob_transferred; /* # bytes GOT/PUT */ + + __u64 bd_last_xid; + + struct ptlrpc_cb_id bd_cbid; /* network callback info */ + lnet_nid_t bd_sender; /* stash event::sender */ + int bd_md_count; /* # valid entries in bd_mds */ + int bd_md_max_brw; /* max entries in bd_mds */ + /** array of associated MDs */ + lnet_handle_md_t bd_mds[PTLRPC_BULK_OPS_COUNT]; #if defined(__KERNEL__) - /* - * encrypt iov, size is either 0 or bd_iov_count. - */ - lnet_kiov_t *bd_enc_iov; + /* + * encrypt iov, size is either 0 or bd_iov_count. + */ + lnet_kiov_t *bd_enc_iov; - lnet_kiov_t bd_iov[0]; + lnet_kiov_t bd_iov[0]; #else - lnet_md_iovec_t bd_iov[0]; + lnet_md_iovec_t bd_iov[0]; #endif }; @@ -2390,7 +2417,8 @@ extern lnet_pid_t ptl_get_pid(void); */ #ifdef HAVE_SERVER_SUPPORT struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp(struct ptlrpc_request *req, - int npages, int type, int portal); + unsigned npages, unsigned max_brw, + unsigned type, unsigned portal); int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc); void ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc); @@ -2401,7 +2429,7 @@ static inline int ptlrpc_server_bulk_active(struct ptlrpc_bulk_desc *desc) LASSERT(desc != NULL); spin_lock(&desc->bd_lock); - rc = desc->bd_network_rw; + rc = desc->bd_md_count; spin_unlock(&desc->bd_lock); return rc; } @@ -2426,7 +2454,7 @@ static inline int ptlrpc_client_bulk_active(struct ptlrpc_request *req) return 0; spin_lock(&desc->bd_lock); - rc = desc->bd_network_rw; + rc = desc->bd_md_count; spin_unlock(&desc->bd_lock); return rc; } @@ -2511,7 +2539,8 @@ void ptlrpc_req_finished(struct ptlrpc_request *request); void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request); struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req); struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req, - int npages, int type, int portal); + unsigned npages, unsigned max_brw, + unsigned type, unsigned portal); void __ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk, int pin); static inline void ptlrpc_free_bulk_pin(struct ptlrpc_bulk_desc *bulk) { diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 3be4105..404fc4b 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -494,7 +494,7 @@ struct client_obd { /* just a sum of the loi/lop pending numbers to be exported by /proc */ cfs_atomic_t cl_pending_w_pages; cfs_atomic_t cl_pending_r_pages; - int cl_max_pages_per_rpc; + __u32 cl_max_pages_per_rpc; int cl_max_rpcs_in_flight; struct obd_histogram cl_read_rpc_hist; struct obd_histogram cl_write_rpc_hist; @@ -1733,4 +1733,10 @@ bad_format: return false; } +static inline int cli_brw_size(struct obd_device *obd) +{ + LASSERT(obd != NULL); + return obd->u.cli.cl_max_pages_per_rpc << CFS_PAGE_SHIFT; +} + #endif /* __OBD_H */ diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 31cb5bb..2e3a1d5 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -385,10 +385,12 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg) #endif cfs_atomic_set(&cli->cl_resends, OSC_DEFAULT_RESENDS); - /* This value may be changed at connect time in - ptlrpc_connect_interpret. */ - cli->cl_max_pages_per_rpc = min((int)PTLRPC_MAX_BRW_PAGES, - (int)(LNET_MTU >> CFS_PAGE_SHIFT)); + /* This value may be reduced at connect time in + * ptlrpc_connect_interpret() . We initialize it to only + * 1MB until we know what the performance looks like. + * In the future this should likely be increased. LU-1431 */ + cli->cl_max_pages_per_rpc = min_t(int, PTLRPC_MAX_BRW_PAGES, + LNET_MTU >> CFS_PAGE_SHIFT); if (!strcmp(name, LUSTRE_MDC_NAME)) { cli->cl_max_rpcs_in_flight = MDC_MAX_RIF_DEFAULT; @@ -2713,16 +2715,16 @@ int target_bulk_io(struct obd_export *exp, struct ptlrpc_bulk_desc *desc, /* We don't reply anyway. */ rc = -ETIMEDOUT; ptlrpc_abort_bulk(desc); - } else if (!desc->bd_success || - desc->bd_nob_transferred != desc->bd_nob) { - DEBUG_REQ(D_ERROR, req, "%s bulk %s %d(%d)", - desc->bd_success ? - "truncated" : "network error on", - bulk2type(desc), - desc->bd_nob_transferred, - desc->bd_nob); + } else if (desc->bd_failure || + desc->bd_nob_transferred != desc->bd_nob) { + DEBUG_REQ(D_ERROR, req, "%s bulk %s %d(%d)", + desc->bd_failure ? + "network error on" : "truncated", + bulk2type(desc), + desc->bd_nob_transferred, + desc->bd_nob); /* XXX Should this be a different errno? */ - rc = -ETIMEDOUT; + rc = -ETIMEDOUT; } else if (desc->bd_type == BULK_GET_SINK) { rc = sptlrpc_svc_unwrap_bulk(req, desc); } diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 6e91365..ad360f1 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -393,7 +393,7 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt, } data->ocd_connect_flags = OBD_CONNECT_GRANT | OBD_CONNECT_VERSION | - OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE | + OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE | OBD_CONNECT_CANCELSET | OBD_CONNECT_FID | OBD_CONNECT_SRVLOCK | OBD_CONNECT_TRUNCLOCK| OBD_CONNECT_AT | OBD_CONNECT_RMT_CLIENT | diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index 2227a17..cbd1c91 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -572,7 +572,11 @@ static int ll_read_ahead_page(const struct lu_env *env, struct cl_io *io, ria->ria_start, ria->ria_end, ria->ria_stoff, ria->ria_length,\ ria->ria_pages) -#define RAS_INCREASE_STEP PTLRPC_MAX_BRW_PAGES +/* Limit this to the blocksize instead of PTLRPC_BRW_MAX_SIZE, since we don't + * know what the actual RPC size is. If this needs to change, it makes more + * sense to tune the i_blkbits value for the file based on the OSTs it is + * striped over, rather than having a constant value for all files here. */ +#define RAS_INCREASE_STEP(inode) (1UL << inode->i_blkbits) static inline int stride_io_mode(struct ll_readahead_state *ras) { @@ -843,22 +847,24 @@ int ll_readahead(const struct lu_env *env, struct cl_io *io, RETURN(ret); } -static void ras_set_start(struct ll_readahead_state *ras, unsigned long index) +static void ras_set_start(struct inode *inode, struct ll_readahead_state *ras, + unsigned long index) { - ras->ras_window_start = index & (~(RAS_INCREASE_STEP - 1)); + ras->ras_window_start = index & (~(RAS_INCREASE_STEP(inode) - 1)); } /* called with the ras_lock held or from places where it doesn't matter */ -static void ras_reset(struct ll_readahead_state *ras, unsigned long index) +static void ras_reset(struct inode *inode, struct ll_readahead_state *ras, + unsigned long index) { - ras->ras_last_readpage = index; - ras->ras_consecutive_requests = 0; - ras->ras_consecutive_pages = 0; - ras->ras_window_len = 0; - ras_set_start(ras, index); - ras->ras_next_readahead = max(ras->ras_window_start, index); + ras->ras_last_readpage = index; + ras->ras_consecutive_requests = 0; + ras->ras_consecutive_pages = 0; + ras->ras_window_len = 0; + ras_set_start(inode, ras, index); + ras->ras_next_readahead = max(ras->ras_window_start, index); - RAS_CDEBUG(ras); + RAS_CDEBUG(ras); } /* called with the ras_lock held or from places where it doesn't matter */ @@ -873,7 +879,7 @@ static void ras_stride_reset(struct ll_readahead_state *ras) void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras) { spin_lock_init(&ras->ras_lock); - ras_reset(ras, 0); + ras_reset(inode, ras, 0); ras->ras_requests = 0; CFS_INIT_LIST_HEAD(&ras->ras_read_beads); } @@ -882,23 +888,24 @@ void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras) * Check whether the read request is in the stride window. * If it is in the stride window, return 1, otherwise return 0. */ -static int index_in_stride_window(unsigned long index, - struct ll_readahead_state *ras, - struct inode *inode) +static int index_in_stride_window(struct ll_readahead_state *ras, + unsigned long index) { - unsigned long stride_gap = index - ras->ras_last_readpage - 1; + unsigned long stride_gap; - if (ras->ras_stride_length == 0 || ras->ras_stride_pages == 0 || - ras->ras_stride_pages == ras->ras_stride_length) - return 0; + if (ras->ras_stride_length == 0 || ras->ras_stride_pages == 0 || + ras->ras_stride_pages == ras->ras_stride_length) + return 0; - /* If it is contiguous read */ - if (stride_gap == 0) - return ras->ras_consecutive_pages + 1 <= ras->ras_stride_pages; + stride_gap = index - ras->ras_last_readpage - 1; - /*Otherwise check the stride by itself */ - return (ras->ras_stride_length - ras->ras_stride_pages) == stride_gap && - ras->ras_consecutive_pages == ras->ras_stride_pages; + /* If it is contiguous read */ + if (stride_gap == 0) + return ras->ras_consecutive_pages + 1 <= ras->ras_stride_pages; + + /* Otherwise check the stride by itself */ + return (ras->ras_stride_length - ras->ras_stride_pages) == stride_gap && + ras->ras_consecutive_pages == ras->ras_stride_pages; } static void ras_update_stride_detector(struct ll_readahead_state *ras, @@ -974,19 +981,20 @@ static void ras_stride_increase_window(struct ll_readahead_state *ras, RAS_CDEBUG(ras); } -static void ras_increase_window(struct ll_readahead_state *ras, - struct ll_ra_info *ra, struct inode *inode) +static void ras_increase_window(struct inode *inode, + struct ll_readahead_state *ras, + struct ll_ra_info *ra) { - /* The stretch of ra-window should be aligned with max rpc_size - * but current clio architecture does not support retrieve such - * information from lower layer. FIXME later - */ - if (stride_io_mode(ras)) - ras_stride_increase_window(ras, ra, RAS_INCREASE_STEP); - else - ras->ras_window_len = min(ras->ras_window_len + - RAS_INCREASE_STEP, - ra->ra_max_pages_per_file); + /* The stretch of ra-window should be aligned with max rpc_size + * but current clio architecture does not support retrieve such + * information from lower layer. FIXME later + */ + if (stride_io_mode(ras)) + ras_stride_increase_window(ras, ra, RAS_INCREASE_STEP(inode)); + else + ras->ras_window_len = min(ras->ras_window_len + + RAS_INCREASE_STEP(inode), + ra->ra_max_pages_per_file); } void ras_update(struct ll_sb_info *sbi, struct inode *inode, @@ -1042,97 +1050,96 @@ void ras_update(struct ll_sb_info *sbi, struct inode *inode, GOTO(out_unlock, 0); } } - if (zero) { - /* check whether it is in stride I/O mode*/ - if (!index_in_stride_window(index, ras, inode)) { - if (ras->ras_consecutive_stride_requests == 0 && - ras->ras_request_index == 0) { - ras_update_stride_detector(ras, index); - ras->ras_consecutive_stride_requests ++; - } else { - ras_stride_reset(ras); - } - ras_reset(ras, index); - ras->ras_consecutive_pages++; - GOTO(out_unlock, 0); - } else { - ras->ras_consecutive_pages = 0; - ras->ras_consecutive_requests = 0; - if (++ras->ras_consecutive_stride_requests > 1) - stride_detect = 1; - RAS_CDEBUG(ras); - } - } else { - if (ra_miss) { - if (index_in_stride_window(index, ras, inode) && - stride_io_mode(ras)) { - /*If stride-RA hit cache miss, the stride dector - *will not be reset to avoid the overhead of - *redetecting read-ahead mode */ - if (index != ras->ras_last_readpage + 1) - ras->ras_consecutive_pages = 0; - ras_reset(ras, index); - RAS_CDEBUG(ras); - } else { - /* Reset both stride window and normal RA - * window */ - ras_reset(ras, index); - ras->ras_consecutive_pages++; - ras_stride_reset(ras); - GOTO(out_unlock, 0); - } - } else if (stride_io_mode(ras)) { - /* If this is contiguous read but in stride I/O mode - * currently, check whether stride step still is valid, - * if invalid, it will reset the stride ra window*/ - if (!index_in_stride_window(index, ras, inode)) { - /* Shrink stride read-ahead window to be zero */ - ras_stride_reset(ras); - ras->ras_window_len = 0; - ras->ras_next_readahead = index; - } - } - } - ras->ras_consecutive_pages++; - ras->ras_last_readpage = index; - ras_set_start(ras, index); - - if (stride_io_mode(ras)) - /* Since stride readahead is sentivite to the offset - * of read-ahead, so we use original offset here, - * instead of ras_window_start, which is 1M aligned*/ - ras->ras_next_readahead = max(index, - ras->ras_next_readahead); - else - ras->ras_next_readahead = max(ras->ras_window_start, - ras->ras_next_readahead); - RAS_CDEBUG(ras); + if (zero) { + /* check whether it is in stride I/O mode*/ + if (!index_in_stride_window(ras, index)) { + if (ras->ras_consecutive_stride_requests == 0 && + ras->ras_request_index == 0) { + ras_update_stride_detector(ras, index); + ras->ras_consecutive_stride_requests++; + } else { + ras_stride_reset(ras); + } + ras_reset(inode, ras, index); + ras->ras_consecutive_pages++; + GOTO(out_unlock, 0); + } else { + ras->ras_consecutive_pages = 0; + ras->ras_consecutive_requests = 0; + if (++ras->ras_consecutive_stride_requests > 1) + stride_detect = 1; + RAS_CDEBUG(ras); + } + } else { + if (ra_miss) { + if (index_in_stride_window(ras, index) && + stride_io_mode(ras)) { + /*If stride-RA hit cache miss, the stride dector + *will not be reset to avoid the overhead of + *redetecting read-ahead mode */ + if (index != ras->ras_last_readpage + 1) + ras->ras_consecutive_pages = 0; + ras_reset(inode, ras, index); + RAS_CDEBUG(ras); + } else { + /* Reset both stride window and normal RA + * window */ + ras_reset(inode, ras, index); + ras->ras_consecutive_pages++; + ras_stride_reset(ras); + GOTO(out_unlock, 0); + } + } else if (stride_io_mode(ras)) { + /* If this is contiguous read but in stride I/O mode + * currently, check whether stride step still is valid, + * if invalid, it will reset the stride ra window*/ + if (!index_in_stride_window(ras, index)) { + /* Shrink stride read-ahead window to be zero */ + ras_stride_reset(ras); + ras->ras_window_len = 0; + ras->ras_next_readahead = index; + } + } + } + ras->ras_consecutive_pages++; + ras->ras_last_readpage = index; + ras_set_start(inode, ras, index); + + if (stride_io_mode(ras)) + /* Since stride readahead is sentivite to the offset + * of read-ahead, so we use original offset here, + * instead of ras_window_start, which is RPC aligned */ + ras->ras_next_readahead = max(index, ras->ras_next_readahead); + else + ras->ras_next_readahead = max(ras->ras_window_start, + ras->ras_next_readahead); + RAS_CDEBUG(ras); - /* Trigger RA in the mmap case where ras_consecutive_requests - * is not incremented and thus can't be used to trigger RA */ - if (!ras->ras_window_len && ras->ras_consecutive_pages == 4) { - ras->ras_window_len = RAS_INCREASE_STEP; - GOTO(out_unlock, 0); - } + /* Trigger RA in the mmap case where ras_consecutive_requests + * is not incremented and thus can't be used to trigger RA */ + if (!ras->ras_window_len && ras->ras_consecutive_pages == 4) { + ras->ras_window_len = RAS_INCREASE_STEP(inode); + GOTO(out_unlock, 0); + } - /* Initially reset the stride window offset to next_readahead*/ - if (ras->ras_consecutive_stride_requests == 2 && stride_detect) { - /** - * Once stride IO mode is detected, next_readahead should be - * reset to make sure next_readahead > stride offset - */ - ras->ras_next_readahead = max(index, ras->ras_next_readahead); - ras->ras_stride_offset = index; - ras->ras_window_len = RAS_INCREASE_STEP; - } + /* Initially reset the stride window offset to next_readahead*/ + if (ras->ras_consecutive_stride_requests == 2 && stride_detect) { + /** + * Once stride IO mode is detected, next_readahead should be + * reset to make sure next_readahead > stride offset + */ + ras->ras_next_readahead = max(index, ras->ras_next_readahead); + ras->ras_stride_offset = index; + ras->ras_window_len = RAS_INCREASE_STEP(inode); + } - /* The initial ras_window_len is set to the request size. To avoid - * uselessly reading and discarding pages for random IO the window is - * only increased once per consecutive request received. */ - if ((ras->ras_consecutive_requests > 1 || stride_detect) && - !ras->ras_request_index) - ras_increase_window(ras, ra, inode); - EXIT; + /* The initial ras_window_len is set to the request size. To avoid + * uselessly reading and discarding pages for random IO the window is + * only increased once per consecutive request received. */ + if ((ras->ras_consecutive_requests > 1 || stride_detect) && + !ras->ras_request_index) + ras_increase_window(inode, ras, ra); + EXIT; out_unlock: RAS_CDEBUG(ras); ras->ras_request_index++; diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index df9376b..399c268 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -992,9 +992,9 @@ int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid, req->rq_request_portal = MDS_READPAGE_PORTAL; ptlrpc_at_set_req_timeout(req); - desc = ptlrpc_prep_bulk_imp(req, 1, BULK_GET_SOURCE, MDS_BULK_PORTAL); - if (desc == NULL) - GOTO(out, rc = -ENOMEM); + desc = ptlrpc_prep_bulk_imp(req, 1, 1,BULK_GET_SOURCE, MDS_BULK_PORTAL); + if (desc == NULL) + GOTO(out, rc = -ENOMEM); /* NB req now owns desc and will free it when it gets freed. */ ptlrpc_prep_bulk_page(desc, (struct page *)page, 0, offset); @@ -1044,8 +1044,8 @@ restart_bulk: req->rq_request_portal = MDS_READPAGE_PORTAL; ptlrpc_at_set_req_timeout(req); - desc = ptlrpc_prep_bulk_imp(req, op_data->op_npages, BULK_PUT_SINK, - MDS_BULK_PORTAL); + desc = ptlrpc_prep_bulk_imp(req, op_data->op_npages, 1, BULK_PUT_SINK, + MDS_BULK_PORTAL); if (desc == NULL) { ptlrpc_request_free(req); RETURN(-ENOMEM); diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 3d1febd..a88a872 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -1605,10 +1605,10 @@ static int mdt_sendpage(struct mdt_thread_info *info, int rc; ENTRY; - desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, BULK_PUT_SOURCE, - MDS_BULK_PORTAL); - if (desc == NULL) - RETURN(-ENOMEM); + desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, 1, BULK_PUT_SOURCE, + MDS_BULK_PORTAL); + if (desc == NULL) + RETURN(-ENOMEM); if (!(exp_connect_flags(exp) & OBD_CONNECT_BRW_SIZE)) /* old client requires reply size in it's PAGE_SIZE, @@ -1661,7 +1661,7 @@ int mdt_readpage(struct mdt_thread_info *info) if (exp_connect_flags(info->mti_exp) & OBD_CONNECT_64BITHASH) rdpg->rp_attrs |= LUDA_64BITHASH; rdpg->rp_count = min_t(unsigned int, reqbody->nlink, - exp_brw_size(info->mti_exp)); + exp_max_brw_size(info->mti_exp)); rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT; OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]); @@ -2095,7 +2095,7 @@ int mdt_obd_idx_read(struct mdt_thread_info *info) if (req_ii->ii_count <= 0) GOTO(out, rc = -EFAULT); rdpg->rp_count = min_t(unsigned int, req_ii->ii_count << LU_PAGE_SHIFT, - exp_brw_size(info->mti_exp)); + exp_max_brw_size(info->mti_exp)); rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE -1) >> CFS_PAGE_SHIFT; /* allocate pages to store the containers */ diff --git a/lustre/mgc/mgc_request.c b/lustre/mgc/mgc_request.c index 411909f..2993a79 100644 --- a/lustre/mgc/mgc_request.c +++ b/lustre/mgc/mgc_request.c @@ -1490,13 +1490,13 @@ again: body->mcb_bits = CFS_PAGE_SHIFT; body->mcb_units = nrpages; - /* allocate bulk transfer descriptor */ - desc = ptlrpc_prep_bulk_imp(req, nrpages, BULK_PUT_SINK, - MGS_BULK_PORTAL); - if (desc == NULL) - GOTO(out, rc = -ENOMEM); + /* allocate bulk transfer descriptor */ + desc = ptlrpc_prep_bulk_imp(req, nrpages, 1, BULK_PUT_SINK, + MGS_BULK_PORTAL); + if (desc == NULL) + GOTO(out, rc = -ENOMEM); - for (i = 0; i < nrpages; i++) + for (i = 0; i < nrpages; i++) ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, CFS_PAGE_SIZE); ptlrpc_request_set_replen(req); diff --git a/lustre/mgs/mgs_nids.c b/lustre/mgs/mgs_nids.c index 52251fc3..781250f 100644 --- a/lustre/mgs/mgs_nids.c +++ b/lustre/mgs/mgs_nids.c @@ -669,18 +669,18 @@ int mgs_get_ir_logs(struct ptlrpc_request *req) unit_size = min_t(int, 1 << body->mcb_bits, CFS_PAGE_SIZE); bytes = mgs_nidtbl_read(req->rq_export, &fsdb->fsdb_nidtbl, res, pages, nrpages, bufsize / unit_size, unit_size); - if (bytes < 0) - GOTO(out, rc = bytes); - - /* start bulk transfer */ - page_count = (bytes + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT; - LASSERT(page_count <= nrpages); - desc = ptlrpc_prep_bulk_exp(req, page_count, - BULK_PUT_SOURCE, MGS_BULK_PORTAL); - if (desc == NULL) - GOTO(out, rc = -ENOMEM); - - for (i = 0; i < page_count && bytes > 0; i++) { + if (bytes < 0) + GOTO(out, rc = bytes); + + /* start bulk transfer */ + page_count = (bytes + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT; + LASSERT(page_count <= nrpages); + desc = ptlrpc_prep_bulk_exp(req, page_count, 1, + BULK_PUT_SOURCE, MGS_BULK_PORTAL); + if (desc == NULL) + GOTO(out, rc = -ENOMEM); + + for (i = 0; i < page_count && bytes > 0; i++) { ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, min_t(int, bytes, CFS_PAGE_SIZE)); bytes -= CFS_PAGE_SIZE; diff --git a/lustre/obdclass/obdo.c b/lustre/obdclass/obdo.c index dded526..a0175aa 100644 --- a/lustre/obdclass/obdo.c +++ b/lustre/obdclass/obdo.c @@ -207,12 +207,14 @@ EXPORT_SYMBOL(obdo_cmp_md); void obdo_to_ioobj(struct obdo *oa, struct obd_ioobj *ioobj) { - ioobj->ioo_id = oa->o_id; - if (oa->o_valid & OBD_MD_FLGROUP) - ioobj->ioo_seq = oa->o_seq; - else - ioobj->ioo_seq = 0; - ioobj->ioo_type = oa->o_mode; + ioobj->ioo_id = oa->o_id; + if (oa->o_valid & OBD_MD_FLGROUP) + ioobj->ioo_seq = oa->o_seq; + else + ioobj->ioo_seq = 0; + /* Since 2.4 this does not contain o_mode in the low 16 bits. + * Instead, it holds (bd_md_max_brw - 1) for multi-bulk BRW RPCs */ + ioobj->ioo_max_brw = 0; } EXPORT_SYMBOL(obdo_to_ioobj); diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index fda4d54..6df84fc 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -3039,8 +3039,10 @@ static int echo_client_setup(const struct lu_env *env, } ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL | + OBD_CONNECT_BRW_SIZE | OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 | OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE; + ocd->ocd_brw_size = DT_MAX_BRW_SIZE; ocd->ocd_version = LUSTRE_VERSION_CODE; ocd->ocd_group = FID_SEQ_ECHO; diff --git a/lustre/ofd/ofd_grant.c b/lustre/ofd/ofd_grant.c index 9194f74..6a675cc 100644 --- a/lustre/ofd/ofd_grant.c +++ b/lustre/ofd/ofd_grant.c @@ -42,9 +42,11 @@ #include "ofd_internal.h" -#define OFD_GRANT_CHUNK (2ULL * DT_MAX_BRW_SIZE) -#define OFD_GRANT_CHUNK_EXP(rexp) (2ULL * exp_brw_size((rexp))) -#define OFD_GRANT_SHRINK_LIMIT(rexp) (16ULL * OFD_GRANT_CHUNK_EXP((rexp))) +/* At least enough to send a couple of 1MB RPCs, even if not max sized */ +#define OFD_GRANT_CHUNK (2ULL * DT_MAX_BRW_SIZE) + +/* Clients typically hold 2x their max_rpcs_in_flight of grant space */ +#define OFD_GRANT_SHRINK_LIMIT(exp) (2ULL * 8 * exp_max_brw_size(exp)) static inline obd_size ofd_grant_from_cli(struct obd_export *exp, struct ofd_device *ofd, obd_size val) @@ -68,15 +70,17 @@ static inline obd_size ofd_grant_to_cli(struct obd_export *exp, static inline obd_size ofd_grant_chunk(struct obd_export *exp, struct ofd_device *ofd) { - if (exp && ofd_obd(ofd)->obd_self_export == exp) + if (ofd_obd(ofd)->obd_self_export == exp) /* Grant enough space to handle a big precreate request */ return OST_MAX_PRECREATE * ofd->ofd_dt_conf.ddp_inodespace; - if (exp && ofd_grant_compat(exp, ofd)) + if (ofd_grant_compat(exp, ofd)) /* Try to grant enough space to send a full-size RPC */ - return exp_brw_size(exp) << + return exp_max_brw_size(exp) << (ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT); - return OFD_GRANT_CHUNK; + + /* Try to return enough to send two full RPCs, if needed */ + return exp_max_brw_size(exp) * 2; } /** @@ -631,8 +635,7 @@ static long ofd_grant(struct obd_export *exp, obd_size curgrant, if (!grant) RETURN(0); - /* Allow >OFD_GRANT_CHUNK_EXP size when clients reconnect due to a - * server reboot. */ + /* Limit to ofd_grant_chunk() if client is not reconnecting */ if ((grant > grant_chunk) && (!obd->obd_recovering)) grant = grant_chunk; @@ -859,8 +862,7 @@ refresh: /* When close to free space exhaustion, trigger a sync to force * writeback cache to consume required space immediately and release as * much space as possible. */ - if (!obd->obd_recovering && force != 2 && - left < ofd_grant_chunk(NULL, ofd)) { + if (!obd->obd_recovering && force != 2 && left < OFD_GRANT_CHUNK) { bool from_grant = true; int i; diff --git a/lustre/osc/lproc_osc.c b/lustre/osc/lproc_osc.c index 0f3f6f8..54a1602 100644 --- a/lustre/osc/lproc_osc.c +++ b/lustre/osc/lproc_osc.c @@ -468,12 +468,17 @@ static int lprocfs_osc_wr_max_pages_per_rpc(struct file *file, struct obd_device *dev = data; struct client_obd *cli = &dev->u.cli; struct obd_connect_data *ocd = &cli->cl_import->imp_connect_data; - int chunk_mask, val, rc; + int chunk_mask, rc; + __u64 val; - rc = lprocfs_write_helper(buffer, count, &val); + rc = lprocfs_write_u64_helper(buffer, count, &val); if (rc) return rc; + /* if the max_pages is specified in bytes, convert to pages */ + if (val >= ONE_MB_BRW_SIZE) + val >>= CFS_PAGE_SHIFT; + LPROCFS_CLIMP_CHECK(dev); chunk_mask = ~((1 << (cli->cl_chunkbits - CFS_PAGE_SHIFT)) - 1); diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 936df67..442fef2 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -295,8 +295,7 @@ static int osc_getattr(const struct lu_env *env, struct obd_export *exp, CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); lustre_get_wire_obdo(oinfo->oi_oa, &body->oa); - /* This should really be sent by the OST */ - oinfo->oi_oa->o_blksize = exp_brw_size(exp); + oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd); oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ; EXIT; @@ -478,8 +477,7 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, lustre_get_wire_obdo(oa, &body->oa); - /* This should really be sent by the OST */ - oa->o_blksize = exp_brw_size(exp); + oa->o_blksize = cli_brw_size(exp->exp_obd); oa->o_valid |= OBD_MD_FLBLKSZ; /* XXX LOV STACKING: the lsm that is passed to us from LOV does not @@ -998,8 +996,10 @@ static int osc_should_shrink_grant(struct client_obd *client) return 0; if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) { - int brw_size = exp_brw_size( - client->cl_import->imp_obd->obd_self_export); + /* Get the current RPC size directly, instead of going via: + * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export) + * Keep comment here so that it can be found by searching. */ + int brw_size = client->cl_max_pages_per_rpc << CFS_PAGE_SHIFT; if (client->cl_import->imp_state == LUSTRE_IMP_FULL && client->cl_avail_grant > brw_size) @@ -1294,12 +1294,10 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, * retry logic */ req->rq_no_retry_einprogress = 1; - if (opc == OST_WRITE) - desc = ptlrpc_prep_bulk_imp(req, page_count, - BULK_GET_SOURCE, OST_BULK_PORTAL); - else - desc = ptlrpc_prep_bulk_imp(req, page_count, - BULK_PUT_SINK, OST_BULK_PORTAL); + desc = ptlrpc_prep_bulk_imp(req, page_count, + cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS, + opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK, + OST_BULK_PORTAL); if (desc == NULL) GOTO(out, rc = -ENOMEM); @@ -1312,11 +1310,17 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, lustre_set_wire_obdo(&body->oa, oa); - obdo_to_ioobj(oa, ioobj); - ioobj->ioo_bufcnt = niocount; - osc_pack_capa(req, body, ocapa); - LASSERT (page_count > 0); - pg_prev = pga[0]; + obdo_to_ioobj(oa, ioobj); + ioobj->ioo_bufcnt = niocount; + /* The high bits of ioo_max_brw tells server _maximum_ number of bulks + * that might be send for this request. The actual number is decided + * when the RPC is finally sent in ptlrpc_register_bulk(). It sends + * "max - 1" for old client compatibility sending "0", and also so the + * the actual maximum is a power-of-two number, not one less. LU-1431 */ + ioobj_max_brw_set(ioobj, desc->bd_md_max_brw); + osc_pack_capa(req, body, ocapa); + LASSERT(page_count > 0); + pg_prev = pga[0]; for (requested_nob = i = 0; i < page_count; i++, niobuf++) { struct brw_page *pg = pga[i]; int poff = pg->off & ~CFS_PAGE_MASK; @@ -3259,7 +3263,7 @@ static int osc_reconnect(const struct lu_env *env, client_obd_list_lock(&cli->cl_loi_list_lock); data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?: - 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT; + 2 * cli_brw_size(obd); lost_grant = cli->cl_lost_grant; cli->cl_lost_grant = 0; client_obd_list_unlock(&cli->cl_loi_list_lock); diff --git a/lustre/osd-ldiskfs/osd_io.c b/lustre/osd-ldiskfs/osd_io.c index af8afe1..995a94c 100644 --- a/lustre/osd-ldiskfs/osd_io.c +++ b/lustre/osd-ldiskfs/osd_io.c @@ -352,10 +352,10 @@ static int osd_do_bio(struct osd_device *osd, struct inode *inode, osd_submit_bio(iobuf->dr_rw, bio); } - /* allocate new bio, limited by max BIO size, b=9945 */ - bio = bio_alloc(GFP_NOIO, max(BIO_MAX_PAGES, - (npages - page_idx) * - blocks_per_page)); + /* allocate new bio */ + bio = bio_alloc(GFP_NOIO, min(BIO_MAX_PAGES, + (npages - page_idx) * + blocks_per_page)); if (bio == NULL) { CERROR("Can't allocate bio %u*%u = %u pages\n", (npages - page_idx), blocks_per_page, diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 118037f..e5a9617 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -107,25 +107,35 @@ static void ost_drop_id(struct obd_export *exp, struct obdo *oa) * b. for CMD, seq = FID_SEQ_OST_MDT0, FID_SEQ_OST_MDT1 - FID_SEQ_OST_MAX */ static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa, - struct obd_ioobj *ioobj) + struct obd_ioobj *ioobj) { - if (oa != NULL && !(oa->o_valid & OBD_MD_FLGROUP)) { - oa->o_seq = FID_SEQ_OST_MDT0; - if (ioobj) - ioobj->ioo_seq = FID_SEQ_OST_MDT0; - /* remove fid_seq_is_rsvd() after FID-on-OST allows SEQ > 9 */ - } else if (oa == NULL || - !(fid_seq_is_norm(oa->o_seq) || fid_seq_is_mdt(oa->o_seq) || - fid_seq_is_echo(oa->o_seq))) { - CERROR("%s: client %s sent invalid object "POSTID"\n", - exp->exp_obd->obd_name, obd_export_nid2str(exp), - oa ? oa->o_id : -1, oa ? oa->o_seq : -1); - return -EPROTO; - } - obdo_from_ostid(oa, &oa->o_oi); - if (ioobj) + if (unlikely(oa != NULL && !(oa->o_valid & OBD_MD_FLGROUP))) { + oa->o_seq = FID_SEQ_OST_MDT0; + if (ioobj) + ioobj->ioo_seq = FID_SEQ_OST_MDT0; + } else if (unlikely(oa == NULL || !(fid_seq_is_idif(oa->o_seq) || + fid_seq_is_mdt(oa->o_seq) || + fid_seq_is_echo(oa->o_seq)))) { + CERROR("%s: client %s sent bad object "POSTID": rc = -EPROTO\n", + exp->exp_obd->obd_name, obd_export_nid2str(exp), + oa ? oa->o_id : -1, oa ? oa->o_seq : -1); + return -EPROTO; + } + + obdo_from_ostid(oa, &oa->o_oi); + if (ioobj != NULL) { + unsigned max_brw = ioobj_max_brw_get(ioobj); + + if (unlikely((max_brw & (max_brw - 1)) != 0)) { + CERROR("%s: client %s sent bad ioobj max %u for "POSTID + ": rc = -EPROTO\n", exp->exp_obd->obd_name, + obd_export_nid2str(exp), max_brw, + oa->o_id, oa->o_seq); + return -EPROTO; + } ioobj_from_obdo(ioobj, oa); - return 0; + } + return 0; } void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req) @@ -807,10 +817,10 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) if (rc != 0) GOTO(out_lock, rc); - desc = ptlrpc_prep_bulk_exp(req, npages, - BULK_PUT_SOURCE, OST_BULK_PORTAL); - if (desc == NULL) - GOTO(out_commitrw, rc = -ENOMEM); + desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo), + BULK_PUT_SOURCE, OST_BULK_PORTAL); + if (desc == NULL) + GOTO(out_commitrw, rc = -ENOMEM); nob = 0; for (i = 0; i < npages; i++) { @@ -1097,14 +1107,13 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) if (rc != 0) GOTO(out_lock, rc); - desc = ptlrpc_prep_bulk_exp(req, npages, - BULK_GET_SINK, OST_BULK_PORTAL); - if (desc == NULL) - GOTO(skip_transfer, rc = -ENOMEM); - - /* NB Having prepped, we must commit... */ + desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo), + BULK_GET_SINK, OST_BULK_PORTAL); + if (desc == NULL) + GOTO(skip_transfer, rc = -ENOMEM); - for (i = 0; i < npages; i++) + /* NB Having prepped, we must commit... */ + for (i = 0; i < npages; i++) ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].page, local_nb[i].lnb_page_offset, local_nb[i].len); diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 819276c..4dea902 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -98,26 +98,34 @@ struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid) EXPORT_SYMBOL(ptlrpc_uuid_to_connection); /** - * Allocate and initialize new bulk descriptor + * Allocate and initialize new bulk descriptor on the sender. * Returns pointer to the descriptor or NULL on error. */ -struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal) +struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw, + unsigned type, unsigned portal) { - struct ptlrpc_bulk_desc *desc; + struct ptlrpc_bulk_desc *desc; + int i; - OBD_ALLOC(desc, offsetof (struct ptlrpc_bulk_desc, bd_iov[npages])); - if (!desc) - return NULL; + OBD_ALLOC(desc, offsetof(struct ptlrpc_bulk_desc, bd_iov[npages])); + if (!desc) + return NULL; spin_lock_init(&desc->bd_lock); - cfs_waitq_init(&desc->bd_waitq); - desc->bd_max_iov = npages; - desc->bd_iov_count = 0; - LNetInvalidateHandle(&desc->bd_md_h); - desc->bd_portal = portal; - desc->bd_type = type; - - return desc; + cfs_waitq_init(&desc->bd_waitq); + desc->bd_max_iov = npages; + desc->bd_iov_count = 0; + desc->bd_portal = portal; + desc->bd_type = type; + desc->bd_md_count = 0; + LASSERT(max_brw > 0); + desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT); + /* PTLRPC_BULK_OPS_COUNT is the compile-time transfer limit for this + * node. Negotiated ocd_brw_size will always be <= this number. */ + for (i = 0; i < PTLRPC_BULK_OPS_COUNT; i++) + LNetInvalidateHandle(&desc->bd_mds[i]); + + return desc; } /** @@ -128,16 +136,17 @@ struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal) * error. */ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req, - int npages, int type, int portal) + unsigned npages, unsigned max_brw, + unsigned type, unsigned portal) { - struct obd_import *imp = req->rq_import; - struct ptlrpc_bulk_desc *desc; + struct obd_import *imp = req->rq_import; + struct ptlrpc_bulk_desc *desc; - ENTRY; - LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE); - desc = new_bulk(npages, type, portal); - if (desc == NULL) - RETURN(NULL); + ENTRY; + LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE); + desc = ptlrpc_new_bulk(npages, max_brw, type, portal); + if (desc == NULL) + RETURN(NULL); desc->bd_import_generation = req->rq_import_generation; desc->bd_import = class_import_get(imp); @@ -182,29 +191,29 @@ EXPORT_SYMBOL(__ptlrpc_prep_bulk_page); */ void __ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc, int unpin) { - int i; - ENTRY; + int i; + ENTRY; - LASSERT(desc != NULL); - LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */ - LASSERT(!desc->bd_network_rw); /* network hands off or */ - LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL)); + LASSERT(desc != NULL); + LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */ + LASSERT(desc->bd_md_count == 0); /* network hands off */ + LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL)); - sptlrpc_enc_pool_put_pages(desc); + sptlrpc_enc_pool_put_pages(desc); - if (desc->bd_export) - class_export_put(desc->bd_export); - else - class_import_put(desc->bd_import); + if (desc->bd_export) + class_export_put(desc->bd_export); + else + class_import_put(desc->bd_import); if (unpin) { for (i = 0; i < desc->bd_iov_count ; i++) cfs_page_unpin(desc->bd_iov[i].kiov_page); } - OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc, - bd_iov[desc->bd_max_iov])); - EXIT; + OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc, + bd_iov[desc->bd_max_iov])); + EXIT; } EXPORT_SYMBOL(__ptlrpc_free_bulk); @@ -1752,14 +1761,14 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) if (ptlrpc_client_bulk_active(req)) continue; - if (!req->rq_bulk->bd_success) { - /* The RPC reply arrived OK, but the bulk screwed - * up! Dead weird since the server told us the RPC - * was good after getting the REPLY for her GET or - * the ACK for her PUT. */ - DEBUG_REQ(D_ERROR, req, "bulk transfer failed"); - req->rq_status = -EIO; - } + if (req->rq_bulk->bd_failure) { + /* The RPC reply arrived OK, but the bulk screwed + * up! Dead weird since the server told us the RPC + * was good after getting the REPLY for her GET or + * the ACK for her PUT. */ + DEBUG_REQ(D_ERROR, req, "bulk transfer failed"); + req->rq_status = -EIO; + } ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); @@ -2856,28 +2865,44 @@ static spinlock_t ptlrpc_last_xid_lock; #define YEAR_2004 (1ULL << 30) void ptlrpc_init_xid(void) { - time_t now = cfs_time_current_sec(); + time_t now = cfs_time_current_sec(); spin_lock_init(&ptlrpc_last_xid_lock); - if (now < YEAR_2004) { - cfs_get_random_bytes(&ptlrpc_last_xid, sizeof(ptlrpc_last_xid)); - ptlrpc_last_xid >>= 2; - ptlrpc_last_xid |= (1ULL << 61); - } else { - ptlrpc_last_xid = (__u64)now << 20; - } + if (now < YEAR_2004) { + cfs_get_random_bytes(&ptlrpc_last_xid, sizeof(ptlrpc_last_xid)); + ptlrpc_last_xid >>= 2; + ptlrpc_last_xid |= (1ULL << 61); + } else { + ptlrpc_last_xid = (__u64)now << 20; + } + + /* Need to always be aligned to a power-of-two for mutli-bulk BRW */ + CLASSERT((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) == 0); + ptlrpc_last_xid &= PTLRPC_BULK_OPS_MASK; } /** - * Increase xid and returns resultng new value to the caller. + * Increase xid and returns resulting new value to the caller. + * + * Multi-bulk BRW RPCs consume multiple XIDs for each bulk transfer, starting + * at the returned xid, up to xid + PTLRPC_BULK_OPS_COUNT - 1. The BRW RPC + * itself uses the last bulk xid needed, so the server can determine the + * the number of bulk transfers from the RPC XID and a bitmask. The starting + * xid must align to a power-of-two value. + * + * This is assumed to be true due to the initial ptlrpc_last_xid + * value also being initialized to a power-of-two value. LU-1431 */ __u64 ptlrpc_next_xid(void) { - __u64 tmp; + __u64 next; + spin_lock(&ptlrpc_last_xid_lock); - tmp = ++ptlrpc_last_xid; + next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT; + ptlrpc_last_xid = next; spin_unlock(&ptlrpc_last_xid_lock); - return tmp; + + return next; } EXPORT_SYMBOL(ptlrpc_next_xid); @@ -2889,14 +2914,16 @@ __u64 ptlrpc_sample_next_xid(void) { #if BITS_PER_LONG == 32 /* need to avoid possible word tearing on 32-bit systems */ - __u64 tmp; + __u64 next; + spin_lock(&ptlrpc_last_xid_lock); - tmp = ptlrpc_last_xid + 1; + next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT; spin_unlock(&ptlrpc_last_xid_lock); - return tmp; + + return next; #else /* No need to lock, since returned value is racy anyways */ - return ptlrpc_last_xid + 1; + return ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT; #endif } EXPORT_SYMBOL(ptlrpc_sample_next_xid); diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index e2f098e..0dfe3a1 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -201,28 +201,27 @@ void client_bulk_callback (lnet_event_t *ev) ev->type, ev->status, desc); spin_lock(&desc->bd_lock); - req = desc->bd_req; - LASSERT(desc->bd_network_rw); - desc->bd_network_rw = 0; - - if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) { - desc->bd_success = 1; - desc->bd_nob_transferred = ev->mlength; - desc->bd_sender = ev->sender; - } else { - /* start reconnect and resend if network error hit */ + req = desc->bd_req; + LASSERT(desc->bd_md_count > 0); + desc->bd_md_count--; + + if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) { + desc->bd_nob_transferred += ev->mlength; + desc->bd_sender = ev->sender; + } else { + /* start reconnect and resend if network error hit */ spin_lock(&req->rq_lock); req->rq_net_err = 1; spin_unlock(&req->rq_lock); - } + } - /* release the encrypted pages for write */ - if (desc->bd_req->rq_bulk_write) - sptlrpc_enc_pool_put_pages(desc); + if (ev->status != 0) + desc->bd_failure = 1; - /* NB don't unlock till after wakeup; desc can disappear under us - * otherwise */ - ptlrpc_client_wake_req(req); + /* NB don't unlock till after wakeup; desc can disappear under us + * otherwise */ + if (desc->bd_md_count == 0) + ptlrpc_client_wake_req(desc->bd_req); spin_unlock(&desc->bd_lock); EXIT; @@ -435,16 +434,16 @@ void reply_out_callback(lnet_event_t *ev) */ void server_bulk_callback (lnet_event_t *ev) { - struct ptlrpc_cb_id *cbid = ev->md.user_ptr; - struct ptlrpc_bulk_desc *desc = cbid->cbid_arg; - ENTRY; + struct ptlrpc_cb_id *cbid = ev->md.user_ptr; + struct ptlrpc_bulk_desc *desc = cbid->cbid_arg; + ENTRY; - LASSERT (ev->type == LNET_EVENT_SEND || - ev->type == LNET_EVENT_UNLINK || - (desc->bd_type == BULK_PUT_SOURCE && - ev->type == LNET_EVENT_ACK) || - (desc->bd_type == BULK_GET_SINK && - ev->type == LNET_EVENT_REPLY)); + LASSERT(ev->type == LNET_EVENT_SEND || + ev->type == LNET_EVENT_UNLINK || + (desc->bd_type == BULK_PUT_SOURCE && + ev->type == LNET_EVENT_ACK) || + (desc->bd_type == BULK_GET_SINK && + ev->type == LNET_EVENT_REPLY)); CDEBUG((ev->status == 0) ? D_NET : D_ERROR, "event type %d, status %d, desc %p\n", @@ -452,22 +451,27 @@ void server_bulk_callback (lnet_event_t *ev) spin_lock(&desc->bd_lock); - if ((ev->type == LNET_EVENT_ACK || - ev->type == LNET_EVENT_REPLY) && - ev->status == 0) { - /* We heard back from the peer, so even if we get this - * before the SENT event (oh yes we can), we know we - * read/wrote the peer buffer and how much... */ - desc->bd_success = 1; - desc->bd_nob_transferred = ev->mlength; - desc->bd_sender = ev->sender; - } + LASSERT(desc->bd_md_count > 0); - if (ev->unlinked) { - /* This is the last callback no matter what... */ - desc->bd_network_rw = 0; - cfs_waitq_signal(&desc->bd_waitq); - } + if ((ev->type == LNET_EVENT_ACK || + ev->type == LNET_EVENT_REPLY) && + ev->status == 0) { + /* We heard back from the peer, so even if we get this + * before the SENT event (oh yes we can), we know we + * read/wrote the peer buffer and how much... */ + desc->bd_nob_transferred += ev->mlength; + desc->bd_sender = ev->sender; + } + + if (ev->status != 0) + desc->bd_failure = 1; + + if (ev->unlinked) { + desc->bd_md_count--; + /* This is the last callback no matter what... */ + if (desc->bd_md_count == 0) + cfs_waitq_signal(&desc->bd_waitq); + } spin_unlock(&desc->bd_lock); EXIT; diff --git a/lustre/ptlrpc/import.c b/lustre/ptlrpc/import.c index 77653b7..7eaee96 100644 --- a/lustre/ptlrpc/import.c +++ b/lustre/ptlrpc/import.c @@ -1083,14 +1083,15 @@ finish: * Enforce ADLER for backward compatibility*/ cli->cl_supp_cksum_types = OBD_CKSUM_ADLER; } - cli->cl_cksum_type =cksum_type_select(cli->cl_supp_cksum_types); - - if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) - cli->cl_max_pages_per_rpc = - ocd->ocd_brw_size >> CFS_PAGE_SHIFT; - else if (imp->imp_connect_op == MDS_CONNECT || - imp->imp_connect_op == MGS_CONNECT) - cli->cl_max_pages_per_rpc = 1; + cli->cl_cksum_type =cksum_type_select(cli->cl_supp_cksum_types); + + if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) + cli->cl_max_pages_per_rpc = + min(ocd->ocd_brw_size >> CFS_PAGE_SHIFT, + cli->cl_max_pages_per_rpc); + else if (imp->imp_connect_op == MDS_CONNECT || + imp->imp_connect_op == MGS_CONNECT) + cli->cl_max_pages_per_rpc = 1; /* Reset ns_connect_flags only for initial connect. It might be * changed in while using FS and if we reset it in reconnect diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 68e443d..de23826 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -101,6 +101,14 @@ static int ptl_send_buf (lnet_handle_md_t *mdh, void *base, int len, RETURN (0); } +static void mdunlink_iterate_helper(lnet_handle_md_t *bd_mds, int count) +{ + int i; + + for (i = 0; i < count; i++) + LNetMDUnlink(bd_mds[i]); +} + #ifdef HAVE_SERVER_SUPPORT /** * Prepare bulk descriptor for specified incoming request \a req that @@ -111,17 +119,18 @@ static int ptl_send_buf (lnet_handle_md_t *mdh, void *base, int len, * error. */ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp(struct ptlrpc_request *req, - int npages, int type, int portal) + unsigned npages, unsigned max_brw, + unsigned type, unsigned portal) { - struct obd_export *exp = req->rq_export; - struct ptlrpc_bulk_desc *desc; + struct obd_export *exp = req->rq_export; + struct ptlrpc_bulk_desc *desc; - ENTRY; - LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK); + ENTRY; + LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK); - desc = new_bulk(npages, type, portal); - if (desc == NULL) - RETURN(NULL); + desc = ptlrpc_new_bulk(npages, max_brw, type, portal); + if (desc == NULL) + RETURN(NULL); desc->bd_export = class_export_get(exp); desc->bd_req = req; @@ -137,74 +146,103 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp(struct ptlrpc_request *req, EXPORT_SYMBOL(ptlrpc_prep_bulk_exp); /** - * Starts bulk transfer for descriptor \a desc + * Starts bulk transfer for descriptor \a desc on the server. * Returns 0 on success or error code. */ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc) { - struct ptlrpc_connection *conn = desc->bd_export->exp_connection; - int rc; - int rc2; - lnet_md_t md; - __u64 xid; - ENTRY; - - if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_BULK_PUT_NET)) - RETURN(0); - - /* NB no locking required until desc is on the network */ - LASSERT (!desc->bd_network_rw); - LASSERT (desc->bd_type == BULK_PUT_SOURCE || - desc->bd_type == BULK_GET_SINK); - desc->bd_success = 0; - - md.user_ptr = &desc->bd_cbid; - md.eq_handle = ptlrpc_eq_h; - md.threshold = 2; /* SENT and ACK/REPLY */ - md.options = PTLRPC_MD_OPTIONS; - ptlrpc_fill_bulk_md(&md, desc); - - LASSERT (desc->bd_cbid.cbid_fn == server_bulk_callback); - LASSERT (desc->bd_cbid.cbid_arg == desc); - - /* NB total length may be 0 for a read past EOF, so we send a 0 - * length bulk, since the client expects a bulk event. */ - - rc = LNetMDBind(md, LNET_UNLINK, &desc->bd_md_h); - if (rc != 0) { - CERROR("LNetMDBind failed: %d\n", rc); - LASSERT (rc == -ENOMEM); - RETURN(-ENOMEM); - } - - /* Client's bulk and reply matchbits are the same */ - xid = desc->bd_req->rq_xid; - CDEBUG(D_NET, "Transferring %u pages %u bytes via portal %d " - "id %s xid "LPX64"\n", desc->bd_iov_count, - desc->bd_nob, desc->bd_portal, - libcfs_id2str(conn->c_peer), xid); - - /* Network is about to get at the memory */ - desc->bd_network_rw = 1; - - if (desc->bd_type == BULK_PUT_SOURCE) - rc = LNetPut (conn->c_self, desc->bd_md_h, LNET_ACK_REQ, - conn->c_peer, desc->bd_portal, xid, 0, 0); - else - rc = LNetGet (conn->c_self, desc->bd_md_h, - conn->c_peer, desc->bd_portal, xid, 0); - - if (rc != 0) { - /* Can't send, so we unlink the MD bound above. The UNLINK - * event this creates will signal completion with failure, - * so we return SUCCESS here! */ - CERROR("Transfer(%s, %d, "LPX64") failed: %d\n", - libcfs_id2str(conn->c_peer), desc->bd_portal, xid, rc); - rc2 = LNetMDUnlink(desc->bd_md_h); - LASSERT (rc2 == 0); - } - - RETURN(0); + struct obd_export *exp = desc->bd_export; + struct ptlrpc_connection *conn = exp->exp_connection; + int rc = 0; + __u64 xid; + int posted_md; + int total_md; + lnet_md_t md; + ENTRY; + + if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_BULK_PUT_NET)) + RETURN(0); + + /* NB no locking required until desc is on the network */ + LASSERT(desc->bd_md_count == 0); + LASSERT(desc->bd_type == BULK_PUT_SOURCE || + desc->bd_type == BULK_GET_SINK); + + LASSERT(desc->bd_cbid.cbid_fn == server_bulk_callback); + LASSERT(desc->bd_cbid.cbid_arg == desc); + + /* NB total length may be 0 for a read past EOF, so we send 0 + * length bulks, since the client expects bulk events. + * + * The client may not need all of the bulk XIDs for the RPC. The RPC + * used the XID of the highest bulk XID needed, and the server masks + * off high bits to get bulk count for this RPC. LU-1431 */ + xid = desc->bd_req->rq_xid & ~((__u64)desc->bd_md_max_brw - 1); + total_md = desc->bd_req->rq_xid - xid + 1; + + desc->bd_md_count = total_md; + desc->bd_failure = 0; + + md.user_ptr = &desc->bd_cbid; + md.eq_handle = ptlrpc_eq_h; + md.threshold = 2; /* SENT and ACK/REPLY */ + + for (posted_md = 0; posted_md < total_md; xid++) { + md.options = PTLRPC_MD_OPTIONS; + + /* NB it's assumed that source and sink buffer frags are + * page-aligned. Otherwise we'd have to send client bulk + * sizes over and split server buffer accordingly */ + ptlrpc_fill_bulk_md(&md, desc, posted_md); + rc = LNetMDBind(md, LNET_UNLINK, &desc->bd_mds[posted_md]); + if (rc != 0) { + CERROR("%s: LNetMDBind failed for MD %u: rc = %d\n", + exp->exp_obd->obd_name, posted_md, rc); + LASSERT(rc == -ENOMEM); + if (posted_md == 0) { + desc->bd_md_count = 0; + RETURN(-ENOMEM); + } + break; + } + /* Network is about to get at the memory */ + if (desc->bd_type == BULK_PUT_SOURCE) + rc = LNetPut(conn->c_self, desc->bd_mds[posted_md], + LNET_ACK_REQ, conn->c_peer, + desc->bd_portal, xid, 0, 0); + else + rc = LNetGet(conn->c_self, desc->bd_mds[posted_md], + conn->c_peer, desc->bd_portal, xid, 0); + + posted_md++; + if (rc != 0) { + CERROR("%s: failed bulk transfer with %s:%u x"LPU64": " + "rc = %d\n", exp->exp_obd->obd_name, + libcfs_id2str(conn->c_peer), desc->bd_portal, + xid, rc); + break; + } + } + + if (rc != 0) { + /* Can't send, so we unlink the MD bound above. The UNLINK + * event this creates will signal completion with failure, + * so we return SUCCESS here! */ + spin_lock(&desc->bd_lock); + desc->bd_md_count -= total_md - posted_md; + spin_unlock(&desc->bd_lock); + LASSERT(desc->bd_md_count >= 0); + + mdunlink_iterate_helper(desc->bd_mds, posted_md); + RETURN(0); + } + + CDEBUG(D_NET, "Transferring %u pages %u bytes via portal %d " + "id %s xid "LPX64"-"LPX64"\n", desc->bd_iov_count, + desc->bd_nob, desc->bd_portal, libcfs_id2str(conn->c_peer), + xid - posted_md, xid - 1); + + RETURN(0); } EXPORT_SYMBOL(ptlrpc_start_bulk_transfer); @@ -231,8 +269,7 @@ void ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc) * one. If it fails, it must be because completion just happened, * but we must still l_wait_event() in this case, to give liblustre * a chance to run server_bulk_callback()*/ - - LNetMDUnlink(desc->bd_md_h); + mdunlink_iterate_helper(desc->bd_mds, desc->bd_md_count); for (;;) { /* Network access will complete in finite time but the HUGE @@ -252,84 +289,124 @@ EXPORT_SYMBOL(ptlrpc_abort_bulk); #endif /* HAVE_SERVER_SUPPORT */ /** - * Register bulk for later transfer + * Register bulk at the sender for later transfer. * Returns 0 on success or error code. */ int ptlrpc_register_bulk(struct ptlrpc_request *req) { - struct ptlrpc_bulk_desc *desc = req->rq_bulk; - lnet_process_id_t peer; - int rc; - int rc2; - lnet_handle_me_t me_h; - lnet_md_t md; - ENTRY; + struct ptlrpc_bulk_desc *desc = req->rq_bulk; + lnet_process_id_t peer; + int rc = 0; + int rc2; + int posted_md; + int total_md; + __u64 xid; + lnet_handle_me_t me_h; + lnet_md_t md; + ENTRY; if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_BULK_GET_NET)) RETURN(0); - /* NB no locking required until desc is on the network */ - LASSERT (desc->bd_nob > 0); - LASSERT (!desc->bd_network_rw); - LASSERT (desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES); - LASSERT (desc->bd_req != NULL); - LASSERT (desc->bd_type == BULK_PUT_SINK || - desc->bd_type == BULK_GET_SOURCE); + /* NB no locking required until desc is on the network */ + LASSERT(desc->bd_nob > 0); + LASSERT(desc->bd_md_count == 0); + LASSERT(desc->bd_md_max_brw <= PTLRPC_BULK_OPS_COUNT); + LASSERT(desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES); + LASSERT(desc->bd_req != NULL); + LASSERT(desc->bd_type == BULK_PUT_SINK || + desc->bd_type == BULK_GET_SOURCE); - desc->bd_success = 0; + desc->bd_failure = 0; - peer = desc->bd_import->imp_connection->c_peer; + peer = desc->bd_import->imp_connection->c_peer; - md.user_ptr = &desc->bd_cbid; - md.eq_handle = ptlrpc_eq_h; - md.threshold = 1; /* PUT or GET */ - md.options = PTLRPC_MD_OPTIONS | - ((desc->bd_type == BULK_GET_SOURCE) ? - LNET_MD_OP_GET : LNET_MD_OP_PUT); - ptlrpc_fill_bulk_md(&md, desc); - - LASSERT (desc->bd_cbid.cbid_fn == client_bulk_callback); - LASSERT (desc->bd_cbid.cbid_arg == desc); - - /* XXX Registering the same xid on retried bulk makes my head - * explode trying to understand how the original request's bulk - * might interfere with the retried request -eeb - * On the other hand replaying with the same xid is fine, since - * we are guaranteed old request have completed. -green */ - LASSERTF(!(desc->bd_registered && - req->rq_send_state != LUSTRE_IMP_REPLAY) || - req->rq_xid != desc->bd_last_xid, - "registered: %d rq_xid: "LPU64" bd_last_xid: "LPU64"\n", - desc->bd_registered, req->rq_xid, desc->bd_last_xid); - desc->bd_registered = 1; - desc->bd_last_xid = req->rq_xid; - - rc = LNetMEAttach(desc->bd_portal, peer, - req->rq_xid, 0, LNET_UNLINK, LNET_INS_AFTER, &me_h); - if (rc != 0) { - CERROR("LNetMEAttach failed: %d\n", rc); - LASSERT (rc == -ENOMEM); - RETURN (-ENOMEM); - } + LASSERT(desc->bd_cbid.cbid_fn == client_bulk_callback); + LASSERT(desc->bd_cbid.cbid_arg == desc); - /* About to let the network at it... */ - desc->bd_network_rw = 1; - rc = LNetMDAttach(me_h, md, LNET_UNLINK, &desc->bd_md_h); - if (rc != 0) { - CERROR("LNetMDAttach failed: %d\n", rc); - LASSERT (rc == -ENOMEM); - desc->bd_network_rw = 0; - rc2 = LNetMEUnlink (me_h); - LASSERT (rc2 == 0); - RETURN (-ENOMEM); - } + /* An XID is only used for a single request from the client. + * For retried bulk transfers, a new XID will be allocated in + * in ptlrpc_check_set() if it needs to be resent, so it is not + * using the same RDMA match bits after an error. + * + * For multi-bulk RPCs, rq_xid is the last XID needed for bulks. The + * first bulk XID is power-of-two aligned before rq_xid. LU-1431 */ + xid = req->rq_xid & ~((__u64)desc->bd_md_max_brw - 1); + LASSERTF(!(desc->bd_registered && + req->rq_send_state != LUSTRE_IMP_REPLAY) || + xid != desc->bd_last_xid, + "registered: %d rq_xid: "LPU64" bd_last_xid: "LPU64"\n", + desc->bd_registered, xid, desc->bd_last_xid); + + total_md = (desc->bd_iov_count + LNET_MAX_IOV - 1) / LNET_MAX_IOV; + desc->bd_registered = 1; + desc->bd_last_xid = xid; + desc->bd_md_count = total_md; + md.user_ptr = &desc->bd_cbid; + md.eq_handle = ptlrpc_eq_h; + md.threshold = 1; /* PUT or GET */ + + for (posted_md = 0; posted_md < total_md; posted_md++, xid++) { + md.options = PTLRPC_MD_OPTIONS | + ((desc->bd_type == BULK_GET_SOURCE) ? + LNET_MD_OP_GET : LNET_MD_OP_PUT); + ptlrpc_fill_bulk_md(&md, desc, posted_md); + + rc = LNetMEAttach(desc->bd_portal, peer, xid, 0, + LNET_UNLINK, LNET_INS_AFTER, &me_h); + if (rc != 0) { + CERROR("%s: LNetMEAttach failed x"LPU64"/%d: rc = %d\n", + desc->bd_export->exp_obd->obd_name, xid, + posted_md, rc); + break; + } - CDEBUG(D_NET, "Setup bulk %s buffers: %u pages %u bytes, xid "LPU64", " - "portal %u\n", - desc->bd_type == BULK_GET_SOURCE ? "get-source" : "put-sink", - desc->bd_iov_count, desc->bd_nob, - req->rq_xid, desc->bd_portal); - RETURN(0); + /* About to let the network at it... */ + rc = LNetMDAttach(me_h, md, LNET_UNLINK, + &desc->bd_mds[posted_md]); + if (rc != 0) { + CERROR("%s: LNetMDAttach failed x"LPU64"/%d: rc = %d\n", + desc->bd_export->exp_obd->obd_name, xid, + posted_md, rc); + rc2 = LNetMEUnlink(me_h); + LASSERT(rc2 == 0); + break; + } + } + + if (rc != 0) { + LASSERT(rc == -ENOMEM); + spin_lock(&desc->bd_lock); + desc->bd_md_count -= total_md - posted_md; + spin_unlock(&desc->bd_lock); + LASSERT(desc->bd_md_count >= 0); + mdunlink_iterate_helper(desc->bd_mds, desc->bd_md_max_brw); + req->rq_status = -ENOMEM; + RETURN(-ENOMEM); + } + + /* Set rq_xid to matchbits of the final bulk so that server can + * infer the number of bulks that were prepared */ + req->rq_xid = --xid; + LASSERTF(desc->bd_last_xid == (req->rq_xid & PTLRPC_BULK_OPS_MASK), + "bd_last_xid = x"LPU64", rq_xid = x"LPU64"\n", + desc->bd_last_xid, req->rq_xid); + + spin_lock(&desc->bd_lock); + /* Holler if peer manages to touch buffers before he knows the xid */ + if (desc->bd_md_count != total_md) + CWARN("%s: Peer %s touched %d buffers while I registered\n", + desc->bd_export->exp_obd->obd_name, libcfs_id2str(peer), + total_md - desc->bd_md_count); + spin_unlock(&desc->bd_lock); + + CDEBUG(D_NET, "Setup %u bulk %s buffers: %u pages %u bytes, " + "xid x"LPX64"-"LPX64", portal %u\n", desc->bd_md_count, + desc->bd_type == BULK_GET_SOURCE ? "get-source" : "put-sink", + desc->bd_iov_count, desc->bd_nob, + desc->bd_last_xid, req->rq_xid, desc->bd_portal); + + RETURN(0); } EXPORT_SYMBOL(ptlrpc_register_bulk); @@ -354,20 +431,19 @@ int ptlrpc_unregister_bulk(struct ptlrpc_request *req, int async) async && req->rq_bulk_deadline == 0) req->rq_bulk_deadline = cfs_time_current_sec() + LONG_UNLINK; - if (!ptlrpc_client_bulk_active(req)) /* completed or */ - RETURN(1); /* never registered */ - - LASSERT(desc->bd_req == req); /* bd_req NULL until registered */ + if (ptlrpc_client_bulk_active(req) == 0) /* completed or */ + RETURN(1); /* never registered */ - /* the unlink ensures the callback happens ASAP and is the last - * one. If it fails, it must be because completion just happened, - * but we must still l_wait_event() in this case to give liblustre - * a chance to run client_bulk_callback() */ + LASSERT(desc->bd_req == req); /* bd_req NULL until registered */ - LNetMDUnlink(desc->bd_md_h); + /* the unlink ensures the callback happens ASAP and is the last + * one. If it fails, it must be because completion just happened, + * but we must still l_wait_event() in this case to give liblustre + * a chance to run client_bulk_callback() */ + mdunlink_iterate_helper(desc->bd_mds, desc->bd_md_max_brw); - if (!ptlrpc_client_bulk_active(req)) /* completed or */ - RETURN(1); /* never registered */ + if (ptlrpc_client_bulk_active(req) == 0) /* completed or */ + RETURN(1); /* never registered */ /* Move to "Unregistering" phase as bulk was not unlinked yet. */ ptlrpc_rqphase_move(req, RQ_PHASE_UNREGISTERING); diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 7c8c6ba..b8e99fd 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -1786,12 +1786,12 @@ void lustre_swab_obd_statfs (struct obd_statfs *os) } EXPORT_SYMBOL(lustre_swab_obd_statfs); -void lustre_swab_obd_ioobj (struct obd_ioobj *ioo) +void lustre_swab_obd_ioobj(struct obd_ioobj *ioo) { - __swab64s (&ioo->ioo_id); - __swab64s (&ioo->ioo_seq); - __swab32s (&ioo->ioo_type); - __swab32s (&ioo->ioo_bufcnt); + __swab64s(&ioo->ioo_id); + __swab64s(&ioo->ioo_seq); + __swab32s(&ioo->ioo_max_brw); + __swab32s(&ioo->ioo_bufcnt); } EXPORT_SYMBOL(lustre_swab_obd_ioobj); @@ -2311,10 +2311,10 @@ void lustre_swab_quota_body(struct quota_body *b) /* Dump functions */ void dump_ioo(struct obd_ioobj *ioo) { - CDEBUG(D_RPCTRACE, - "obd_ioobj: ioo_id="LPD64", ioo_seq="LPD64", ioo_type=%d, " - "ioo_bufct=%d\n", ioo->ioo_id, ioo->ioo_seq, ioo->ioo_type, - ioo->ioo_bufcnt); + CDEBUG(D_RPCTRACE, + "obd_ioobj: ioo_id="LPD64", ioo_seq="LPD64", ioo_max_brw=%#x, " + "ioo_bufct=%d\n", ioo->ioo_id, ioo->ioo_seq, ioo->ioo_max_brw, + ioo->ioo_bufcnt); } EXPORT_SYMBOL(dump_ioo); diff --git a/lustre/ptlrpc/pers.c b/lustre/ptlrpc/pers.c index 5c78e12..2001477 100644 --- a/lustre/ptlrpc/pers.c +++ b/lustre/ptlrpc/pers.c @@ -49,17 +49,23 @@ #ifdef __KERNEL__ -void ptlrpc_fill_bulk_md (lnet_md_t *md, struct ptlrpc_bulk_desc *desc) +void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc, + int mdidx) { - LASSERT (desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES); - LASSERT (!(md->options & (LNET_MD_IOVEC | LNET_MD_KIOV | LNET_MD_PHYS))); - - md->options |= LNET_MD_KIOV; - md->length = desc->bd_iov_count; - if (desc->bd_enc_iov) - md->start = desc->bd_enc_iov; - else - md->start = desc->bd_iov; + CLASSERT(PTLRPC_MAX_BRW_PAGES < LI_POISON); + + LASSERT(mdidx < desc->bd_md_max_brw); + LASSERT(desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES); + LASSERT(!(md->options & (LNET_MD_IOVEC | LNET_MD_KIOV | + LNET_MD_PHYS))); + + md->options |= LNET_MD_KIOV; + md->length = max(0, desc->bd_iov_count - mdidx * LNET_MAX_IOV); + md->length = min_t(unsigned int, LNET_MAX_IOV, md->length); + if (desc->bd_enc_iov) + md->start = &desc->bd_enc_iov[mdidx * LNET_MAX_IOV]; + else + md->start = &desc->bd_iov[mdidx * LNET_MAX_IOV]; } void ptlrpc_add_bulk_page(struct ptlrpc_bulk_desc *desc, cfs_page_t *page, @@ -76,18 +82,23 @@ void ptlrpc_add_bulk_page(struct ptlrpc_bulk_desc *desc, cfs_page_t *page, #else /* !__KERNEL__ */ -void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc) +void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc, + int mdidx) { - LASSERT (!(md->options & (LNET_MD_IOVEC | LNET_MD_KIOV | LNET_MD_PHYS))); - if (desc->bd_iov_count == 1) { - md->start = desc->bd_iov[0].iov_base; - md->length = desc->bd_iov[0].iov_len; - return; - } - - md->options |= LNET_MD_IOVEC; - md->start = &desc->bd_iov[0]; - md->length = desc->bd_iov_count; + LASSERT(mdidx < desc->bd_md_max_brw); + LASSERT(desc->bd_iov_count > mdidx * LNET_MAX_IOV); + LASSERT(!(md->options & (LNET_MD_IOVEC | LNET_MD_KIOV | LNET_MD_PHYS))); + + if (desc->bd_iov_count == 1) { + md->start = desc->bd_iov[0].iov_base; + md->length = desc->bd_iov[0].iov_len; + return; + } + + md->options |= LNET_MD_IOVEC; + md->start = &desc->bd_iov[mdidx * LNET_MAX_IOV]; + md->length = min(LNET_MAX_IOV, desc->bd_iov_count - mdidx * + LNET_MAX_IOV); } static int can_merge_iovs(lnet_md_iovec_t *existing, lnet_md_iovec_t *candidate) diff --git a/lustre/ptlrpc/ptlrpc_internal.h b/lustre/ptlrpc/ptlrpc_internal.h index 606e333..c2d5050 100644 --- a/lustre/ptlrpc/ptlrpc_internal.h +++ b/lustre/ptlrpc/ptlrpc_internal.h @@ -53,7 +53,8 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait); int ptlrpcd_start(int index, int max, const char *name, struct ptlrpcd_ctl *pc); /* client.c */ -struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal); +struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw, + unsigned type, unsigned portal); void ptlrpc_init_xid(void); /* events.c */ @@ -209,9 +210,10 @@ nrs_request_policy(struct ptlrpc_nrs_request *nrq) int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink); /* pers.c */ -void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc); +void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc, + int mdcnt); void ptlrpc_add_bulk_page(struct ptlrpc_bulk_desc *desc, cfs_page_t *page, - int pageoffset, int len); + int pageoffset, int len); /* pack_generic.c */ struct ptlrpc_reply_state * diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c index 35cb38e..0a9f6e1 100644 --- a/lustre/ptlrpc/wiretest.c +++ b/lustre/ptlrpc/wiretest.c @@ -1594,10 +1594,10 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct obd_ioobj, ioo_oid.oi_seq)); LASSERTF((int)sizeof(((struct obd_ioobj *)0)->ioo_oid.oi_seq) == 8, "found %lld\n", (long long)(int)sizeof(((struct obd_ioobj *)0)->ioo_oid.oi_seq)); - LASSERTF((int)offsetof(struct obd_ioobj, ioo_type) == 16, "found %lld\n", - (long long)(int)offsetof(struct obd_ioobj, ioo_type)); - LASSERTF((int)sizeof(((struct obd_ioobj *)0)->ioo_type) == 4, "found %lld\n", - (long long)(int)sizeof(((struct obd_ioobj *)0)->ioo_type)); + LASSERTF((int)offsetof(struct obd_ioobj, ioo_max_brw) == 16, "found %lld\n", + (long long)(int)offsetof(struct obd_ioobj, ioo_max_brw)); + LASSERTF((int)sizeof(((struct obd_ioobj *)0)->ioo_max_brw) == 4, "found %lld\n", + (long long)(int)sizeof(((struct obd_ioobj *)0)->ioo_max_brw)); LASSERTF((int)offsetof(struct obd_ioobj, ioo_bufcnt) == 20, "found %lld\n", (long long)(int)offsetof(struct obd_ioobj, ioo_bufcnt)); LASSERTF((int)sizeof(((struct obd_ioobj *)0)->ioo_bufcnt) == 4, "found %lld\n", diff --git a/lustre/quota/qsd_request.c b/lustre/quota/qsd_request.c index 5e46bfc..be40e03 100644 --- a/lustre/quota/qsd_request.c +++ b/lustre/quota/qsd_request.c @@ -374,7 +374,7 @@ int qsd_fetch_index(const struct lu_env *env, struct obd_export *exp, ptlrpc_at_set_req_timeout(req); /* allocate bulk descriptor */ - desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK, + desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK, MDS_BULK_PORTAL); if (desc == NULL) { ptlrpc_request_free(req); diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 1ebc64d..fe49708 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -10653,6 +10653,56 @@ test_230b() { } run_test 230b "nested remote directory should be failed" +test_231a() +{ + # For simplicity this test assumes that max_pages_per_rpc + # is the same across all OSCs + local max_pages=$($LCTL get_param -n osc.*.max_pages_per_rpc | head -1) + local bulk_size=$((max_pages * 4096)) + + mkdir -p $DIR/$tdir + + # clear the OSC stats + $LCTL set_param osc.*.stats=0 &>/dev/null + + # Client writes $bulk_size - there must be 1 rpc for $max_pages. + dd if=/dev/zero of=$DIR/$tdir/$tfile bs=$bulk_size count=1 \ + oflag=direct &>/dev/null || error "dd failed" + + local nrpcs=$($LCTL get_param osc.*.stats |awk '/ost_write/ {print $2}') + if [ x$nrpcs != "x1" ]; then + error "found $nrpc ost_write RPCs, not 1 as expected" + fi + + # Drop the OSC cache, otherwise we will read from it + cancel_lru_locks osc + + # clear the OSC stats + $LCTL set_param osc.*.stats=0 &>/dev/null + + # Client reads $bulk_size. + dd if=$DIR/$tdir/$tfile of=/dev/null bs=$bulk_size count=1 \ + iflag=direct &>/dev/null || error "dd failed" + + nrpcs=$($LCTL get_param osc.*.stats | awk '/ost_read/ { print $2 }') + if [ x$nrpcs != "x1" ]; then + error "found $nrpc ost_read RPCs, not 1 as expected" + fi +} +run_test 231a "checking that reading/writing of BRW RPC size results in one RPC" + +test_231b() { + mkdir -p $DIR/$tdir + local i + for i in {0..1023}; do + dd if=/dev/zero of=$DIR/$tdir/$tfile conv=notrunc \ + seek=$((2 * i)) bs=4096 count=1 &>/dev/null || + error "dd of=$DIR/$tdir/$tfile seek=$((2 * i)) failed" + done + sync +} +run_test 231b "must not assert on fully utilized OST request buffer" + # # tests that do cleanup/setup should be run at the end # diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index 01689a5..66d3cf3 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -716,7 +716,7 @@ check_obd_ioobj(void) CHECK_STRUCT(obd_ioobj); CHECK_MEMBER(obd_ioobj, ioo_id); CHECK_MEMBER(obd_ioobj, ioo_seq); - CHECK_MEMBER(obd_ioobj, ioo_type); + CHECK_MEMBER(obd_ioobj, ioo_max_brw); CHECK_MEMBER(obd_ioobj, ioo_bufcnt); } diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index 3a3f20e..561b7a7 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -1602,10 +1602,10 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct obd_ioobj, ioo_oid.oi_seq)); LASSERTF((int)sizeof(((struct obd_ioobj *)0)->ioo_oid.oi_seq) == 8, "found %lld\n", (long long)(int)sizeof(((struct obd_ioobj *)0)->ioo_oid.oi_seq)); - LASSERTF((int)offsetof(struct obd_ioobj, ioo_type) == 16, "found %lld\n", - (long long)(int)offsetof(struct obd_ioobj, ioo_type)); - LASSERTF((int)sizeof(((struct obd_ioobj *)0)->ioo_type) == 4, "found %lld\n", - (long long)(int)sizeof(((struct obd_ioobj *)0)->ioo_type)); + LASSERTF((int)offsetof(struct obd_ioobj, ioo_max_brw) == 16, "found %lld\n", + (long long)(int)offsetof(struct obd_ioobj, ioo_max_brw)); + LASSERTF((int)sizeof(((struct obd_ioobj *)0)->ioo_max_brw) == 4, "found %lld\n", + (long long)(int)sizeof(((struct obd_ioobj *)0)->ioo_max_brw)); LASSERTF((int)offsetof(struct obd_ioobj, ioo_bufcnt) == 20, "found %lld\n", (long long)(int)offsetof(struct obd_ioobj, ioo_bufcnt)); LASSERTF((int)sizeof(((struct obd_ioobj *)0)->ioo_bufcnt) == 4, "found %lld\n", -- 1.8.3.1