From aa82cc83612dbd4c7d05fc101b98e8660f1373db Mon Sep 17 00:00:00 2001 From: Alex Zhuravlev Date: Wed, 3 Apr 2019 11:29:06 +0300 Subject: [PATCH] LU-12034 obdclass: put all service's env on the list to be able to lookup by current thread where it's too complicated to pass env by argument. this version has stats to see slow/fast lookups. so, in sanity-benchmark there were 172850 fast lookups (from per-cpu cache) and 27228 slow lookups (from rhashtable). going to see the ration in autotest's reports. Fixes: 2339e1b3b690 ("LU-11483 ldlm ofd_lvbo_init() and mdt_lvbo_fill() create env") Fixes: e02cb40761ff ("LU-11164 ldlm: pass env to lvbo methods") Change-Id: Ia760e10fa5c68e7a18284e4726d215b330fc0eed Signed-off-by: Alex Zhuravlev Reviewed-on: https://review.whamcloud.com/34566 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Andrew Perepechko Reviewed-by: Andreas Dilger --- lustre/include/lu_object.h | 4 ++ lustre/include/lustre_dlm.h | 34 ++++++--------- lustre/include/obd_class.h | 19 +++++--- lustre/ldlm/ldlm_lib.c | 24 +++++----- lustre/ldlm/ldlm_lock.c | 2 +- lustre/ldlm/ldlm_lockd.c | 54 +++++++++++++---------- lustre/ldlm/ldlm_request.c | 2 +- lustre/mdt/mdt_internal.h | 5 +-- lustre/mdt/mdt_io.c | 4 +- lustre/mdt/mdt_lvb.c | 41 +++++++----------- lustre/obdclass/lu_object.c | 101 ++++++++++++++++++++++++++++++++++++++++++- lustre/obdecho/echo_client.c | 5 +++ lustre/ofd/ofd_dev.c | 4 +- lustre/ofd/ofd_io.c | 2 +- lustre/ofd/ofd_lvb.c | 34 ++++++--------- lustre/ofd/ofd_obd.c | 2 +- lustre/ptlrpc/client.c | 20 +-------- lustre/ptlrpc/service.c | 38 ++++++++-------- lustre/quota/qmt_lock.c | 89 +++++++++----------------------------- 19 files changed, 259 insertions(+), 225 deletions(-) diff --git a/lustre/include/lu_object.h b/lustre/include/lu_object.h index 0451563..3c04907 100644 --- a/lustre/include/lu_object.h +++ b/lustre/include/lu_object.h @@ -1250,6 +1250,10 @@ void lu_env_fini (struct lu_env *env); int lu_env_refill(struct lu_env *env); int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags, __u32 stags); +struct lu_env *lu_env_find(void); +int lu_env_add(struct lu_env *env); +void lu_env_remove(struct lu_env *env); + /** @} lu_context */ /** diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 8aa4a76..0a6c998 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -293,16 +293,14 @@ typedef int (*ldlm_cancel_cbt)(struct ldlm_lock *lock); * of ldlm_[res_]lvbo_[init,update,fill]() functions. */ struct ldlm_valblock_ops { - int (*lvbo_init)(const struct lu_env *env, struct ldlm_resource *res); - int (*lvbo_update)(const struct lu_env *env, struct ldlm_resource *res, - struct ldlm_lock *lock, struct ptlrpc_request *r, - int increase); + int (*lvbo_init)(struct ldlm_resource *res); + int (*lvbo_update)(struct ldlm_resource *res, struct ldlm_lock *lock, + struct ptlrpc_request *r, int increase); int (*lvbo_free)(struct ldlm_resource *res); /* Return size of lvb data appropriate RPC size can be reserved */ int (*lvbo_size)(struct ldlm_lock *lock); /* Called to fill in lvb data to RPC buffer @buf */ - int (*lvbo_fill)(const struct lu_env *env, struct ldlm_lock *lock, - void *buf, int *buflen); + int (*lvbo_fill)(struct ldlm_lock *lock, void *buf, int *buflen); }; /** @@ -1089,8 +1087,7 @@ ldlm_lock_to_ns_at(struct ldlm_lock *lock) return &lock->l_resource->lr_ns_bucket->nsb_at_estimate; } -static inline int ldlm_lvbo_init(const struct lu_env *env, - struct ldlm_resource *res) +static inline int ldlm_lvbo_init(struct ldlm_resource *res) { struct ldlm_namespace *ns = ldlm_res_to_ns(res); int rc = 0; @@ -1105,7 +1102,7 @@ static inline int ldlm_lvbo_init(const struct lu_env *env, mutex_unlock(&res->lr_lvb_mutex); return 0; } - rc = ns->ns_lvbo->lvbo_init(env, res); + rc = ns->ns_lvbo->lvbo_init(res); if (rc < 0) { CDEBUG(D_DLMTRACE, "lvbo_init failed for resource : rc = %d\n", rc); @@ -1131,8 +1128,7 @@ static inline int ldlm_lvbo_size(struct ldlm_lock *lock) return 0; } -static inline int ldlm_lvbo_fill(const struct lu_env *env, - struct ldlm_lock *lock, void *buf, int *len) +static inline int ldlm_lvbo_fill(struct ldlm_lock *lock, void *buf, int *len) { struct ldlm_namespace *ns = ldlm_lock_to_ns(lock); int rc; @@ -1140,13 +1136,13 @@ static inline int ldlm_lvbo_fill(const struct lu_env *env, if (ns->ns_lvbo != NULL) { LASSERT(ns->ns_lvbo->lvbo_fill != NULL); /* init lvb now if not already */ - rc = ldlm_lvbo_init(env, lock->l_resource); + rc = ldlm_lvbo_init(lock->l_resource); if (rc < 0) { CERROR("lock %p: delayed lvb init failed (rc %d)", lock, rc); return rc; } - return ns->ns_lvbo->lvbo_fill(env, lock, buf, len); + return ns->ns_lvbo->lvbo_fill(lock, buf, len); } return 0; } @@ -1408,8 +1404,7 @@ ldlm_handle2lock_long(const struct lustre_handle *h, __u64 flags) * Update Lock Value Block Operations (LVBO) on a resource taking into account * data from request \a r */ -static inline int ldlm_lvbo_update(const struct lu_env *env, - struct ldlm_resource *res, +static inline int ldlm_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock, struct ptlrpc_request *req, int increase) { @@ -1417,24 +1412,23 @@ static inline int ldlm_lvbo_update(const struct lu_env *env, int rc; /* delayed lvb init may be required */ - rc = ldlm_lvbo_init(env, res); + rc = ldlm_lvbo_init(res); if (rc < 0) { CERROR("delayed lvb init failed (rc %d)\n", rc); return rc; } if (ns->ns_lvbo && ns->ns_lvbo->lvbo_update) - return ns->ns_lvbo->lvbo_update(env, res, lock, req, increase); + return ns->ns_lvbo->lvbo_update(res, lock, req, increase); return 0; } -static inline int ldlm_res_lvbo_update(const struct lu_env *env, - struct ldlm_resource *res, +static inline int ldlm_res_lvbo_update(struct ldlm_resource *res, struct ptlrpc_request *req, int increase) { - return ldlm_lvbo_update(env, res, NULL, req, increase); + return ldlm_lvbo_update(res, NULL, req, increase); } int ldlm_error2errno(enum ldlm_error error); diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index 359e7ef..a4e64b2b8 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -576,12 +576,19 @@ static inline int obd_precleanup(struct obd_device *obd) ENTRY; if (ldt != NULL && d != NULL) { - struct lu_env env; - - rc = lu_env_init(&env, ldt->ldt_ctx_tags); - if (rc == 0) { - ldt->ldt_ops->ldto_device_fini(&env, d); - lu_env_fini(&env); + struct lu_env *env = lu_env_find(); + struct lu_env _env; + + if (!env) { + env = &_env; + rc = lu_env_init(env, ldt->ldt_ctx_tags); + LASSERT(rc == 0); + lu_env_add(env); + } + ldt->ldt_ops->ldto_device_fini(env, d); + if (env == &_env) { + lu_env_remove(env); + lu_env_fini(env); } } diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index f54fb6a..09f39af 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -2470,18 +2470,16 @@ static int target_recovery_thread(void *arg) if (thread == NULL) RETURN(-ENOMEM); - OBD_ALLOC_PTR(env); - if (env == NULL) { - OBD_FREE_PTR(thread); - RETURN(-ENOMEM); - } + OBD_ALLOC_PTR(env); + if (env == NULL) + GOTO(out_thread, rc = -ENOMEM); + rc = lu_env_add(env); + if (rc) + GOTO(out_env, rc); rc = lu_context_init(&env->le_ctx, LCT_MD_THREAD | LCT_DT_THREAD); - if (rc) { - OBD_FREE_PTR(thread); - OBD_FREE_PTR(env); - RETURN(rc); - } + if (rc) + GOTO(out_env_remove, rc); thread->t_env = env; thread->t_id = -1; /* force filter_iobuf_get/put to use local buffers */ @@ -2572,8 +2570,12 @@ static int target_recovery_thread(void *arg) complete(&trd->trd_finishing); tgt_io_thread_done(thread); - OBD_FREE_PTR(thread); +out_env_remove: + lu_env_remove(env); +out_env: OBD_FREE_PTR(env); +out_thread: + OBD_FREE_PTR(thread); RETURN(rc); } diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 748c01e..3d587a5 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -2493,7 +2493,7 @@ static void ldlm_cancel_lock_for_export(struct obd_export *exp, res = ldlm_resource_getref(lock->l_resource); - ldlm_lvbo_update(ecl->ecl_env, res, lock, NULL, 1); + ldlm_lvbo_update(res, lock, NULL, 1); ldlm_lock_cancel(lock); if (!exp->exp_obd->obd_stopping) ldlm_reprocess_all(res); diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index d3737cb..fe9668b 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -663,8 +663,7 @@ static void ldlm_failed_ast(struct ldlm_lock *lock, int rc, /** * Perform lock cleanup if AST reply came with error. */ -static int ldlm_handle_ast_error(const struct lu_env *env, - struct ldlm_lock *lock, +static int ldlm_handle_ast_error(struct ldlm_lock *lock, struct ptlrpc_request *req, int rc, const char *ast_type) { @@ -726,7 +725,7 @@ static int ldlm_handle_ast_error(const struct lu_env *env, * see b=23174 */ ldlm_resource_getref(res); - ldlm_lvbo_update(env, res, lock, NULL, 1); + ldlm_lvbo_update(res, lock, NULL, 1); ldlm_resource_putref(res); } ldlm_lock_cancel(lock); @@ -759,28 +758,25 @@ static int ldlm_cb_interpret(const struct lu_env *env, * -ELDLM_NO_LOCK_DATA when inode is cleared. LU-274 */ if (unlikely(arg->gl_interpret_reply)) { - rc = arg->gl_interpret_reply(env, req, args, rc); + rc = arg->gl_interpret_reply(NULL, req, args, rc); } else if (rc == -ELDLM_NO_LOCK_DATA) { LDLM_DEBUG(lock, "lost race - client has a lock but no inode"); - ldlm_lvbo_update(env, lock->l_resource, lock, NULL, 1); + ldlm_lvbo_update(lock->l_resource, lock, NULL, 1); } else if (rc != 0) { - rc = ldlm_handle_ast_error(env, lock, req, - rc, "glimpse"); + rc = ldlm_handle_ast_error(lock, req, rc, "glimpse"); } else { - rc = ldlm_lvbo_update(env, lock->l_resource, + rc = ldlm_lvbo_update(lock->l_resource, lock, req, 1); } break; case LDLM_BL_CALLBACK: if (rc != 0) - rc = ldlm_handle_ast_error(env, lock, req, - rc, "blocking"); + rc = ldlm_handle_ast_error(lock, req, rc, "blocking"); break; case LDLM_CP_CALLBACK: if (rc != 0) - rc = ldlm_handle_ast_error(env, lock, req, - rc, "completion"); + rc = ldlm_handle_ast_error(lock, req, rc, "completion"); break; default: LDLM_ERROR(lock, "invalid opcode for lock callback %d", @@ -1037,12 +1033,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data) ldlm_lock2desc(lock, &body->lock_desc); if (lvb_len > 0) { void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB); - const struct lu_env *env = NULL; - - if (req->rq_svc_thread) - env = req->rq_svc_thread->t_env; - - lvb_len = ldlm_lvbo_fill(env, lock, lvb, &lvb_len); + lvb_len = ldlm_lvbo_fill(lock, lvb, &lvb_len); if (lvb_len < 0) { /* * We still need to send the RPC to wake up the blocked @@ -1322,7 +1313,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, res = lock->l_resource; if (!(flags & LDLM_FL_REPLAY)) { /* non-replayed lock, delayed lvb init may need to be done */ - rc = ldlm_lvbo_init(env, res); + rc = ldlm_lvbo_init(res); if (rc < 0) { LDLM_DEBUG(lock, "delayed lvb init failed (rc %d)", rc); GOTO(out, rc); @@ -1500,7 +1491,7 @@ retry: if ((buflen > 0) && !(flags & LDLM_FL_REPLAY)) { int rc2; - rc2 = ldlm_lvbo_fill(env, lock, buf, &buflen); + rc2 = ldlm_lvbo_fill(lock, buf, &buflen); if (rc2 >= 0) { req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, @@ -1682,7 +1673,6 @@ int ldlm_request_cancel(struct ptlrpc_request *req, const struct ldlm_request *dlm_req, int first, enum lustre_at_flags flags) { - const struct lu_env *env = req->rq_svc_thread->t_env; struct ldlm_resource *res, *pres = NULL; struct ldlm_lock *lock; int i, count, done = 0; @@ -1733,7 +1723,7 @@ int ldlm_request_cancel(struct ptlrpc_request *req, LDLM_RESOURCE_ADDREF(res); if (!ldlm_is_discard_data(lock)) - ldlm_lvbo_update(env, res, lock, + ldlm_lvbo_update(res, lock, NULL, 1); } pres = res; @@ -2826,11 +2816,23 @@ static int ldlm_bl_thread_exports(struct ldlm_bl_pool *blp, */ static int ldlm_bl_thread_main(void *arg) { + struct lu_env *env; struct ldlm_bl_pool *blp; struct ldlm_bl_thread_data *bltd = arg; + int rc; ENTRY; + OBD_ALLOC_PTR(env); + if (!env) + RETURN(-ENOMEM); + rc = lu_env_init(env, LCT_DT_THREAD); + if (rc) + GOTO(out_env, rc); + rc = lu_env_add(env); + if (rc) + GOTO(out_env_fini, rc); + blp = bltd->bltd_blp; complete(&bltd->bltd_comp); @@ -2875,7 +2877,13 @@ static int ldlm_bl_thread_main(void *arg) atomic_dec(&blp->blp_num_threads); complete(&blp->blp_comp); - RETURN(0); + + lu_env_remove(env); +out_env_fini: + lu_env_fini(env); +out_env: + OBD_FREE_PTR(env); + RETURN(rc); } diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index ff78b1c..bfbf599 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -469,7 +469,7 @@ int ldlm_cli_enqueue_local(const struct lu_env *env, if (IS_ERR(lock)) GOTO(out_nolock, err = PTR_ERR(lock)); - err = ldlm_lvbo_init(env, lock->l_resource); + err = ldlm_lvbo_init(lock->l_resource); if (err < 0) { LDLM_ERROR(lock, "delayed lvb init failed (rc %d)", err); ldlm_lock_destroy_nolock(lock); diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index b0f4430..7982041 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -1203,9 +1203,8 @@ int mdt_lsom_update(struct mdt_thread_info *info, struct mdt_object *obj, /* mdt_lvb.c */ extern struct ldlm_valblock_ops mdt_lvbo; int mdt_dom_lvb_is_valid(struct ldlm_resource *res); -int mdt_dom_lvbo_update(const struct lu_env *env, struct ldlm_resource *res, - struct ldlm_lock *lock, struct ptlrpc_request *req, - bool increase_only); +int mdt_dom_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock, + struct ptlrpc_request *req, bool increase_only); void mdt_enable_cos(struct mdt_device *dev, bool enable); int mdt_cos_is_enabled(struct mdt_device *); diff --git a/lustre/mdt/mdt_io.c b/lustre/mdt/mdt_io.c index 633c36d..1a8f019 100644 --- a/lustre/mdt/mdt_io.c +++ b/lustre/mdt/mdt_io.c @@ -1041,7 +1041,7 @@ int mdt_dom_object_size(const struct lu_env *env, struct mdt_device *mdt, /* Update lvbo data if DoM lock returned or if LVB is not yet valid. */ if (dom_lock || !mdt_dom_lvb_is_valid(res)) - mdt_dom_lvbo_update(env, res, NULL, NULL, false); + mdt_dom_lvbo_update(res, NULL, NULL, false); mdt_lvb2body(res, mb); ldlm_resource_putref(res); @@ -1139,7 +1139,7 @@ int mdt_glimpse_enqueue(struct mdt_thread_info *mti, struct ldlm_namespace *ns, fill_mbo: /* LVB can be without valid data in case of DOM */ if (!mdt_dom_lvb_is_valid(res)) - mdt_dom_lvbo_update(mti->mti_env, res, lock, NULL, false); + mdt_dom_lvbo_update(res, lock, NULL, false); mdt_lvb2body(res, mbo); RETURN(rc); } diff --git a/lustre/mdt/mdt_lvb.c b/lustre/mdt/mdt_lvb.c index d4e11ca..5c60471 100644 --- a/lustre/mdt/mdt_lvb.c +++ b/lustre/mdt/mdt_lvb.c @@ -34,7 +34,7 @@ #include "mdt_internal.h" /* Called with res->lr_lvb_sem held */ -static int mdt_lvbo_init(const struct lu_env *env, struct ldlm_resource *res) +static int mdt_lvbo_init(struct ldlm_resource *res) { if (IS_LQUOTA_RES(res)) { struct mdt_device *mdt; @@ -140,11 +140,11 @@ int mdt_dom_disk_lvbo_update(const struct lu_env *env, struct mdt_object *mo, RETURN(rc); } -int mdt_dom_lvbo_update(const struct lu_env *env, struct ldlm_resource *res, - struct ldlm_lock *lock, struct ptlrpc_request *req, - bool increase_only) +int mdt_dom_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock, + struct ptlrpc_request *req, bool increase_only) { struct obd_export *exp = lock ? lock->l_export : NULL; + const struct lu_env *env = lu_env_find(); struct mdt_device *mdt; struct mdt_object *mo; struct mdt_thread_info *info; @@ -251,9 +251,8 @@ out_env: return rc; } -static int mdt_lvbo_update(const struct lu_env *env, struct ldlm_resource *res, - struct ldlm_lock *lock, struct ptlrpc_request *req, - int increase_only) +static int mdt_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock, + struct ptlrpc_request *req, int increase_only) { ENTRY; @@ -275,8 +274,7 @@ static int mdt_lvbo_update(const struct lu_env *env, struct ldlm_resource *res, * by MDT for DOM objects only. */ if (lock == NULL || ldlm_has_dom(lock)) - return mdt_dom_lvbo_update(env, res, lock, req, - !!increase_only); + return mdt_dom_lvbo_update(res, lock, req, !!increase_only); return 0; } @@ -321,40 +319,35 @@ static int mdt_lvbo_size(struct ldlm_lock *lock) * and the needed lvb buffer size will be returned in * @lvblen */ -static int mdt_lvbo_fill(const struct lu_env *env, struct ldlm_lock *lock, +static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int *lvblen) { struct mdt_thread_info *info; struct mdt_device *mdt; + struct lu_env *env; struct lu_fid *fid; struct mdt_object *obj = NULL; struct md_object *child = NULL; - struct lu_env _env; int rc; ENTRY; - if (!env) { - rc = lu_env_init(&_env, LCT_DT_THREAD); - if (rc) - RETURN(rc); - env = &_env; - } + env = lu_env_find(); + LASSERT(env); mdt = ldlm_lock_to_ns(lock)->ns_lvbp; if (IS_LQUOTA_RES(lock->l_resource)) { if (mdt->mdt_qmt_dev == NULL) - GOTO(out_env, rc = 0); + GOTO(out, rc = 0); /* call lvbo fill function of quota master */ rc = qmt_hdls.qmth_lvbo_fill(mdt->mdt_qmt_dev, lock, lvb, *lvblen); - GOTO(out_env, rc); + GOTO(out, rc); } info = lu_context_key_get(&env->le_ctx, &mdt_thread_key); if (!info) { - rc = lu_env_refill_by_tags((struct lu_env *)env, - LCT_MD_THREAD, 0); + rc = lu_env_refill_by_tags(env, LCT_MD_THREAD, 0); if (rc) GOTO(out, rc); info = lu_context_key_get(&env->le_ctx, &mdt_thread_key); @@ -375,8 +368,7 @@ static int mdt_lvbo_fill(const struct lu_env *env, struct ldlm_lock *lock, int lvb_len = sizeof(struct ost_lvb); if (!mdt_dom_lvb_is_valid(res)) - mdt_dom_lvbo_update(env, lock->l_resource, - lock, NULL, 0); + mdt_dom_lvbo_update(lock->l_resource, lock, NULL, 0); if (lvb_len > *lvblen) lvb_len = *lvblen; @@ -445,9 +437,6 @@ out_put: out: if (rc < 0 && rc != -ERANGE) rc = 0; -out_env: - if (env == &_env) - lu_env_fini(&_env); RETURN(rc); } diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c index 9f62693..ab578a3 100644 --- a/lustre/obdclass/lu_object.c +++ b/lustre/obdclass/lu_object.c @@ -1882,6 +1882,101 @@ int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags, } EXPORT_SYMBOL(lu_env_refill_by_tags); + +struct lu_env_item { + struct task_struct *lei_task; /* rhashtable key */ + struct rhash_head lei_linkage; + struct lu_env *lei_env; +}; + +static const struct rhashtable_params lu_env_rhash_params = { + .key_len = sizeof(struct task_struct *), + .key_offset = offsetof(struct lu_env_item, lei_task), + .head_offset = offsetof(struct lu_env_item, lei_linkage), + }; + +struct rhashtable lu_env_rhash; + +struct lu_env_percpu { + struct task_struct *lep_task; + struct lu_env *lep_env ____cacheline_aligned_in_smp; +}; + +static struct lu_env_percpu lu_env_percpu[NR_CPUS]; + +int lu_env_add(struct lu_env *env) +{ + struct lu_env_item *lei, *old; + + LASSERT(env); + + OBD_ALLOC_PTR(lei); + if (!lei) + return -ENOMEM; + + lei->lei_task = current; + lei->lei_env = env; + + old = rhashtable_lookup_get_insert_fast(&lu_env_rhash, + &lei->lei_linkage, + lu_env_rhash_params); + LASSERT(!old); + + return 0; +} +EXPORT_SYMBOL(lu_env_add); + +void lu_env_remove(struct lu_env *env) +{ + struct lu_env_item *lei; + const void *task = current; + int i; + + for_each_possible_cpu(i) { + if (lu_env_percpu[i].lep_env == env) { + LASSERT(lu_env_percpu[i].lep_task == task); + lu_env_percpu[i].lep_task = NULL; + lu_env_percpu[i].lep_env = NULL; + } + } + + rcu_read_lock(); + lei = rhashtable_lookup_fast(&lu_env_rhash, &task, + lu_env_rhash_params); + if (lei && rhashtable_remove_fast(&lu_env_rhash, &lei->lei_linkage, + lu_env_rhash_params) == 0) + OBD_FREE_PTR(lei); + rcu_read_unlock(); +} +EXPORT_SYMBOL(lu_env_remove); + +struct lu_env *lu_env_find(void) +{ + struct lu_env *env = NULL; + struct lu_env_item *lei; + const void *task = current; + int i = get_cpu(); + + if (lu_env_percpu[i].lep_task == current) { + env = lu_env_percpu[i].lep_env; + put_cpu(); + LASSERT(env); + return env; + } + + lei = rhashtable_lookup_fast(&lu_env_rhash, &task, + lu_env_rhash_params); + if (lei) { + env = lei->lei_env; + lu_env_percpu[i].lep_task = current; + lu_env_percpu[i].lep_env = env; + } + put_cpu(); + + return env; +} +EXPORT_SYMBOL(lu_env_find); + static struct shrinker *lu_site_shrinker; typedef struct lu_site_stats{ @@ -2097,7 +2192,7 @@ void lu_context_keys_dump(void) */ int lu_global_init(void) { - int result; + int result; DEF_SHRINKER_VAR(shvar, lu_cache_shrink, lu_cache_shrink_count, lu_cache_shrink_scan); @@ -2132,6 +2227,8 @@ int lu_global_init(void) if (lu_site_shrinker == NULL) return -ENOMEM; + result = rhashtable_init(&lu_env_rhash, &lu_env_rhash_params); + return result; } @@ -2155,6 +2252,8 @@ void lu_global_fini(void) lu_env_fini(&lu_shrink_env); up_write(&lu_sites_guard); + rhashtable_destroy(&lu_env_rhash); + lu_ref_global_fini(); } diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index 265bb54..563300d 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -2758,6 +2758,9 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len, rc = lu_env_init(env, LCT_DT_THREAD); if (rc) GOTO(out_alloc, rc = -ENOMEM); + lu_env_add(env); + if (rc) + GOTO(out_env_fini, rc = -ENOMEM); #ifdef HAVE_SERVER_SUPPORT env->le_ses = &echo_session; @@ -2899,6 +2902,8 @@ out: lu_context_fini(env->le_ses); out_env: #endif + lu_env_remove(env); +out_env_fini: lu_env_fini(env); out_alloc: OBD_FREE_PTR(env); diff --git a/lustre/ofd/ofd_dev.c b/lustre/ofd/ofd_dev.c index e63a8d7..8d58552 100644 --- a/lustre/ofd/ofd_dev.c +++ b/lustre/ofd/ofd_dev.c @@ -1355,7 +1355,7 @@ out: res = ldlm_resource_get(ofd->ofd_namespace, NULL, &tsi->tsi_resid, LDLM_EXTENT, 0); if (!IS_ERR(res)) { - ldlm_res_lvbo_update(tsi->tsi_env, res, NULL, 0); + ldlm_res_lvbo_update(res, NULL, 0); ldlm_resource_putref(res); } } @@ -2032,7 +2032,7 @@ out: if (!IS_ERR(res)) { struct ost_lvb *res_lvb; - ldlm_res_lvbo_update(tsi->tsi_env, res, NULL, 0); + ldlm_res_lvbo_update(res, NULL, 0); res_lvb = res->lr_lvb_data; repbody->oa.o_valid |= OBD_MD_FLBLOCKS; repbody->oa.o_blocks = res_lvb->lvb_blocks; diff --git a/lustre/ofd/ofd_io.c b/lustre/ofd/ofd_io.c index 78c1a08..936faf8 100644 --- a/lustre/ofd/ofd_io.c +++ b/lustre/ofd/ofd_io.c @@ -1353,7 +1353,7 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp, rs = ldlm_resource_get(ns, NULL, &info->fti_resid, LDLM_EXTENT, 0); if (!IS_ERR(rs)) { - ldlm_res_lvbo_update(env, rs, NULL, 1); + ldlm_res_lvbo_update(rs, NULL, 1); ldlm_resource_putref(rs); } } diff --git a/lustre/ofd/ofd_lvb.c b/lustre/ofd/ofd_lvb.c index 7c21683..d9638bc 100644 --- a/lustre/ofd/ofd_lvb.c +++ b/lustre/ofd/ofd_lvb.c @@ -82,13 +82,13 @@ static int ofd_lvbo_free(struct ldlm_resource *res) * \retval 0 on successful setup * \retval negative value on error */ -static int ofd_lvbo_init(const struct lu_env *env, struct ldlm_resource *res) +static int ofd_lvbo_init(struct ldlm_resource *res) { struct ost_lvb *lvb; struct ofd_device *ofd; struct ofd_object *fo; struct ofd_thread_info *info; - struct lu_env _env; + struct lu_env *env; int rc = 0; ENTRY; @@ -104,12 +104,8 @@ static int ofd_lvbo_init(const struct lu_env *env, struct ldlm_resource *res) if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_LVB)) RETURN(-ENOMEM); - if (!env) { - rc = lu_env_init(&_env, LCT_DT_THREAD); - if (rc) - RETURN(rc); - env = &_env; - } + env = lu_env_find(); + LASSERT(env); OBD_ALLOC_PTR(lvb); if (lvb == NULL) @@ -166,8 +162,6 @@ out_lvb: OST_LVB_SET_ERR(lvb->lvb_blocks, rc); out: /* Don't free lvb data on lookup error */ - if (env && env == &_env) - lu_env_fini(&_env); return rc; } @@ -200,18 +194,19 @@ out: * \retval 0 on successful setup * \retval negative value on error */ -static int ofd_lvbo_update(const struct lu_env *env, struct ldlm_resource *res, - struct ldlm_lock *lock, struct ptlrpc_request *req, - int increase_only) +static int ofd_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock, + struct ptlrpc_request *req, int increase_only) { - struct ofd_thread_info *info; - struct ofd_device *ofd; - struct ofd_object *fo; - struct ost_lvb *lvb; - int rc = 0; + struct ofd_thread_info *info; + struct ofd_device *ofd; + struct ofd_object *fo; + struct ost_lvb *lvb; + const struct lu_env *env; + int rc = 0; ENTRY; + env = lu_env_find(); LASSERT(env); info = ofd_info(env); LASSERT(res != NULL); @@ -376,8 +371,7 @@ static int ofd_lvbo_size(struct ldlm_lock *lock) * * \retval size of LVB data written into \a buf buffer */ -static int ofd_lvbo_fill(const struct lu_env *env, struct ldlm_lock *lock, - void *buf, int *buflen) +static int ofd_lvbo_fill(struct ldlm_lock *lock, void *buf, int *buflen) { struct ldlm_resource *res = lock->l_resource; int lvb_len; diff --git a/lustre/ofd/ofd_obd.c b/lustre/ofd/ofd_obd.c index eff9138..478854c 100644 --- a/lustre/ofd/ofd_obd.c +++ b/lustre/ofd/ofd_obd.c @@ -959,7 +959,7 @@ out: res = ldlm_resource_get(ns, NULL, &info->fti_resid, LDLM_EXTENT, 0); if (!IS_ERR(res)) { - ldlm_res_lvbo_update(env, res, NULL, 0); + ldlm_res_lvbo_update(res, NULL, 0); ldlm_resource_putref(res); } } diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 03e6164..a0eade5 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -2408,7 +2408,6 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) struct list_head *tmp; struct ptlrpc_request *req; struct l_wait_info lwi; - struct lu_env _env; time64_t timeout; int rc; @@ -2426,20 +2425,6 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) if (list_empty(&set->set_requests)) RETURN(0); - /* - * ideally we want env provide by the caller all the time, - * but at the moment that would mean a massive change in - * LDLM while benefits would be close to zero, so just - * initialize env here for those rare cases - */ - if (!env) { - /* XXX: skip on the client side? */ - rc = lu_env_init(&_env, LCT_DT_THREAD); - if (rc) - RETURN(rc); - env = &_env; - } - do { timeout = ptlrpc_set_next_timeout(set); @@ -2472,7 +2457,7 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) ptlrpc_expired_set, set); rc = l_wait_event(set->set_waitq, - ptlrpc_check_set(env, set), &lwi); + ptlrpc_check_set(NULL, set), &lwi); /* * LU-769 - if we ignored the signal because it was already @@ -2529,9 +2514,6 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) rc = req->rq_status; } - if (env && env == &_env) - lu_env_fini(&_env); - RETURN(rc); } EXPORT_SYMBOL(ptlrpc_set_wait); diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index a279538..ca01179 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -2684,10 +2684,8 @@ static int ptlrpc_main(void *arg) } ginfo = groups_alloc(0); - if (!ginfo) { - rc = -ENOMEM; - goto out; - } + if (!ginfo) + GOTO(out, rc = -ENOMEM); set_current_groups(ginfo); put_group_info(ginfo); @@ -2695,19 +2693,20 @@ static int ptlrpc_main(void *arg) if (svc->srv_ops.so_thr_init != NULL) { rc = svc->srv_ops.so_thr_init(thread); if (rc) - goto out; + GOTO(out, rc); } OBD_ALLOC_PTR(env); - if (env == NULL) { - rc = -ENOMEM; - goto out_srv_fini; - } + if (env == NULL) + GOTO(out_srv_fini, rc = -ENOMEM); + rc = lu_env_add(env); + if (rc) + GOTO(out_env, rc); rc = lu_context_init(&env->le_ctx, svc->srv_ctx_tags|LCT_REMEMBER|LCT_NOREF); if (rc) - goto out_srv_fini; + GOTO(out_env_remove, rc); thread->t_env = env; env->le_ctx.lc_thread = thread; @@ -2720,15 +2719,13 @@ static int ptlrpc_main(void *arg) CERROR("Failed to post rqbd for %s on CPT %d: %d\n", svc->srv_name, svcpt->scp_cpt, rc); - goto out_srv_fini; + GOTO(out_ctx_fini, rc); } /* Alloc reply state structure for this one */ OBD_ALLOC_LARGE(rs, svc->srv_max_reply_size); - if (!rs) { - rc = -ENOMEM; - goto out_srv_fini; - } + if (!rs) + GOTO(out_ctx_fini, rc = -ENOMEM); spin_lock(&svcpt->scp_lock); @@ -2820,15 +2817,16 @@ static int ptlrpc_main(void *arg) ptlrpc_watchdog_disable(&thread->t_watchdog); +out_ctx_fini: + lu_context_fini(&env->le_ctx); +out_env_remove: + lu_env_remove(env); +out_env: + OBD_FREE_PTR(env); out_srv_fini: /* deconstruct service thread state created by ptlrpc_start_thread() */ if (svc->srv_ops.so_thr_done != NULL) svc->srv_ops.so_thr_done(thread); - - if (env != NULL) { - lu_context_fini(&env->le_ctx); - OBD_FREE_PTR(env); - } out: CDEBUG(D_RPCTRACE, "%s: service thread [%p:%u] %d exiting: rc = %d\n", thread->t_name, thread, thread->t_pid, thread->t_id, rc); diff --git a/lustre/quota/qmt_lock.c b/lustre/quota/qmt_lock.c index cbb4a82..cca59f6 100644 --- a/lustre/quota/qmt_lock.c +++ b/lustre/quota/qmt_lock.c @@ -140,7 +140,7 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld, /* on success, pack lvb in reply */ lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB); lvb_len = ldlm_lvbo_size(*lockp); - lvb_len = ldlm_lvbo_fill(env, *lockp, lvb, &lvb_len); + lvb_len = ldlm_lvbo_fill(*lockp, lvb, &lvb_len); if (lvb_len < 0) GOTO(out, rc = lvb_len); @@ -173,14 +173,8 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res) res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB) RETURN(0); - OBD_ALLOC_PTR(env); - if (env == NULL) - RETURN(-ENOMEM); - - /* initialize environment */ - rc = lu_env_init(env, LCT_MD_THREAD); - if (rc != 0) - GOTO(out_free, rc); + env = lu_env_find(); + LASSERT(env); qti = qmt_info(env); /* extract global index FID and quota identifier */ @@ -228,9 +222,6 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res) res->lr_lvb_len = sizeof(struct lquota_lvb); EXIT; out: - lu_env_fini(env); -out_free: - OBD_FREE_PTR(env); return rc; } @@ -281,18 +272,12 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res, if (lvb->lvb_id_may_rel != 0) /* but might still release later ... */ lqe->lqe_may_rel += lvb->lvb_id_may_rel; - GOTO(out_lqe, rc = 0); + GOTO(out, rc = 0); } /* allocate environement */ - OBD_ALLOC_PTR(env); - if (env == NULL) - GOTO(out_lqe, rc = -ENOMEM); - - /* initialize environment */ - rc = lu_env_init(env, LCT_MD_THREAD); - if (rc) - GOTO(out_env, rc); + env = lu_env_find(); + LASSERT(env); qti = qmt_info(env); /* The request is a glimpse callback which was sent via the @@ -306,14 +291,14 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res, if (IS_ERR(lock)) { CERROR("%s: failed to get lock from request!\n", qmt->qmt_svname); - GOTO(out_env_init, rc = PTR_ERR(lock)); + GOTO(out, rc = PTR_ERR(lock)); } exp = class_export_get(lock->l_export); if (exp == NULL) { CERROR("%s: failed to get export from lock!\n", qmt->qmt_svname); - GOTO(out_env_init, rc = -EFAULT); + GOTO(out, rc = -EFAULT); } /* release quota space */ @@ -325,13 +310,9 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res, lvb->lvb_id_rel, rc); class_export_put(exp); if (rc) - GOTO(out_env_init, rc); + GOTO(out, rc); EXIT; -out_env_init: - lu_env_fini(env); -out_env: - OBD_FREE_PTR(env); -out_lqe: +out: lqe_putref(lqe); return rc; } @@ -376,26 +357,10 @@ int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb, lqe_putref(lqe); } else { /* global quota lock */ - struct lu_env *env; - int rc; - struct dt_object *obj = res->lr_lvb_data; - - OBD_ALLOC_PTR(env); - if (env == NULL) - RETURN(-ENOMEM); - - /* initialize environment */ - rc = lu_env_init(env, LCT_LOCAL); - if (rc) { - OBD_FREE_PTR(env); - RETURN(rc); - } + struct dt_object *obj = res->lr_lvb_data; /* return current version of global index */ - qlvb->lvb_glb_ver = dt_version_get(env, obj); - - lu_env_fini(env); - OBD_FREE_PTR(env); + qlvb->lvb_glb_ver = dt_version_get(lu_env_find(), obj); } RETURN(sizeof(struct lquota_lvb)); @@ -419,25 +384,9 @@ int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res) /* release lqe reference */ lqe_putref(lqe); } else { - struct dt_object *obj = res->lr_lvb_data; - struct lu_env *env; - int rc; - - OBD_ALLOC_PTR(env); - if (env == NULL) - RETURN(-ENOMEM); - - /* initialize environment */ - rc = lu_env_init(env, LCT_LOCAL); - if (rc) { - OBD_FREE_PTR(env); - RETURN(rc); - } - + struct dt_object *obj = res->lr_lvb_data; /* release object reference */ - dt_object_put(env, obj); - lu_env_fini(env); - OBD_FREE_PTR(env); + dt_object_put(lu_env_find(), obj); } res->lr_lvb_data = NULL; @@ -812,10 +761,11 @@ static int qmt_reba_thread(void *arg) rc = lu_env_init(env, LCT_MD_THREAD); if (rc) { CERROR("%s: failed to init env.", qmt->qmt_svname); - thread_set_flags(thread, SVC_STOPPED); - OBD_FREE_PTR(env); - RETURN(rc); + GOTO(out_env, rc); } + rc = lu_env_add(env); + if (rc) + GOTO(out_env_fini, rc); thread_set_flags(thread, SVC_RUNNING); wake_up(&thread->t_ctl_waitq); @@ -842,7 +792,10 @@ static int qmt_reba_thread(void *arg) if (!thread_is_running(thread)) break; } + lu_env_remove(env); +out_env_fini: lu_env_fini(env); +out_env: OBD_FREE_PTR(env); thread_set_flags(thread, SVC_STOPPED); wake_up(&thread->t_ctl_waitq); -- 1.8.3.1