From: Jinshan Xiong
Date: Wed, 4 Jan 2012 07:56:22 +0000 (-0800)
Subject: LU-962 ptlrpc: feature to run callback in ptlrpcd context
X-Git-Tag: 2.1.54~6
X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=2064c2a7e616b172f72462884b23d899bfc040ff

LU-962 ptlrpc: feature to run callback in ptlrpcd context

This patch adds a feature to run a callback in ptlrpcd context,
implemented as a ptlrpc work item. Three functions are exported:

1. ptlrpcd_alloc_work() to allocate a work item;
2. ptlrpcd_queue_work() to queue an allocated work item; this
   function can be called many times over the item's lifetime;
3. ptlrpcd_destroy_work() to destroy the work item.

Signed-off-by: Jinshan Xiong
Change-Id: I2bce5a17003855468eab9075fb50ed02d7bcc208
Reviewed-on: http://review.whamcloud.com/1917
Reviewed-by: Johann Lombardi
Tested-by: Hudson
Tested-by: Maloo
Reviewed-by: Niu Yawei
Reviewed-by: Oleg Drokin
---

diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h
index c065b86..67ec415 100644
--- a/lustre/include/lustre_net.h
+++ b/lustre/include/lustre_net.h
@@ -1527,6 +1527,12 @@ __u64 ptlrpc_next_xid(void);
 __u64 ptlrpc_sample_next_xid(void);
 __u64 ptlrpc_req_xid(struct ptlrpc_request *request);
 
+/* Set of routines to run a function in ptlrpcd context */
+void *ptlrpcd_alloc_work(struct obd_import *imp,
+                         int (*cb)(const struct lu_env *, void *), void *data);
+void ptlrpcd_destroy_work(void *handler);
+int ptlrpcd_queue_work(void *handler);
+
 /** @} */
 
 struct ptlrpc_service_conf {
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index e9979df..de8af5b 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -2115,7 +2115,6 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
         LASSERTF(cfs_list_empty(&request->rq_set_chain), "req %p\n", request);
         LASSERTF(cfs_list_empty(&request->rq_exp_list), "req %p\n", request);
         LASSERTF(!request->rq_replay, "req %p\n", request);
-        LASSERT(request->rq_cli_ctx || request->rq_fake);
 
         req_capsule_fini(&request->rq_pill);
 
@@ -2809,3 +2808,136 @@ __u64 ptlrpc_sample_next_xid(void)
 #endif
 }
 EXPORT_SYMBOL(ptlrpc_sample_next_xid);
+
+/**
+ * Functions for operating ptlrpc workers.
+ *
+ * A ptlrpc work is a function which runs inside ptlrpcd context.
+ * The callback must not sleep, otherwise it will block that ptlrpcd thread.
+ *
+ * 1. After a work item is created, it can be used many times, that is:
+ *         handler = ptlrpcd_alloc_work(imp, cb, cbdata);
+ *         ptlrpcd_queue_work(handler);
+ *
+ *    queue it again when necessary:
+ *         ptlrpcd_queue_work(handler);
+ *         ptlrpcd_destroy_work(handler);
+ * 2. ptlrpcd_queue_work() can be called concurrently by multiple processes,
+ *    but the work is queued at most once at any given time. Also, as its
+ *    name implies, there may be a delay before the work is actually run by
+ *    a ptlrpcd thread.
+ */
+struct ptlrpc_work_async_args {
+        __u64   magic;
+        int   (*cb)(const struct lu_env *, void *);
+        void   *cbdata;
+};
+
+#define PTLRPC_WORK_MAGIC 0x6655436b676f4f44ULL /* magic code */
+
+static int work_interpreter(const struct lu_env *env,
+                            struct ptlrpc_request *req, void *data, int rc)
+{
+        struct ptlrpc_work_async_args *arg = data;
+
+        LASSERT(arg->magic == PTLRPC_WORK_MAGIC);
+        LASSERT(arg->cb != NULL);
+
+        return arg->cb(env, arg->cbdata);
+}
+
+/**
+ * Create a work item for ptlrpcd.
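+ *
+ * \retval pointer to the work handle on success, ERR_PTR() on failure.
+ *
+ * A minimal usage sketch follows (illustrative only: my_cb(), my_data and
+ * do_something_quick() are hypothetical caller-side names, and the callback
+ * must not sleep because it runs in ptlrpcd context):
+ *
+ *         static int my_cb(const struct lu_env *env, void *data)
+ *         {
+ *                 do_something_quick(data);
+ *                 return 0;
+ *         }
+ *
+ *         handler = ptlrpcd_alloc_work(imp, my_cb, my_data);
+ *         if (!IS_ERR(handler)) {
+ *                 rc = ptlrpcd_queue_work(handler);
+ *                 ... (rc is -EBUSY if the work is already queued)
+ *                 ptlrpcd_destroy_work(handler);
+ *         }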
+ */
+void *ptlrpcd_alloc_work(struct obd_import *imp,
+                         int (*cb)(const struct lu_env *, void *), void *cbdata)
+{
+        struct ptlrpc_request *req = NULL;
+        struct ptlrpc_work_async_args *args;
+        ENTRY;
+
+        cfs_might_sleep();
+
+        if (cb == NULL)
+                RETURN(ERR_PTR(-EINVAL));
+
+        /* Copy some code from the deprecated fakereq. */
+        OBD_ALLOC_PTR(req);
+        if (req == NULL) {
+                CERROR("ptlrpc: out of memory!\n");
+                RETURN(ERR_PTR(-ENOMEM));
+        }
+
+        req->rq_send_state = LUSTRE_IMP_FULL;
+        req->rq_type = PTL_RPC_MSG_REQUEST;
+        req->rq_import = class_import_get(imp);
+        req->rq_export = NULL;
+        req->rq_interpret_reply = work_interpreter;
+        /* don't want reply */
+        req->rq_receiving_reply = 0;
+        req->rq_must_unlink = 0;
+        req->rq_no_delay = req->rq_no_resend = 1;
+
+        cfs_spin_lock_init(&req->rq_lock);
+        CFS_INIT_LIST_HEAD(&req->rq_list);
+        CFS_INIT_LIST_HEAD(&req->rq_replay_list);
+        CFS_INIT_LIST_HEAD(&req->rq_set_chain);
+        CFS_INIT_LIST_HEAD(&req->rq_history_list);
+        CFS_INIT_LIST_HEAD(&req->rq_exp_list);
+        cfs_waitq_init(&req->rq_reply_waitq);
+        cfs_waitq_init(&req->rq_set_waitq);
+        cfs_atomic_set(&req->rq_refcount, 1);
+
+        CLASSERT(sizeof(*args) <= sizeof(req->rq_async_args));
+        args = ptlrpc_req_async_args(req);
+        args->magic = PTLRPC_WORK_MAGIC;
+        args->cb = cb;
+        args->cbdata = cbdata;
+
+        RETURN(req);
+}
+EXPORT_SYMBOL(ptlrpcd_alloc_work);
+
+void ptlrpcd_destroy_work(void *handler)
+{
+        struct ptlrpc_request *req = handler;
+
+        if (req)
+                ptlrpc_req_finished(req);
+}
+EXPORT_SYMBOL(ptlrpcd_destroy_work);
+
+int ptlrpcd_queue_work(void *handler)
+{
+        struct ptlrpc_request *req = handler;
+
+        /*
+         * Check if the req is already being queued.
+         *
+         * One trick here: ptlrpc lacks a reliable way of checking whether
+         * a req is being processed, so the refcount of the req is used for
+         * this purpose. This is okay because the caller is expected to
+         * treat the req as opaque data. - Jinshan
+         */
+        LASSERT(cfs_atomic_read(&req->rq_refcount) > 0);
+        if (cfs_atomic_read(&req->rq_refcount) > 1)
+                return -EBUSY;
+
+        if (cfs_atomic_inc_return(&req->rq_refcount) > 2) { /* race */
+                cfs_atomic_dec(&req->rq_refcount);
+                return -EBUSY;
+        }
+
+        /* re-initialize the req */
+        req->rq_timeout = obd_timeout;
+        req->rq_sent = cfs_time_current_sec();
+        req->rq_deadline = req->rq_sent + req->rq_timeout;
+        req->rq_reply_deadline = req->rq_deadline;
+        req->rq_phase = RQ_PHASE_INTERPRET;
+        req->rq_next_phase = RQ_PHASE_COMPLETE;
+        req->rq_xid = ptlrpc_next_xid();
+        req->rq_import_generation = req->rq_import->imp_generation;
+
+        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+        return 0;
+}
+EXPORT_SYMBOL(ptlrpcd_queue_work);
diff --git a/lustre/ptlrpc/ptlrpcd.c b/lustre/ptlrpc/ptlrpcd.c
index 0616c04a..69bd210 100644
--- a/lustre/ptlrpc/ptlrpcd.c
+++ b/lustre/ptlrpc/ptlrpcd.c
@@ -281,12 +281,13 @@ static inline void ptlrpc_reqset_get(struct ptlrpc_request_set *set)
  * Check if there is more work to do on ptlrpcd set.
  * Returns 1 if yes.
  */
-static int ptlrpcd_check(const struct lu_env *env, struct ptlrpcd_ctl *pc)
+static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
 {
         cfs_list_t *tmp, *pos;
         struct ptlrpc_request *req;
         struct ptlrpc_request_set *set = pc->pc_set;
         int rc = 0;
+        int rc2;
         ENTRY;
 
         if (cfs_atomic_read(&set->set_new_count)) {
@@ -305,6 +306,25 @@ static int ptlrpcd_check(const struct lu_env *env, struct ptlrpcd_ctl *pc)
                 cfs_spin_unlock(&set->set_new_req_lock);
         }
 
+        /* We should call lu_env_refill() before handling new requests to
+         * make sure that the env keys the requests depend on really exist.
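+         *
+         * (For example, a request interpreter may look up per-thread state
+         * with lu_context_key_get(); a key registered by a module loaded
+         * after this thread's env was set up is only instantiated once
+         * lu_env_refill() runs.)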
+         */
+        rc2 = lu_env_refill(env);
+        if (rc2 != 0) {
+                /*
+                 * XXX This is a very awkward situation, because
+                 * execution can neither continue (request
+                 * interpreters assume that env is set up), nor repeat
+                 * the loop (as this potentially results in a tight
+                 * loop of -ENOMEM's).
+                 *
+                 * Fortunately, refill only ever does something when
+                 * new modules are loaded, i.e., early during boot up.
+                 */
+                CERROR("Failure to refill session: %d\n", rc2);
+                RETURN(rc);
+        }
+
         if (cfs_atomic_read(&set->set_remaining))
                 rc |= ptlrpc_check_set(env, set);
 
@@ -425,22 +445,6 @@ static int ptlrpcd(void *arg)
                 struct l_wait_info lwi;
                 int timeout;
 
-                rc = lu_env_refill(&env);
-                if (rc != 0) {
-                        /*
-                         * XXX This is very awkward situation, because
-                         * execution can neither continue (request
-                         * interpreters assume that env is set up), nor repeat
-                         * the loop (as this potentially results in a tight
-                         * loop of -ENOMEM's).
-                         *
-                         * Fortunately, refill only ever does something when
-                         * new modules are loaded, i.e., early during boot up.
-                         */
-                        CERROR("Failure to refill session: %d\n", rc);
-                        continue;
-                }
-
                 timeout = ptlrpc_set_next_timeout(set);
                 lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
                                   ptlrpc_expired_set, set);