From: Fan Yong Date: Tue, 25 Mar 2014 14:16:12 +0000 (+0800) Subject: LU-4687 obdclass: unified flow control interfaces X-Git-Tag: 2.5.59~101 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=47a6999b6a54218ba68ed5d716f85bcc9c91b9ef;p=fs%2Flustre-release.git LU-4687 obdclass: unified flow control interfaces Unify the flow control interfaces for MDC RPC, FLD RPC and lfsck remote update (via OSP). We allow to adjust the maximum inflight RPCs count via /proc interface. Rename some variables/functions to make them easily understood. Signed-off-by: Fan Yong Change-Id: I096a1de17fe226bac2fe3a3c891365c2e2c51ef3 Reviewed-on: http://review.whamcloud.com/9562 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Niu Yawei Reviewed-by: Alex Zhuravlev Reviewed-by: Oleg Drokin --- diff --git a/lustre/fld/fld_request.c b/lustre/fld/fld_request.c index 2a2a927..6a4986e 100644 --- a/lustre/fld/fld_request.c +++ b/lustre/fld/fld_request.c @@ -60,57 +60,6 @@ #include #include "fld_internal.h" -/* TODO: these 3 functions are copies of flow-control code from mdc_lib.c - * It should be common thing. The same about mdc RPC lock */ -static int fld_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw) -{ - int rc; - ENTRY; - client_obd_list_lock(&cli->cl_loi_list_lock); - rc = cfs_list_empty(&mcw->mcw_entry); - client_obd_list_unlock(&cli->cl_loi_list_lock); - RETURN(rc); -}; - -static void fld_enter_request(struct client_obd *cli) -{ - struct mdc_cache_waiter mcw; - struct l_wait_info lwi = { 0 }; - - client_obd_list_lock(&cli->cl_loi_list_lock); - if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) { - cfs_list_add_tail(&mcw.mcw_entry, &cli->cl_cache_waiters); - init_waitqueue_head(&mcw.mcw_waitq); - client_obd_list_unlock(&cli->cl_loi_list_lock); - l_wait_event(mcw.mcw_waitq, fld_req_avail(cli, &mcw), &lwi); - } else { - cli->cl_r_in_flight++; - client_obd_list_unlock(&cli->cl_loi_list_lock); - } -} - -static void fld_exit_request(struct client_obd *cli) -{ - cfs_list_t *l, *tmp; - struct mdc_cache_waiter *mcw; - - client_obd_list_lock(&cli->cl_loi_list_lock); - cli->cl_r_in_flight--; - cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) { - - if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) { - /* No free request slots anymore */ - break; - } - - mcw = cfs_list_entry(l, struct mdc_cache_waiter, mcw_entry); - cfs_list_del_init(&mcw->mcw_entry); - cli->cl_r_in_flight++; - wake_up(&mcw->mcw_waitq); - } - client_obd_list_unlock(&cli->cl_loi_list_lock); -} - static int fld_rrb_hash(struct lu_client_fld *fld, seqno_t seq) { @@ -477,9 +426,9 @@ again: req->rq_reply_portal = MDC_REPLY_PORTAL; ptlrpc_at_set_req_timeout(req); - fld_enter_request(&exp->exp_obd->u.cli); + obd_get_request_slot(&exp->exp_obd->u.cli); rc = ptlrpc_queue_wait(req); - fld_exit_request(&exp->exp_obd->u.cli); + obd_put_request_slot(&exp->exp_obd->u.cli); if (rc != 0) { if (rc == -EWOULDBLOCK) { /* For no_delay req(see above), EWOULDBLOCK means the diff --git a/lustre/include/lustre_mdc.h b/lustre/include/lustre_mdc.h index 3d343b1..9f6edd3 100644 --- a/lustre/include/lustre_mdc.h +++ b/lustre/include/lustre_mdc.h @@ -191,11 +191,6 @@ static inline void mdc_update_max_ea_from_body(struct obd_export *exp, } -struct mdc_cache_waiter { - cfs_list_t mcw_entry; - wait_queue_head_t mcw_waitq; -}; - /* mdc/mdc_locks.c */ int it_disposition(const struct lookup_intent *it, int flag); void it_clear_disposition(struct lookup_intent *it, int flag); diff --git a/lustre/include/obd.h b/lustre/include/obd.h index ac5fa1d..d06bbfe 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -239,12 +239,12 @@ struct timeout_item { cfs_list_t ti_chain; }; -#define OSC_MAX_RIF_DEFAULT 8 -#define MDS_OSC_MAX_RIF_DEFAULT 50 -#define OSC_MAX_RIF_MAX 256 -#define OSC_MAX_DIRTY_DEFAULT (OSC_MAX_RIF_DEFAULT * 4) -#define OSC_MAX_DIRTY_MB_MAX 2048 /* arbitrary, but < MAX_LONG bytes */ -#define OSC_DEFAULT_RESENDS 10 +#define OBD_MAX_RIF_DEFAULT 8 +#define OBD_MAX_RIF_MAX 512 +#define OSC_MAX_RIF_MAX 256 +#define OSC_MAX_DIRTY_DEFAULT (OBD_MAX_RIF_DEFAULT * 4) +#define OSC_MAX_DIRTY_MB_MAX 2048 /* arbitrary, but < MAX_LONG bytes */ +#define OSC_DEFAULT_RESENDS 10 /* possible values for fo_sync_lock_cancel */ enum { @@ -254,9 +254,6 @@ enum { NUM_SYNC_ON_CANCEL_STATES }; -#define MDC_MAX_RIF_DEFAULT 8 -#define MDC_MAX_RIF_MAX 512 - struct mdc_rpc_lock; struct obd_import; struct client_obd { diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index 5de7cd7..7464435 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -128,6 +128,10 @@ struct kuc_hdr * kuc_ptr(void *p); int kuc_ispayload(void *p); void *kuc_alloc(int payload_len, int transport, int type); void kuc_free(void *p, int payload_len); +int obd_get_request_slot(struct client_obd *cli); +void obd_put_request_slot(struct client_obd *cli); +__u32 obd_get_max_rpcs_in_flight(struct client_obd *cli); +int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max); struct llog_handle; struct llog_rec_hdr; diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index aac76e9..a3769d5 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -405,7 +405,7 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg) cli->cl_chunkbits = PAGE_CACHE_SHIFT; if (!strcmp(name, LUSTRE_MDC_NAME)) { - cli->cl_max_rpcs_in_flight = MDC_MAX_RIF_DEFAULT; + cli->cl_max_rpcs_in_flight = OBD_MAX_RIF_DEFAULT; } else if (totalram_pages >> (20 - PAGE_CACHE_SHIFT) <= 128 /* MB */) { cli->cl_max_rpcs_in_flight = 2; } else if (totalram_pages >> (20 - PAGE_CACHE_SHIFT) <= 256 /* MB */) { @@ -414,9 +414,9 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg) cli->cl_max_rpcs_in_flight = 4; } else { if (osc_on_mdt(obddev->obd_name)) - cli->cl_max_rpcs_in_flight = MDS_OSC_MAX_RIF_DEFAULT; + cli->cl_max_rpcs_in_flight = OBD_MAX_RIF_MAX; else - cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT; + cli->cl_max_rpcs_in_flight = OBD_MAX_RIF_DEFAULT; } rc = ldlm_get_ref(); if (rc) { diff --git a/lustre/mdc/lproc_mdc.c b/lustre/mdc/lproc_mdc.c index 69e377f..3a4570a 100644 --- a/lustre/mdc/lproc_mdc.c +++ b/lustre/mdc/lproc_mdc.c @@ -45,12 +45,12 @@ static int mdc_max_rpcs_in_flight_seq_show(struct seq_file *m, void *v) { struct obd_device *dev = m->private; - struct client_obd *cli = &dev->u.cli; + __u32 max; int rc; - client_obd_list_lock(&cli->cl_loi_list_lock); - rc = seq_printf(m, "%u\n", cli->cl_max_rpcs_in_flight); - client_obd_list_unlock(&cli->cl_loi_list_lock); + max = obd_get_max_rpcs_in_flight(&dev->u.cli); + rc = seq_printf(m, "%u\n", max); + return rc; } @@ -60,21 +60,17 @@ static ssize_t mdc_max_rpcs_in_flight_seq_write(struct file *file, loff_t *off) { struct obd_device *dev = ((struct seq_file *)file->private_data)->private; - struct client_obd *cli = &dev->u.cli; - int val, rc; - - rc = lprocfs_write_helper(buffer, count, &val); - if (rc) - return rc; + int val; + int rc; - if (val < 1 || val > MDC_MAX_RIF_MAX) - return -ERANGE; + rc = lprocfs_write_helper(buffer, count, &val); + if (rc == 0) + rc = obd_set_max_rpcs_in_flight(&dev->u.cli, val); - client_obd_list_lock(&cli->cl_loi_list_lock); - cli->cl_max_rpcs_in_flight = val; - client_obd_list_unlock(&cli->cl_loi_list_lock); + if (rc != 0) + count = rc; - return count; + return count; } LPROC_SEQ_FOPS(mdc_max_rpcs_in_flight); diff --git a/lustre/mdc/mdc_internal.h b/lustre/mdc/mdc_internal.h index a49e9cd..68c5cc6 100644 --- a/lustre/mdc/mdc_internal.h +++ b/lustre/mdc/mdc_internal.h @@ -72,8 +72,6 @@ void mdc_link_pack(struct ptlrpc_request *req, struct md_op_data *op_data); void mdc_rename_pack(struct ptlrpc_request *req, struct md_op_data *op_data, const char *old, int oldlen, const char *new, int newlen); void mdc_close_pack(struct ptlrpc_request *req, struct md_op_data *op_data); -int mdc_enter_request(struct client_obd *cli); -void mdc_exit_request(struct client_obd *cli); /* mdc/mdc_locks.c */ int mdc_set_lock_data(struct obd_export *exp, diff --git a/lustre/mdc/mdc_lib.c b/lustre/mdc/mdc_lib.c index 2d8a09d..1e4c3dc 100644 --- a/lustre/mdc/mdc_lib.c +++ b/lustre/mdc/mdc_lib.c @@ -546,65 +546,3 @@ void mdc_close_pack(struct ptlrpc_request *req, struct md_op_data *op_data) mdc_ioepoch_pack(epoch, op_data); mdc_hsm_release_pack(req, op_data); } - -static int mdc_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw) -{ - int rc; - ENTRY; - client_obd_list_lock(&cli->cl_loi_list_lock); - rc = cfs_list_empty(&mcw->mcw_entry); - client_obd_list_unlock(&cli->cl_loi_list_lock); - RETURN(rc); -}; - -/* We record requests in flight in cli->cl_r_in_flight here. - * There is only one write rpc possible in mdc anyway. If this to change - * in the future - the code may need to be revisited. */ -int mdc_enter_request(struct client_obd *cli) -{ - int rc = 0; - struct mdc_cache_waiter mcw; - struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL); - - client_obd_list_lock(&cli->cl_loi_list_lock); - if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) { - cfs_list_add_tail(&mcw.mcw_entry, &cli->cl_cache_waiters); - init_waitqueue_head(&mcw.mcw_waitq); - client_obd_list_unlock(&cli->cl_loi_list_lock); - rc = l_wait_event(mcw.mcw_waitq, mdc_req_avail(cli, &mcw), &lwi); - if (rc) { - client_obd_list_lock(&cli->cl_loi_list_lock); - if (cfs_list_empty(&mcw.mcw_entry)) - cli->cl_r_in_flight--; - cfs_list_del_init(&mcw.mcw_entry); - client_obd_list_unlock(&cli->cl_loi_list_lock); - } - } else { - cli->cl_r_in_flight++; - client_obd_list_unlock(&cli->cl_loi_list_lock); - } - return rc; -} - -void mdc_exit_request(struct client_obd *cli) -{ - cfs_list_t *l, *tmp; - struct mdc_cache_waiter *mcw; - - client_obd_list_lock(&cli->cl_loi_list_lock); - cli->cl_r_in_flight--; - cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) { - if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) { - /* No free request slots anymore */ - break; - } - - mcw = cfs_list_entry(l, struct mdc_cache_waiter, mcw_entry); - cfs_list_del_init(&mcw->mcw_entry); - cli->cl_r_in_flight++; - wake_up(&mcw->mcw_waitq); - } - /* Empty waiting list? Decrease reqs in-flight number */ - - client_obd_list_unlock(&cli->cl_loi_list_lock); -} diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index fa40867..ffc49c3 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -872,7 +872,7 @@ resend: * rpcs in flight counter. We do not do flock request limiting, though*/ if (it) { mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it); - rc = mdc_enter_request(&obddev->u.cli); + rc = obd_get_request_slot(&obddev->u.cli); if (rc != 0) { mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it); mdc_clear_replay_flag(req, 0); @@ -899,7 +899,7 @@ resend: RETURN(rc); } - mdc_exit_request(&obddev->u.cli); + obd_put_request_slot(&obddev->u.cli); mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it); if (rc < 0) { @@ -1228,7 +1228,7 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env, obddev = class_exp2obd(exp); - mdc_exit_request(&obddev->u.cli); + obd_put_request_slot(&obddev->u.cli); if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE)) rc = -ETIMEDOUT; @@ -1290,7 +1290,7 @@ int mdc_intent_getattr_async(struct obd_export *exp, if (IS_ERR(req)) RETURN(PTR_ERR(req)); - rc = mdc_enter_request(&obddev->u.cli); + rc = obd_get_request_slot(&obddev->u.cli); if (rc != 0) { ptlrpc_req_finished(req); RETURN(rc); @@ -1299,7 +1299,7 @@ int mdc_intent_getattr_async(struct obd_export *exp, rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1); if (rc < 0) { - mdc_exit_request(&obddev->u.cli); + obd_put_request_slot(&obddev->u.cli); ptlrpc_req_finished(req); RETURN(rc); } diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index d5c9701..846b659 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -95,15 +95,15 @@ static inline int mdc_queue_wait(struct ptlrpc_request *req) struct client_obd *cli = &req->rq_import->imp_obd->u.cli; int rc; - /* mdc_enter_request() ensures that this client has no more + /* obd_get_request_slot() ensures that this client has no more * than cl_max_rpcs_in_flight RPCs simultaneously inf light * against an MDT. */ - rc = mdc_enter_request(cli); + rc = obd_get_request_slot(cli); if (rc != 0) return rc; rc = ptlrpc_queue_wait(req); - mdc_exit_request(cli); + obd_put_request_slot(cli); return rc; } diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index 98dbcce..d7c541a 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -1934,5 +1934,132 @@ inline void kuc_free(void *p, int payload_len) } EXPORT_SYMBOL(kuc_free); +struct obd_request_slot_waiter { + struct list_head orsw_entry; + wait_queue_head_t orsw_waitq; + bool orsw_signaled; +}; + +static bool obd_request_slot_avail(struct client_obd *cli, + struct obd_request_slot_waiter *orsw) +{ + bool avail; + + client_obd_list_lock(&cli->cl_loi_list_lock); + avail = !!list_empty(&orsw->orsw_entry); + client_obd_list_unlock(&cli->cl_loi_list_lock); + + return avail; +}; +/* + * For network flow control, the RPC sponsor needs to acquire a credit + * before sending the RPC. The credits count for a connection is defined + * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then + * the subsequent RPC sponsors need to wait until others released their + * credits, or the administrator increased the "cl_max_rpcs_in_flight". + */ +int obd_get_request_slot(struct client_obd *cli) +{ + struct obd_request_slot_waiter orsw; + struct l_wait_info lwi; + int rc; + client_obd_list_lock(&cli->cl_loi_list_lock); + if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) { + cli->cl_r_in_flight++; + client_obd_list_unlock(&cli->cl_loi_list_lock); + return 0; + } + + init_waitqueue_head(&orsw.orsw_waitq); + list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list); + orsw.orsw_signaled = false; + client_obd_list_unlock(&cli->cl_loi_list_lock); + + lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL); + rc = l_wait_event(orsw.orsw_waitq, + obd_request_slot_avail(cli, &orsw) || + orsw.orsw_signaled, + &lwi); + + /* Here, we must take the lock to avoid the on-stack 'orsw' to be + * freed but other (such as obd_put_request_slot) is using it. */ + client_obd_list_lock(&cli->cl_loi_list_lock); + if (rc != 0) { + if (!orsw.orsw_signaled) { + if (list_empty(&orsw.orsw_entry)) + cli->cl_r_in_flight--; + else + list_del(&orsw.orsw_entry); + } + } + + if (orsw.orsw_signaled) { + LASSERT(list_empty(&orsw.orsw_entry)); + + rc = -EINTR; + } + client_obd_list_unlock(&cli->cl_loi_list_lock); + + return rc; +} +EXPORT_SYMBOL(obd_get_request_slot); + +void obd_put_request_slot(struct client_obd *cli) +{ + struct obd_request_slot_waiter *orsw; + + client_obd_list_lock(&cli->cl_loi_list_lock); + cli->cl_r_in_flight--; + + /* If there is free slot, wakeup the first waiter. */ + if (!list_empty(&cli->cl_loi_read_list) && + likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) { + orsw = list_entry(cli->cl_loi_read_list.next, + struct obd_request_slot_waiter, orsw_entry); + list_del_init(&orsw->orsw_entry); + cli->cl_r_in_flight++; + wake_up(&orsw->orsw_waitq); + } + client_obd_list_unlock(&cli->cl_loi_list_lock); +} +EXPORT_SYMBOL(obd_put_request_slot); + +__u32 obd_get_max_rpcs_in_flight(struct client_obd *cli) +{ + return cli->cl_max_rpcs_in_flight; +} +EXPORT_SYMBOL(obd_get_max_rpcs_in_flight); + +int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max) +{ + struct obd_request_slot_waiter *orsw; + __u32 old; + int diff; + int i; + + if (max > OBD_MAX_RIF_MAX || max < 1) + return -ERANGE; + + client_obd_list_lock(&cli->cl_loi_list_lock); + old = cli->cl_max_rpcs_in_flight; + cli->cl_max_rpcs_in_flight = max; + diff = max - old; + + /* We increase the max_rpcs_in_flight, then wakeup some waiters. */ + for (i = 0; i < diff; i++) { + if (list_empty(&cli->cl_loi_read_list)) + break; + + orsw = list_entry(cli->cl_loi_read_list.next, + struct obd_request_slot_waiter, orsw_entry); + list_del_init(&orsw->orsw_entry); + cli->cl_r_in_flight++; + wake_up(&orsw->orsw_waitq); + } + client_obd_list_unlock(&cli->cl_loi_list_lock); + + return 0; +} +EXPORT_SYMBOL(obd_set_max_rpcs_in_flight); diff --git a/lustre/osp/lproc_osp.c b/lustre/osp/lproc_osp.c index 75ebeb1..118c4df 100644 --- a/lustre/osp/lproc_osp.c +++ b/lustre/osp/lproc_osp.c @@ -429,6 +429,38 @@ static int osp_rd_old_sync_processed(char *page, char **start, off_t off, return rc; } +static int osp_rd_lfsck_max_rpcs_in_flight(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct obd_device *dev = data; + __u32 max; + int rc; + + *eof = 1; + max = obd_get_max_rpcs_in_flight(&dev->u.cli); + rc = snprintf(page, count, "%u\n", max); + + return rc; +} + +static int osp_wr_lfsck_max_rpcs_in_flight(struct file *file, + const char *buffer, + unsigned long count, void *data) +{ + struct obd_device *dev = data; + int val; + int rc; + + rc = lprocfs_write_helper(buffer, count, &val); + if (rc == 0) + rc = obd_set_max_rpcs_in_flight(&dev->u.cli, val); + + if (rc != 0) + count = rc; + + return count; +} + static struct lprocfs_vars lprocfs_osp_obd_vars[] = { { "uuid", lprocfs_rd_uuid, 0, 0 }, { "ping", 0, lprocfs_wr_ping, 0, 0, 0222 }, @@ -461,6 +493,8 @@ static struct lprocfs_vars lprocfs_osp_obd_vars[] = { /* for compatibility reasons */ { "destroys_in_flight", osp_rd_destroys_in_flight, 0, 0 }, + { "lfsck_max_rpcs_in_flight", osp_rd_lfsck_max_rpcs_in_flight, + osp_wr_lfsck_max_rpcs_in_flight, 0 }, { 0 } }; diff --git a/lustre/osp/osp_dev.c b/lustre/osp/osp_dev.c index 60bf8ef..ff0b794 100644 --- a/lustre/osp/osp_dev.c +++ b/lustre/osp/osp_dev.c @@ -531,8 +531,6 @@ out: RETURN(rc); } -#define OSP_MAX_AUIF_MAX 512 - static int osp_init0(const struct lu_env *env, struct osp_device *m, struct lu_device_type *ldt, struct lustre_cfg *cfg) { @@ -546,8 +544,6 @@ static int osp_init0(const struct lu_env *env, struct osp_device *m, ENTRY; mutex_init(&m->opd_async_requests_mutex); - /* We allow OSP_MAX_AUIF_MAX async updates in flight at most. */ - sema_init(&m->opd_async_fc_sem, OSP_MAX_AUIF_MAX); obd = class_name2obd(lustre_cfg_string(cfg, 0)); if (obd == NULL) { diff --git a/lustre/osp/osp_internal.h b/lustre/osp/osp_internal.h index f2f2a2c..c90e029 100644 --- a/lustre/osp/osp_internal.h +++ b/lustre/osp/osp_internal.h @@ -195,7 +195,6 @@ struct osp_device { struct dt_update_request *opd_async_requests; /* Protect current operations on opd_async_requests. */ struct mutex opd_async_requests_mutex; - struct semaphore opd_async_fc_sem; }; #define opd_pre_lock opd_pre->osp_pre_lock diff --git a/lustre/osp/osp_trans.c b/lustre/osp/osp_trans.c index fd137d1..8e77ecf 100644 --- a/lustre/osp/osp_trans.c +++ b/lustre/osp/osp_trans.c @@ -35,7 +35,7 @@ struct osp_async_update_args { struct dt_update_request *oaua_update; - unsigned int oaua_fc:1; + bool oaua_flow_control; }; struct osp_async_update_item { @@ -82,13 +82,13 @@ static int osp_async_update_interpret(const struct lu_env *env, struct dt_update_request *dt_update = oaua->oaua_update; struct osp_async_update_item *oaui; struct osp_async_update_item *next; - struct osp_device *osp = dt2osp_dev(dt_update->dur_dt); int count = 0; int index = 0; int rc1 = 0; - if (oaua->oaua_fc) - up(&osp->opd_async_fc_sem); + if (oaua->oaua_flow_control) + obd_put_request_slot( + &dt2osp_dev(dt_update->dur_dt)->opd_obd->u.cli); if (rc == 0 || req->rq_repmsg != NULL) { reply = req_capsule_server_sized_get(&req->rq_pill, @@ -269,7 +269,7 @@ out: static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp, struct dt_update_request *dt_update, - struct thandle *th, bool fc) + struct thandle *th, bool flow_control) { struct thandle_update *tu = th->th_update; int rc = 0; @@ -288,7 +288,7 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp, if (rc == 0) { args = ptlrpc_req_async_args(req); args->oaua_update = dt_update; - args->oaua_fc = !!fc; + args->oaua_flow_control = flow_control; req->rq_interpret_reply = osp_async_update_interpret; ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1); @@ -366,24 +366,25 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, if (is_only_remote_trans(th)) { if (th->th_result == 0) { struct osp_device *osp = dt2osp_dev(th->th_dev); + struct client_obd *cli = &osp->opd_obd->u.cli; - do { - if (!osp->opd_imp_active || - osp->opd_got_disconnected) { - out_destroy_update_req(dt_update); - GOTO(put, rc = -ENOTCONN); - } + rc = obd_get_request_slot(cli); + if (!osp->opd_imp_active || osp->opd_got_disconnected) { + if (rc == 0) + obd_put_request_slot(cli); - /* Get the semaphore to guarantee it has - * free slot, which will be released via - * osp_async_update_interpret(). */ - rc = down_timeout(&osp->opd_async_fc_sem, HZ); - } while (rc != 0); + rc = -ENOTCONN; + } + + if (rc != 0) { + out_destroy_update_req(dt_update); + goto put; + } rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update, th, true); if (rc != 0) - up(&osp->opd_async_fc_sem); + obd_put_request_slot(cli); } else { rc = th->th_result; out_destroy_update_req(dt_update); diff --git a/lustre/ptlrpc/nrs_crr.c b/lustre/ptlrpc/nrs_crr.c index 53a3fbc..e0f1e8d 100644 --- a/lustre/ptlrpc/nrs_crr.c +++ b/lustre/ptlrpc/nrs_crr.c @@ -214,7 +214,7 @@ static int nrs_crrn_start(struct ptlrpc_nrs_policy *policy, char *arg) * with the default max_rpcs_in_flight value, as we are scheduling over * NIDs, and there may be more than one mount point per client. */ - net->cn_quantum = OSC_MAX_RIF_DEFAULT; + net->cn_quantum = OBD_MAX_RIF_DEFAULT; /** * Set to 1 so that the test inside nrs_crrn_req_add() can evaluate to * true.