Details : Changes the blocksize for regular files to be 2x RPC size,
and not depend on stripe size.
-
+Severity : enhancement
+Bugzilla : 9293
+Description: Multiple MD RPCs in flight.
+Details : Further unserialise some read-only MDS RPCs by teaching the RPC
+ lock about intents. To avoid overloading the MDS, introduce a limit
+ on the number of MDS RPCs in flight for a single client and add
+ /proc controls to adjust this limit.
+
------------------------------------------------------------------------------
#define OSC_MAX_DIRTY_DEFAULT (OSC_MAX_RIF_DEFAULT * 4)
#define OSC_MAX_DIRTY_MB_MAX 2048 /* totally arbitrary */
+#define MDC_MAX_RIF_DEFAULT 8 /* initial cl_max_rpcs_in_flight for MDC clients */
+#define MDC_MAX_RIF_MAX 512 /* largest value accepted by the /proc write handler */
+
struct mdc_rpc_lock;
struct obd_import;
struct client_obd {
spin_lock_init(&cli->cl_read_offset_hist.oh_lock);
spin_lock_init(&cli->cl_write_offset_hist.oh_lock);
cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES;
- if (num_physpages >> (20 - PAGE_SHIFT) <= 128 /* MB */) {
+ if (!strcmp(name, LUSTRE_MDC_NAME)) {
+ cli->cl_max_rpcs_in_flight = MDC_MAX_RIF_DEFAULT;
+ } else if (num_physpages >> (20 - PAGE_SHIFT) <= 128 /* MB */) {
cli->cl_max_rpcs_in_flight = 2;
} else if (num_physpages >> (20 - PAGE_SHIFT) <= 256 /* MB */) {
cli->cl_max_rpcs_in_flight = 3;
#include <lprocfs_status.h>
#ifdef LPROCFS
+
+/*
+ * /proc read handler for "max_rpcs_in_flight".
+ *
+ * Formats the client's current cl_max_rpcs_in_flight, followed by a newline,
+ * into @page (at most @count bytes) and returns snprintf()'s result.
+ * @data is the obd_device the entry was registered for.
+ */
+static int mdc_rd_max_rpcs_in_flight(char *page, char **start, off_t off,
+                                     int count, int *eof, void *data)
+{
+        struct obd_device *obd = data;
+        struct client_obd *client = &obd->u.cli;
+        int len;
+
+        /* Sample the limit under the same lock the write handler takes. */
+        spin_lock(&client->cl_loi_list_lock);
+        len = snprintf(page, count, "%u\n", client->cl_max_rpcs_in_flight);
+        spin_unlock(&client->cl_loi_list_lock);
+
+        return len;
+}
+
+/*
+ * /proc write handler for "max_rpcs_in_flight".
+ *
+ * Parses an integer from @buffer with lprocfs_write_helper() and, if it lies
+ * in [1, MDC_MAX_RIF_MAX], stores it as the client's cl_max_rpcs_in_flight.
+ *
+ * Returns @count on success, the helper's error code if parsing fails, or
+ * -ERANGE if the value is outside the accepted range.
+ */
+static int mdc_wr_max_rpcs_in_flight(struct file *file, const char *buffer,
+                                     unsigned long count, void *data)
+{
+        struct obd_device *obd = data;
+        struct client_obd *client = &obd->u.cli;
+        int limit;
+        int err = lprocfs_write_helper(buffer, count, &limit);
+
+        if (err != 0)
+                return err;
+
+        if (limit <= 0 || limit > MDC_MAX_RIF_MAX)
+                return -ERANGE;
+
+        /* Publish the new limit under the lock that guards the counters. */
+        spin_lock(&client->cl_loi_list_lock);
+        client->cl_max_rpcs_in_flight = limit;
+        spin_unlock(&client->cl_loi_list_lock);
+
+        return count;
+}
static struct lprocfs_vars lprocfs_obd_vars[] = {
{ "uuid", lprocfs_rd_uuid, 0, 0 },
{ "ping", 0, lprocfs_wr_ping, 0 },
//{ "filegroups", lprocfs_rd_filegroups, 0, 0 },
{ "mds_server_uuid", lprocfs_rd_server_uuid, 0, 0 },
{ "mds_conn_uuid", lprocfs_rd_conn_uuid, 0, 0 },
+ { "max_rpcs_in_flight", mdc_rd_max_rpcs_in_flight,
+ mdc_wr_max_rpcs_in_flight, 0 }, /* read/write access to cl_max_rpcs_in_flight */
{ 0 }
};
const char *old, int oldlen, const char *new, int newlen);
void mdc_close_pack(struct ptlrpc_request *req, int offset, struct obdo *oa,
int valid, struct obd_client_handle *och);
+void mdc_exit_request(struct client_obd *cli); /* drop an in-flight slot and hand free slots to queued waiters */
+void mdc_enter_request(struct client_obd *cli); /* take an in-flight slot, sleeping while the limit is reached */
struct mdc_open_data {
struct obd_client_handle *mod_och;
struct lookup_intent *it)
{
ENTRY;
- if (1 || !it || (it->it_op != IT_GETATTR && it->it_op != IT_LOOKUP)) {
+ if (!it || (it->it_op != IT_GETATTR && it->it_op != IT_LOOKUP)) {
down(&lck->rpcl_sem);
LASSERT(lck->rpcl_it == NULL);
lck->rpcl_it = it;
static inline void mdc_put_rpc_lock(struct mdc_rpc_lock *lck,
struct lookup_intent *it)
{
- if (1 || !it || (it->it_op != IT_GETATTR && it->it_op != IT_LOOKUP)) {
+ if (!it || (it->it_op != IT_GETATTR && it->it_op != IT_LOOKUP)) {
LASSERT(it == lck->rpcl_it);
lck->rpcl_it = NULL;
up(&lck->rpcl_sem);
body->valid |= OBD_MD_FLFLAGS;
}
}
+
+struct mdc_cache_waiter { /* one sleeper queued by mdc_enter_request() */
+ struct list_head mcw_entry; /* link on cli->cl_cache_waiters; unlinked by mdc_exit_request() */
+ wait_queue_head_t mcw_waitq; /* woken by mdc_exit_request() after the entry is unlinked */
+};
+
+/*
+ * Wait condition used by mdc_enter_request().
+ *
+ * Returns non-zero once @mcw's list entry is empty, i.e. it is no longer
+ * linked on a waiters list (mdc_exit_request() unlinks it with
+ * list_del_init()).  The check is made under cli->cl_loi_list_lock.
+ */
+static int mdc_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw)
+{
+        int rc;
+        ENTRY;
+
+        spin_lock(&cli->cl_loi_list_lock);
+        rc = list_empty(&mcw->mcw_entry);
+        spin_unlock(&cli->cl_loi_list_lock);
+        RETURN(rc);
+}
+
+/*
+ * Take one of the client's MDC RPC slots, sleeping until one is handed over
+ * if cl_r_in_flight has already reached cl_max_rpcs_in_flight.
+ *
+ * Requests in flight are recorded in cli->cl_r_in_flight.  Only one write
+ * RPC is possible in mdc anyway; if that changes in the future, this code
+ * may need to be revisited.
+ */
+void mdc_enter_request(struct client_obd *cli)
+{
+        struct l_wait_info lwi = { 0 };
+        struct mdc_cache_waiter waiter;
+
+        spin_lock(&cli->cl_loi_list_lock);
+        if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
+                /* A slot is free: account for this request and proceed. */
+                cli->cl_r_in_flight++;
+                spin_unlock(&cli->cl_loi_list_lock);
+                return;
+        }
+
+        /* Limit reached: queue up.  mdc_exit_request() unlinks the entry and
+         * bumps cl_r_in_flight for us before issuing the wake-up. */
+        init_waitqueue_head(&waiter.mcw_waitq);
+        list_add_tail(&waiter.mcw_entry, &cli->cl_cache_waiters);
+        spin_unlock(&cli->cl_loi_list_lock);
+
+        l_wait_event(waiter.mcw_waitq, mdc_req_avail(cli, &waiter), &lwi);
+}
+
+/*
+ * Release the RPC slot held by the caller and pass free slots on to queued
+ * waiters, oldest first, for as long as cl_r_in_flight stays below
+ * cl_max_rpcs_in_flight.
+ *
+ * Each waiter that is woken has its slot accounted here, on its behalf,
+ * before the wake-up is issued.
+ */
+void mdc_exit_request(struct client_obd *cli)
+{
+        struct list_head *waiters = &cli->cl_cache_waiters;
+        struct mdc_cache_waiter *next;
+
+        spin_lock(&cli->cl_loi_list_lock);
+
+        /* Give back our own slot first. */
+        cli->cl_r_in_flight--;
+
+        /* Every waiter served is unlinked, so the next candidate is always
+         * the head of the list; stop when no free request slots remain. */
+        while (!list_empty(waiters) &&
+               cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
+                next = list_entry(waiters->next, struct mdc_cache_waiter,
+                                  mcw_entry);
+                list_del_init(&next->mcw_entry);
+                cli->cl_r_in_flight++;
+                wake_up(&next->mcw_waitq);
+        }
+
+        spin_unlock(&cli->cl_loi_list_lock);
+}
/* get ready for the reply */
ptlrpc_req_set_repsize(req, repbufcnt, repsize);
+ /* It is important to obtain rpc_lock first (if applicable), so that
+ * threads that are serialised with rpc_lock are not polluting our
+ * rpcs in flight counter */
mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+ mdc_enter_request(&obddev->u.cli);
rc = ldlm_cli_enqueue(exp, &req, res_id, lock_type, &policy,
lock_mode, &flags, cb_blocking, cb_completion,
NULL, cb_data, NULL, 0, NULL, lockh, 0);
+ mdc_exit_request(&obddev->u.cli);
mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
/* Similarly, if we're going to replay this request, we don't want to