#include <lustre_mdc.h>
#include "fld_internal.h"
-/* TODO: these 3 functions are copies of flow-control code from mdc_lib.c
- * It should be common thing. The same about mdc RPC lock */
-static int fld_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw)
-{
- int rc;
- ENTRY;
- client_obd_list_lock(&cli->cl_loi_list_lock);
- rc = cfs_list_empty(&mcw->mcw_entry);
- client_obd_list_unlock(&cli->cl_loi_list_lock);
- RETURN(rc);
-};
-
-static void fld_enter_request(struct client_obd *cli)
-{
- struct mdc_cache_waiter mcw;
- struct l_wait_info lwi = { 0 };
-
- client_obd_list_lock(&cli->cl_loi_list_lock);
- if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
- cfs_list_add_tail(&mcw.mcw_entry, &cli->cl_cache_waiters);
- init_waitqueue_head(&mcw.mcw_waitq);
- client_obd_list_unlock(&cli->cl_loi_list_lock);
- l_wait_event(mcw.mcw_waitq, fld_req_avail(cli, &mcw), &lwi);
- } else {
- cli->cl_r_in_flight++;
- client_obd_list_unlock(&cli->cl_loi_list_lock);
- }
-}
-
-static void fld_exit_request(struct client_obd *cli)
-{
- cfs_list_t *l, *tmp;
- struct mdc_cache_waiter *mcw;
-
- client_obd_list_lock(&cli->cl_loi_list_lock);
- cli->cl_r_in_flight--;
- cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
-
- if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
- /* No free request slots anymore */
- break;
- }
-
- mcw = cfs_list_entry(l, struct mdc_cache_waiter, mcw_entry);
- cfs_list_del_init(&mcw->mcw_entry);
- cli->cl_r_in_flight++;
- wake_up(&mcw->mcw_waitq);
- }
- client_obd_list_unlock(&cli->cl_loi_list_lock);
-}
-
static int fld_rrb_hash(struct lu_client_fld *fld,
seqno_t seq)
{
req->rq_reply_portal = MDC_REPLY_PORTAL;
ptlrpc_at_set_req_timeout(req);
- fld_enter_request(&exp->exp_obd->u.cli);
+ obd_get_request_slot(&exp->exp_obd->u.cli);
rc = ptlrpc_queue_wait(req);
- fld_exit_request(&exp->exp_obd->u.cli);
+ obd_put_request_slot(&exp->exp_obd->u.cli);
if (rc != 0) {
if (rc == -EWOULDBLOCK) {
/* For no_delay req(see above), EWOULDBLOCK means the
}
-struct mdc_cache_waiter {
- cfs_list_t mcw_entry;
- wait_queue_head_t mcw_waitq;
-};
-
/* mdc/mdc_locks.c */
int it_disposition(const struct lookup_intent *it, int flag);
void it_clear_disposition(struct lookup_intent *it, int flag);
cfs_list_t ti_chain;
};
-#define OSC_MAX_RIF_DEFAULT 8
-#define MDS_OSC_MAX_RIF_DEFAULT 50
-#define OSC_MAX_RIF_MAX 256
-#define OSC_MAX_DIRTY_DEFAULT (OSC_MAX_RIF_DEFAULT * 4)
-#define OSC_MAX_DIRTY_MB_MAX 2048 /* arbitrary, but < MAX_LONG bytes */
-#define OSC_DEFAULT_RESENDS 10
+#define OBD_MAX_RIF_DEFAULT 8
+#define OBD_MAX_RIF_MAX 512
+#define OSC_MAX_RIF_MAX 256
+#define OSC_MAX_DIRTY_DEFAULT (OBD_MAX_RIF_DEFAULT * 4)
+#define OSC_MAX_DIRTY_MB_MAX 2048 /* arbitrary, but < MAX_LONG bytes */
+#define OSC_DEFAULT_RESENDS 10
/* possible values for fo_sync_lock_cancel */
enum {
NUM_SYNC_ON_CANCEL_STATES
};
-#define MDC_MAX_RIF_DEFAULT 8
-#define MDC_MAX_RIF_MAX 512
-
struct mdc_rpc_lock;
struct obd_import;
struct client_obd {
int kuc_ispayload(void *p);
void *kuc_alloc(int payload_len, int transport, int type);
void kuc_free(void *p, int payload_len);
+int obd_get_request_slot(struct client_obd *cli);
+void obd_put_request_slot(struct client_obd *cli);
+__u32 obd_get_max_rpcs_in_flight(struct client_obd *cli);
+int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max);
struct llog_handle;
struct llog_rec_hdr;
cli->cl_chunkbits = PAGE_CACHE_SHIFT;
if (!strcmp(name, LUSTRE_MDC_NAME)) {
- cli->cl_max_rpcs_in_flight = MDC_MAX_RIF_DEFAULT;
+ cli->cl_max_rpcs_in_flight = OBD_MAX_RIF_DEFAULT;
} else if (totalram_pages >> (20 - PAGE_CACHE_SHIFT) <= 128 /* MB */) {
cli->cl_max_rpcs_in_flight = 2;
} else if (totalram_pages >> (20 - PAGE_CACHE_SHIFT) <= 256 /* MB */) {
cli->cl_max_rpcs_in_flight = 4;
} else {
if (osc_on_mdt(obddev->obd_name))
- cli->cl_max_rpcs_in_flight = MDS_OSC_MAX_RIF_DEFAULT;
+ cli->cl_max_rpcs_in_flight = OBD_MAX_RIF_MAX;
else
- cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT;
+ cli->cl_max_rpcs_in_flight = OBD_MAX_RIF_DEFAULT;
}
rc = ldlm_get_ref();
if (rc) {
static int mdc_max_rpcs_in_flight_seq_show(struct seq_file *m, void *v)
{
struct obd_device *dev = m->private;
- struct client_obd *cli = &dev->u.cli;
+ __u32 max;
int rc;
- client_obd_list_lock(&cli->cl_loi_list_lock);
- rc = seq_printf(m, "%u\n", cli->cl_max_rpcs_in_flight);
- client_obd_list_unlock(&cli->cl_loi_list_lock);
+ max = obd_get_max_rpcs_in_flight(&dev->u.cli);
+ rc = seq_printf(m, "%u\n", max);
+
return rc;
}
loff_t *off)
{
struct obd_device *dev = ((struct seq_file *)file->private_data)->private;
- struct client_obd *cli = &dev->u.cli;
- int val, rc;
-
- rc = lprocfs_write_helper(buffer, count, &val);
- if (rc)
- return rc;
+ int val;
+ int rc;
- if (val < 1 || val > MDC_MAX_RIF_MAX)
- return -ERANGE;
+ rc = lprocfs_write_helper(buffer, count, &val);
+ if (rc == 0)
+ rc = obd_set_max_rpcs_in_flight(&dev->u.cli, val);
- client_obd_list_lock(&cli->cl_loi_list_lock);
- cli->cl_max_rpcs_in_flight = val;
- client_obd_list_unlock(&cli->cl_loi_list_lock);
+ if (rc != 0)
+ count = rc;
- return count;
+ return count;
}
LPROC_SEQ_FOPS(mdc_max_rpcs_in_flight);
void mdc_rename_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
const char *old, int oldlen, const char *new, int newlen);
void mdc_close_pack(struct ptlrpc_request *req, struct md_op_data *op_data);
-int mdc_enter_request(struct client_obd *cli);
-void mdc_exit_request(struct client_obd *cli);
/* mdc/mdc_locks.c */
int mdc_set_lock_data(struct obd_export *exp,
mdc_ioepoch_pack(epoch, op_data);
mdc_hsm_release_pack(req, op_data);
}
-
-static int mdc_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw)
-{
- int rc;
- ENTRY;
- client_obd_list_lock(&cli->cl_loi_list_lock);
- rc = cfs_list_empty(&mcw->mcw_entry);
- client_obd_list_unlock(&cli->cl_loi_list_lock);
- RETURN(rc);
-};
-
-/* We record requests in flight in cli->cl_r_in_flight here.
- * There is only one write rpc possible in mdc anyway. If this to change
- * in the future - the code may need to be revisited. */
-int mdc_enter_request(struct client_obd *cli)
-{
- int rc = 0;
- struct mdc_cache_waiter mcw;
- struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
-
- client_obd_list_lock(&cli->cl_loi_list_lock);
- if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
- cfs_list_add_tail(&mcw.mcw_entry, &cli->cl_cache_waiters);
- init_waitqueue_head(&mcw.mcw_waitq);
- client_obd_list_unlock(&cli->cl_loi_list_lock);
- rc = l_wait_event(mcw.mcw_waitq, mdc_req_avail(cli, &mcw), &lwi);
- if (rc) {
- client_obd_list_lock(&cli->cl_loi_list_lock);
- if (cfs_list_empty(&mcw.mcw_entry))
- cli->cl_r_in_flight--;
- cfs_list_del_init(&mcw.mcw_entry);
- client_obd_list_unlock(&cli->cl_loi_list_lock);
- }
- } else {
- cli->cl_r_in_flight++;
- client_obd_list_unlock(&cli->cl_loi_list_lock);
- }
- return rc;
-}
-
-void mdc_exit_request(struct client_obd *cli)
-{
- cfs_list_t *l, *tmp;
- struct mdc_cache_waiter *mcw;
-
- client_obd_list_lock(&cli->cl_loi_list_lock);
- cli->cl_r_in_flight--;
- cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
- if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
- /* No free request slots anymore */
- break;
- }
-
- mcw = cfs_list_entry(l, struct mdc_cache_waiter, mcw_entry);
- cfs_list_del_init(&mcw->mcw_entry);
- cli->cl_r_in_flight++;
- wake_up(&mcw->mcw_waitq);
- }
- /* Empty waiting list? Decrease reqs in-flight number */
-
- client_obd_list_unlock(&cli->cl_loi_list_lock);
-}
* rpcs in flight counter. We do not do flock request limiting, though*/
if (it) {
mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
- rc = mdc_enter_request(&obddev->u.cli);
+ rc = obd_get_request_slot(&obddev->u.cli);
if (rc != 0) {
mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
mdc_clear_replay_flag(req, 0);
RETURN(rc);
}
- mdc_exit_request(&obddev->u.cli);
+ obd_put_request_slot(&obddev->u.cli);
mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
if (rc < 0) {
obddev = class_exp2obd(exp);
- mdc_exit_request(&obddev->u.cli);
+ obd_put_request_slot(&obddev->u.cli);
if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
rc = -ETIMEDOUT;
if (IS_ERR(req))
RETURN(PTR_ERR(req));
- rc = mdc_enter_request(&obddev->u.cli);
+ rc = obd_get_request_slot(&obddev->u.cli);
if (rc != 0) {
ptlrpc_req_finished(req);
RETURN(rc);
rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
0, LVB_T_NONE, &minfo->mi_lockh, 1);
if (rc < 0) {
- mdc_exit_request(&obddev->u.cli);
+ obd_put_request_slot(&obddev->u.cli);
ptlrpc_req_finished(req);
RETURN(rc);
}
struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
int rc;
- /* mdc_enter_request() ensures that this client has no more
+ /* obd_get_request_slot() ensures that this client has no more
* than cl_max_rpcs_in_flight RPCs simultaneously inf light
* against an MDT. */
- rc = mdc_enter_request(cli);
+ rc = obd_get_request_slot(cli);
if (rc != 0)
return rc;
rc = ptlrpc_queue_wait(req);
- mdc_exit_request(cli);
+ obd_put_request_slot(cli);
return rc;
}
}
EXPORT_SYMBOL(kuc_free);
+struct obd_request_slot_waiter {
+ struct list_head orsw_entry;
+ wait_queue_head_t orsw_waitq;
+ bool orsw_signaled;
+};
+
+static bool obd_request_slot_avail(struct client_obd *cli,
+ struct obd_request_slot_waiter *orsw)
+{
+ bool avail;
+
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ avail = !!list_empty(&orsw->orsw_entry);
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+ return avail;
+};
+/*
+ * For network flow control, the RPC sponsor needs to acquire a credit
+ * before sending the RPC. The credits count for a connection is defined
+ * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
+ * the subsequent RPC sponsors need to wait until others released their
+ * credits, or the administrator increased the "cl_max_rpcs_in_flight".
+ */
+int obd_get_request_slot(struct client_obd *cli)
+{
+ struct obd_request_slot_waiter orsw;
+ struct l_wait_info lwi;
+ int rc;
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
+ cli->cl_r_in_flight++;
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+ return 0;
+ }
+
+ init_waitqueue_head(&orsw.orsw_waitq);
+ list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
+ orsw.orsw_signaled = false;
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+ lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+ rc = l_wait_event(orsw.orsw_waitq,
+ obd_request_slot_avail(cli, &orsw) ||
+ orsw.orsw_signaled,
+ &lwi);
+
+ /* Here, we must take the lock to avoid the on-stack 'orsw' to be
+ * freed but other (such as obd_put_request_slot) is using it. */
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ if (rc != 0) {
+ if (!orsw.orsw_signaled) {
+ if (list_empty(&orsw.orsw_entry))
+ cli->cl_r_in_flight--;
+ else
+ list_del(&orsw.orsw_entry);
+ }
+ }
+
+ if (orsw.orsw_signaled) {
+ LASSERT(list_empty(&orsw.orsw_entry));
+
+ rc = -EINTR;
+ }
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+ return rc;
+}
+EXPORT_SYMBOL(obd_get_request_slot);
+
+void obd_put_request_slot(struct client_obd *cli)
+{
+ struct obd_request_slot_waiter *orsw;
+
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ cli->cl_r_in_flight--;
+
+ /* If there is free slot, wakeup the first waiter. */
+ if (!list_empty(&cli->cl_loi_read_list) &&
+ likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
+ orsw = list_entry(cli->cl_loi_read_list.next,
+ struct obd_request_slot_waiter, orsw_entry);
+ list_del_init(&orsw->orsw_entry);
+ cli->cl_r_in_flight++;
+ wake_up(&orsw->orsw_waitq);
+ }
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+}
+EXPORT_SYMBOL(obd_put_request_slot);
+
+__u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
+{
+ return cli->cl_max_rpcs_in_flight;
+}
+EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
+
+int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
+{
+ struct obd_request_slot_waiter *orsw;
+ __u32 old;
+ int diff;
+ int i;
+
+ if (max > OBD_MAX_RIF_MAX || max < 1)
+ return -ERANGE;
+
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ old = cli->cl_max_rpcs_in_flight;
+ cli->cl_max_rpcs_in_flight = max;
+ diff = max - old;
+
+ /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
+ for (i = 0; i < diff; i++) {
+ if (list_empty(&cli->cl_loi_read_list))
+ break;
+
+ orsw = list_entry(cli->cl_loi_read_list.next,
+ struct obd_request_slot_waiter, orsw_entry);
+ list_del_init(&orsw->orsw_entry);
+ cli->cl_r_in_flight++;
+ wake_up(&orsw->orsw_waitq);
+ }
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+ return 0;
+}
+EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
return rc;
}
+static int osp_rd_lfsck_max_rpcs_in_flight(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+{
+ struct obd_device *dev = data;
+ __u32 max;
+ int rc;
+
+ *eof = 1;
+ max = obd_get_max_rpcs_in_flight(&dev->u.cli);
+ rc = snprintf(page, count, "%u\n", max);
+
+ return rc;
+}
+
+static int osp_wr_lfsck_max_rpcs_in_flight(struct file *file,
+ const char *buffer,
+ unsigned long count, void *data)
+{
+ struct obd_device *dev = data;
+ int val;
+ int rc;
+
+ rc = lprocfs_write_helper(buffer, count, &val);
+ if (rc == 0)
+ rc = obd_set_max_rpcs_in_flight(&dev->u.cli, val);
+
+ if (rc != 0)
+ count = rc;
+
+ return count;
+}
+
static struct lprocfs_vars lprocfs_osp_obd_vars[] = {
{ "uuid", lprocfs_rd_uuid, 0, 0 },
{ "ping", 0, lprocfs_wr_ping, 0, 0, 0222 },
/* for compatibility reasons */
{ "destroys_in_flight", osp_rd_destroys_in_flight, 0, 0 },
+ { "lfsck_max_rpcs_in_flight", osp_rd_lfsck_max_rpcs_in_flight,
+ osp_wr_lfsck_max_rpcs_in_flight, 0 },
{ 0 }
};
RETURN(rc);
}
-#define OSP_MAX_AUIF_MAX 512
-
static int osp_init0(const struct lu_env *env, struct osp_device *m,
struct lu_device_type *ldt, struct lustre_cfg *cfg)
{
ENTRY;
mutex_init(&m->opd_async_requests_mutex);
- /* We allow OSP_MAX_AUIF_MAX async updates in flight at most. */
- sema_init(&m->opd_async_fc_sem, OSP_MAX_AUIF_MAX);
obd = class_name2obd(lustre_cfg_string(cfg, 0));
if (obd == NULL) {
struct dt_update_request *opd_async_requests;
/* Protect current operations on opd_async_requests. */
struct mutex opd_async_requests_mutex;
- struct semaphore opd_async_fc_sem;
};
#define opd_pre_lock opd_pre->osp_pre_lock
struct osp_async_update_args {
struct dt_update_request *oaua_update;
- unsigned int oaua_fc:1;
+ bool oaua_flow_control;
};
struct osp_async_update_item {
struct dt_update_request *dt_update = oaua->oaua_update;
struct osp_async_update_item *oaui;
struct osp_async_update_item *next;
- struct osp_device *osp = dt2osp_dev(dt_update->dur_dt);
int count = 0;
int index = 0;
int rc1 = 0;
- if (oaua->oaua_fc)
- up(&osp->opd_async_fc_sem);
+ if (oaua->oaua_flow_control)
+ obd_put_request_slot(
+ &dt2osp_dev(dt_update->dur_dt)->opd_obd->u.cli);
if (rc == 0 || req->rq_repmsg != NULL) {
reply = req_capsule_server_sized_get(&req->rq_pill,
static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
struct dt_update_request *dt_update,
- struct thandle *th, bool fc)
+ struct thandle *th, bool flow_control)
{
struct thandle_update *tu = th->th_update;
int rc = 0;
if (rc == 0) {
args = ptlrpc_req_async_args(req);
args->oaua_update = dt_update;
- args->oaua_fc = !!fc;
+ args->oaua_flow_control = flow_control;
req->rq_interpret_reply =
osp_async_update_interpret;
ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
if (is_only_remote_trans(th)) {
if (th->th_result == 0) {
struct osp_device *osp = dt2osp_dev(th->th_dev);
+ struct client_obd *cli = &osp->opd_obd->u.cli;
- do {
- if (!osp->opd_imp_active ||
- osp->opd_got_disconnected) {
- out_destroy_update_req(dt_update);
- GOTO(put, rc = -ENOTCONN);
- }
+ rc = obd_get_request_slot(cli);
+ if (!osp->opd_imp_active || osp->opd_got_disconnected) {
+ if (rc == 0)
+ obd_put_request_slot(cli);
- /* Get the semaphore to guarantee it has
- * free slot, which will be released via
- * osp_async_update_interpret(). */
- rc = down_timeout(&osp->opd_async_fc_sem, HZ);
- } while (rc != 0);
+ rc = -ENOTCONN;
+ }
+
+ if (rc != 0) {
+ out_destroy_update_req(dt_update);
+ goto put;
+ }
rc = osp_trans_trigger(env, dt2osp_dev(dt),
dt_update, th, true);
if (rc != 0)
- up(&osp->opd_async_fc_sem);
+ obd_put_request_slot(cli);
} else {
rc = th->th_result;
out_destroy_update_req(dt_update);
* with the default max_rpcs_in_flight value, as we are scheduling over
* NIDs, and there may be more than one mount point per client.
*/
- net->cn_quantum = OSC_MAX_RIF_DEFAULT;
+ net->cn_quantum = OBD_MAX_RIF_DEFAULT;
/**
* Set to 1 so that the test inside nrs_crrn_req_add() can evaluate to
* true.