From e02cb40761ff8aae3df76c4210a345420b6d4ba1 Mon Sep 17 00:00:00 2001 From: Alex Zhuravlev Date: Thu, 19 Jul 2018 16:02:43 +0400 Subject: [PATCH] LU-11164 ldlm: pass env to lvbo methods to save on env allocation. Benchmarks made by Shuichi Ihara demonstrated 13% improvement for small I/Os: 564k vs 639k IOPS. the details can be found LU-11164. Change-Id: I797e3d7e19ef408993004a2b872842d655240525 Signed-off-by: Alex Zhuravlev Reviewed-on: https://review.whamcloud.com/32832 Tested-by: Jenkins Reviewed-by: Andreas Dilger Reviewed-by: Patrick Farrell Tested-by: Maloo Reviewed-by: Oleg Drokin --- lustre/include/lu_target.h | 6 +-- lustre/include/lustre_dlm.h | 47 ++++++++++------- lustre/include/lustre_net.h | 2 +- lustre/ldlm/ldlm_internal.h | 6 ++- lustre/ldlm/ldlm_lock.c | 39 ++++++++++---- lustre/ldlm/ldlm_lockd.c | 66 +++++++++++++---------- lustre/ldlm/ldlm_request.c | 55 ++++++++++--------- lustre/lfsck/lfsck_engine.c | 8 +-- lustre/lfsck/lfsck_layout.c | 14 ++--- lustre/lfsck/lfsck_lib.c | 10 ++-- lustre/lfsck/lfsck_namespace.c | 2 +- lustre/lod/lod_dev.c | 2 +- lustre/lod/lod_object.c | 2 +- lustre/lov/lov_obd.c | 4 +- lustre/mdt/mdt_handler.c | 31 ++++++----- lustre/mdt/mdt_internal.h | 13 +++-- lustre/mdt/mdt_io.c | 12 ++--- lustre/mdt/mdt_lvb.c | 117 +++++++++++++++++++++-------------------- lustre/mdt/mdt_reint.c | 23 ++++---- lustre/mgs/mgs_handler.c | 2 +- lustre/ofd/ofd_dev.c | 27 +++++----- lustre/ofd/ofd_dlm.c | 6 +-- lustre/ofd/ofd_internal.h | 6 +-- lustre/ofd/ofd_io.c | 2 +- lustre/ofd/ofd_lvb.c | 54 +++++++++---------- lustre/ofd/ofd_obd.c | 4 +- lustre/ptlrpc/client.c | 39 ++++++++++---- lustre/ptlrpc/ptlrpcd.c | 8 +-- lustre/quota/qmt_lock.c | 2 +- lustre/target/tgt_handler.c | 29 +++++----- 30 files changed, 357 insertions(+), 281 deletions(-) diff --git a/lustre/include/lu_target.h b/lustre/include/lu_target.h index 5b6c1ae..c698a8f 100644 --- a/lustre/include/lu_target.h +++ b/lustre/include/lu_target.h @@ -430,9 +430,9 @@ void tgt_io_thread_done(struct ptlrpc_thread *thread); int tgt_mdt_data_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id, struct lustre_handle *lh, int mode, __u64 *flags); void tgt_mdt_data_unlock(struct lustre_handle *lh, enum ldlm_mode mode); -int tgt_extent_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id, - __u64 start, __u64 end, struct lustre_handle *lh, - int mode, __u64 *flags); +int tgt_extent_lock(const struct lu_env *env, struct ldlm_namespace *ns, + struct ldlm_res_id *res_id, __u64 start, __u64 end, + struct lustre_handle *lh, int mode, __u64 *flags); void tgt_extent_unlock(struct lustre_handle *lh, enum ldlm_mode mode); int tgt_brw_read(struct tgt_session_info *tsi); int tgt_brw_write(struct tgt_session_info *tsi); diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 90ffb3e..5262a1d 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -272,9 +272,10 @@ struct ldlm_pool { struct completion pl_kobj_unregister; }; -typedef int (*ldlm_res_policy)(struct ldlm_namespace *, struct ldlm_lock **, - void *req_cookie, enum ldlm_mode mode, - __u64 flags, void *data); +typedef int (*ldlm_res_policy)(const struct lu_env *env, + struct ldlm_namespace *, + struct ldlm_lock **, void *req_cookie, + enum ldlm_mode mode, __u64 flags, void *data); typedef int (*ldlm_cancel_cbt)(struct ldlm_lock *lock); @@ -292,14 +293,16 @@ typedef int (*ldlm_cancel_cbt)(struct ldlm_lock *lock); * of ldlm_[res_]lvbo_[init,update,fill]() functions. */ struct ldlm_valblock_ops { - int (*lvbo_init)(struct ldlm_resource *res); - int (*lvbo_update)(struct ldlm_resource *res, struct ldlm_lock *lock, - struct ptlrpc_request *r, int increase); + int (*lvbo_init)(const struct lu_env *env, struct ldlm_resource *res); + int (*lvbo_update)(const struct lu_env *env, struct ldlm_resource *res, + struct ldlm_lock *lock, struct ptlrpc_request *r, + int increase); int (*lvbo_free)(struct ldlm_resource *res); /* Return size of lvb data appropriate RPC size can be reserved */ int (*lvbo_size)(struct ldlm_lock *lock); /* Called to fill in lvb data to RPC buffer @buf */ - int (*lvbo_fill)(struct ldlm_lock *lock, void *buf, int buflen); + int (*lvbo_fill)(const struct lu_env *env, struct ldlm_lock *lock, + void *buf, int buflen); }; /** @@ -1076,7 +1079,8 @@ ldlm_lock_to_ns_at(struct ldlm_lock *lock) return &lock->l_resource->lr_ns_bucket->nsb_at_estimate; } -static inline int ldlm_lvbo_init(struct ldlm_resource *res) +static inline int ldlm_lvbo_init(const struct lu_env *env, + struct ldlm_resource *res) { struct ldlm_namespace *ns = ldlm_res_to_ns(res); int rc = 0; @@ -1091,7 +1095,7 @@ static inline int ldlm_lvbo_init(struct ldlm_resource *res) mutex_unlock(&res->lr_lvb_mutex); return 0; } - rc = ns->ns_lvbo->lvbo_init(res); + rc = ns->ns_lvbo->lvbo_init(env, res); if (rc < 0) { CDEBUG(D_DLMTRACE, "lvbo_init failed for resource : rc = %d\n", rc); @@ -1117,7 +1121,8 @@ static inline int ldlm_lvbo_size(struct ldlm_lock *lock) return 0; } -static inline int ldlm_lvbo_fill(struct ldlm_lock *lock, void *buf, int len) +static inline int ldlm_lvbo_fill(const struct lu_env *env, + struct ldlm_lock *lock, void *buf, int len) { struct ldlm_namespace *ns = ldlm_lock_to_ns(lock); int rc; @@ -1125,13 +1130,13 @@ static inline int ldlm_lvbo_fill(struct ldlm_lock *lock, void *buf, int len) if (ns->ns_lvbo != NULL) { LASSERT(ns->ns_lvbo->lvbo_fill != NULL); /* init lvb now if not already */ - rc = ldlm_lvbo_init(lock->l_resource); + rc = ldlm_lvbo_init(env, lock->l_resource); if (rc < 0) { CERROR("lock %p: delayed lvb init failed (rc %d)", lock, rc); return rc; } - return ns->ns_lvbo->lvbo_fill(lock, buf, len); + return ns->ns_lvbo->lvbo_fill(env, lock, buf, len); } return 0; } @@ -1392,7 +1397,8 @@ ldlm_handle2lock_long(const struct lustre_handle *h, __u64 flags) * Update Lock Value Block Operations (LVBO) on a resource taking into account * data from request \a r */ -static inline int ldlm_lvbo_update(struct ldlm_resource *res, +static inline int ldlm_lvbo_update(const struct lu_env *env, + struct ldlm_resource *res, struct ldlm_lock *lock, struct ptlrpc_request *req, int increase) { @@ -1400,22 +1406,24 @@ static inline int ldlm_lvbo_update(struct ldlm_resource *res, int rc; /* delayed lvb init may be required */ - rc = ldlm_lvbo_init(res); + rc = ldlm_lvbo_init(env, res); if (rc < 0) { CERROR("delayed lvb init failed (rc %d)\n", rc); return rc; } if (ns->ns_lvbo && ns->ns_lvbo->lvbo_update) - return ns->ns_lvbo->lvbo_update(res, lock, req, increase); + return ns->ns_lvbo->lvbo_update(env, res, lock, req, increase); return 0; } -static inline int ldlm_res_lvbo_update(struct ldlm_resource *res, - struct ptlrpc_request *req, int increase) +static inline int ldlm_res_lvbo_update(const struct lu_env *env, + struct ldlm_resource *res, + struct ptlrpc_request *req, + int increase) { - return ldlm_lvbo_update(res, NULL, req, increase); + return ldlm_lvbo_update(env, res, NULL, req, increase); } int ldlm_error2errno(enum ldlm_error error); @@ -1613,7 +1621,8 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, enum ldlm_mode mode, __u64 *flags, void *lvb, __u32 lvb_len, const struct lustre_handle *lockh, int rc); -int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, +int ldlm_cli_enqueue_local(const struct lu_env *env, + struct ldlm_namespace *ns, const struct ldlm_res_id *res_id, enum ldlm_type type, union ldlm_policy_data *policy, enum ldlm_mode mode, __u64 *flags, diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 45c6d48..3274334 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -2130,7 +2130,7 @@ struct ptlrpc_request_set *ptlrpc_prep_set(void); struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func, void *arg); int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set); -int ptlrpc_set_wait(struct ptlrpc_request_set *); +int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *); void ptlrpc_mark_interrupted(struct ptlrpc_request *req); void ptlrpc_set_destroy(struct ptlrpc_request_set *); void ptlrpc_set_add_req(struct ptlrpc_request_set *, struct ptlrpc_request *); diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index 572b686..e2454e5 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -145,7 +145,9 @@ ldlm_lock_create(struct ldlm_namespace *ns, const struct ldlm_res_id *, enum ldlm_type type, enum ldlm_mode mode, const struct ldlm_callback_suite *cbs, void *data, __u32 lvb_len, enum lvb_type lvb_type); -enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *, struct ldlm_lock **, +enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env, + struct ldlm_namespace *, + struct ldlm_lock **, void *cookie, __u64 *flags); void ldlm_lock_addref_internal(struct ldlm_lock *, enum ldlm_mode mode); void ldlm_lock_addref_internal_nolock(struct ldlm_lock *, enum ldlm_mode mode); @@ -162,7 +164,7 @@ int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags, void ldlm_discard_bl_list(struct list_head *bl_list); #endif int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list, - ldlm_desc_ast_t ast_type); + ldlm_desc_ast_t ast_type); int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq); int ldlm_lock_remove_from_lru_check(struct ldlm_lock *lock, ktime_t last_use); #define ldlm_lock_remove_from_lru(lock) \ diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 44680dd..2b51f93 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -1727,7 +1727,8 @@ restart: * set, skip all the enqueueing and delegate lock processing to intent policy * function. */ -enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns, +enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env, + struct ldlm_namespace *ns, struct ldlm_lock **lockp, void *cookie, __u64 *flags) { @@ -1741,8 +1742,8 @@ enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns, /* policies are not executed on the client or during replay */ if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT && !local && ns->ns_policy) { - rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags, - NULL); + rc = ns->ns_policy(env, ns, lockp, cookie, lock->l_req_mode, + *flags, NULL); if (rc == ELDLM_LOCK_REPLACED) { /* The lock that was returned has already been granted, * and placed into lockp. If it's not the same as the @@ -2199,7 +2200,7 @@ int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq) * one. */ int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list, - ldlm_desc_ast_t ast_type) + ldlm_desc_ast_t ast_type) { struct ldlm_cb_set_arg *arg; set_producer_func work_ast_lock; @@ -2245,7 +2246,7 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list, if (arg->set == NULL) GOTO(out, rc = -ENOMEM); - ptlrpc_set_wait(arg->set); + ptlrpc_set_wait(NULL, arg->set); ptlrpc_set_destroy(arg->set); rc = atomic_read(&arg->restart) ? -ERESTART : 0; @@ -2455,6 +2456,7 @@ int ldlm_lock_set_data(const struct lustre_handle *lockh, void *data) EXPORT_SYMBOL(ldlm_lock_set_data); struct export_cl_data { + const struct lu_env *ecl_env; struct obd_export *ecl_exp; int ecl_loop; }; @@ -2467,7 +2469,7 @@ static void ldlm_cancel_lock_for_export(struct obd_export *exp, res = ldlm_resource_getref(lock->l_resource); - ldlm_lvbo_update(res, lock, NULL, 1); + ldlm_lvbo_update(ecl->ecl_env, res, lock, NULL, 1); ldlm_lock_cancel(lock); if (!exp->exp_obd->obd_stopping) ldlm_reprocess_all(res); @@ -2507,10 +2509,17 @@ ldlm_cancel_locks_for_export_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd, */ int ldlm_export_cancel_blocked_locks(struct obd_export *exp) { + struct lu_env env; struct export_cl_data ecl = { .ecl_exp = exp, .ecl_loop = 0, }; + int rc; + + rc = lu_env_init(&env, LCT_DT_THREAD); + if (rc) + RETURN(rc); + ecl.ecl_env = &env; while (!list_empty(&exp->exp_bl_list)) { struct ldlm_lock *lock; @@ -2533,6 +2542,8 @@ int ldlm_export_cancel_blocked_locks(struct obd_export *exp) LDLM_LOCK_RELEASE(lock); } + lu_env_fini(&env); + CDEBUG(D_DLMTRACE, "Export %p, canceled %d locks, " "left on hash table %d.\n", exp, ecl.ecl_loop, atomic_read(&exp->exp_lock_hash->hs_count)); @@ -2547,10 +2558,16 @@ int ldlm_export_cancel_blocked_locks(struct obd_export *exp) */ int ldlm_export_cancel_locks(struct obd_export *exp) { - struct export_cl_data ecl = { - .ecl_exp = exp, - .ecl_loop = 0, - }; + struct export_cl_data ecl; + struct lu_env env; + int rc; + + rc = lu_env_init(&env, LCT_DT_THREAD); + if (rc) + RETURN(rc); + ecl.ecl_env = &env; + ecl.ecl_exp = exp; + ecl.ecl_loop = 0; cfs_hash_for_each_empty(exp->exp_lock_hash, ldlm_cancel_locks_for_export_cb, &ecl); @@ -2564,6 +2581,8 @@ int ldlm_export_cancel_locks(struct obd_export *exp) exp->exp_obd->obd_stopping) ldlm_reprocess_recovery_done(exp->exp_obd->obd_namespace); + lu_env_fini(&env); + return ecl.ecl_loop; } diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index c0b4edc..21a3c96 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -633,7 +633,8 @@ static void ldlm_failed_ast(struct ldlm_lock *lock, int rc, /** * Perform lock cleanup if AST reply came with error. */ -static int ldlm_handle_ast_error(struct ldlm_lock *lock, +static int ldlm_handle_ast_error(const struct lu_env *env, + struct ldlm_lock *lock, struct ptlrpc_request *req, int rc, const char *ast_type) { @@ -692,7 +693,7 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock, /* update lvbo to return proper attributes. * see bug 23174 */ ldlm_resource_getref(res); - ldlm_lvbo_update(res, lock, NULL, 1); + ldlm_lvbo_update(env, res, lock, NULL, 1); ldlm_resource_putref(res); } ldlm_lock_cancel(lock); @@ -727,20 +728,24 @@ static int ldlm_cb_interpret(const struct lu_env *env, } else if (rc == -ELDLM_NO_LOCK_DATA) { LDLM_DEBUG(lock, "lost race - client has a lock but no " "inode"); - ldlm_lvbo_update(lock->l_resource, lock, NULL, 1); + ldlm_lvbo_update(env, lock->l_resource, lock, NULL, 1); } else if (rc != 0) { - rc = ldlm_handle_ast_error(lock, req, rc, "glimpse"); + rc = ldlm_handle_ast_error(env, lock, req, + rc, "glimpse"); } else { - rc = ldlm_lvbo_update(lock->l_resource, lock, req, 1); + rc = ldlm_lvbo_update(env, lock->l_resource, + lock, req, 1); } break; case LDLM_BL_CALLBACK: if (rc != 0) - rc = ldlm_handle_ast_error(lock, req, rc, "blocking"); + rc = ldlm_handle_ast_error(env, lock, req, + rc, "blocking"); break; case LDLM_CP_CALLBACK: if (rc != 0) - rc = ldlm_handle_ast_error(lock, req, rc, "completion"); + rc = ldlm_handle_ast_error(env, lock, req, + rc, "completion"); break; default: LDLM_ERROR(lock, "invalid opcode for lock callback %d", @@ -988,8 +993,12 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data) ldlm_lock2desc(lock, &body->lock_desc); if (lvb_len > 0) { void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB); + const struct lu_env *env = NULL; - lvb_len = ldlm_lvbo_fill(lock, lvb, lvb_len); + if (req->rq_svc_thread) + env = req->rq_svc_thread->t_env; + + lvb_len = ldlm_lvbo_fill(env, lock, lvb, lvb_len); if (lvb_len < 0) { /* We still need to send the RPC to wake up the blocked * enqueue thread on the client. @@ -1174,6 +1183,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, void *cookie = NULL; int rc = 0; struct ldlm_resource *res = NULL; + const struct lu_env *env = req->rq_svc_thread->t_env; ENTRY; LDLM_DEBUG_NOLOCK("server-side enqueue handler START"); @@ -1254,7 +1264,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, res = lock->l_resource; if (!(flags & LDLM_FL_REPLAY)) { /* non-replayed lock, delayed lvb init may need to be done */ - rc = ldlm_lvbo_init(res); + rc = ldlm_lvbo_init(env, res); if (rc < 0) { LDLM_DEBUG(lock, "delayed lvb init failed (rc %d)", rc); GOTO(out, rc); @@ -1302,25 +1312,25 @@ existing_lock: req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, ldlm_lvbo_size(lock)); - if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR)) - GOTO(out, rc = -ENOMEM); + if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR)) + GOTO(out, rc = -ENOMEM); - rc = req_capsule_server_pack(&req->rq_pill); - if (rc) - GOTO(out, rc); - } + rc = req_capsule_server_pack(&req->rq_pill); + if (rc) + GOTO(out, rc); + } - err = ldlm_lock_enqueue(ns, &lock, cookie, &flags); + err = ldlm_lock_enqueue(env, ns, &lock, cookie, &flags); if (err) { if ((int)err < 0) rc = (int)err; GOTO(out, err); } - dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); + dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); - ldlm_lock2desc(lock, &dlm_rep->lock_desc); - ldlm_lock2handle(lock, &dlm_rep->lock_handle); + ldlm_lock2desc(lock, &dlm_rep->lock_desc); + ldlm_lock2handle(lock, &dlm_rep->lock_handle); if (lock && lock->l_resource->lr_type == LDLM_EXTENT) OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 6); @@ -1410,7 +1420,7 @@ existing_lock: /* non-replayed lock, delayed lvb init may * need to be occur now */ if ((buflen > 0) && !(flags & LDLM_FL_REPLAY)) { - buflen = ldlm_lvbo_fill(lock, buf, + buflen = ldlm_lvbo_fill(env, lock, buf, buflen); if (buflen >= 0) req_capsule_shrink( @@ -1550,6 +1560,7 @@ int ldlm_request_cancel(struct ptlrpc_request *req, const struct ldlm_request *dlm_req, int first, enum lustre_at_flags flags) { + const struct lu_env *env = req->rq_svc_thread->t_env; struct ldlm_resource *res, *pres = NULL; struct ldlm_lock *lock; int i, count, done = 0; @@ -1591,15 +1602,16 @@ int ldlm_request_cancel(struct ptlrpc_request *req, LDLM_RESOURCE_DELREF(pres); ldlm_resource_putref(pres); } - if (res != NULL) { - ldlm_resource_getref(res); - LDLM_RESOURCE_ADDREF(res); + if (res != NULL) { + ldlm_resource_getref(res); + LDLM_RESOURCE_ADDREF(res); if (!ldlm_is_discard_data(lock)) - ldlm_lvbo_update(res, lock, NULL, 1); - } - pres = res; - } + ldlm_lvbo_update(env, res, lock, + NULL, 1); + } + pres = res; + } if ((flags & LATF_STATS) && ldlm_is_ast_sent(lock) && lock->l_blast_sent != 0) { diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index fcb2b4a..78e2547 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -431,7 +431,8 @@ int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp) /** * Enqueue a local lock (typically on a server). */ -int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, +int ldlm_cli_enqueue_local(const struct lu_env *env, + struct ldlm_namespace *ns, const struct ldlm_res_id *res_id, enum ldlm_type type, union ldlm_policy_data *policy, enum ldlm_mode mode, __u64 *flags, @@ -461,7 +462,7 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, if (IS_ERR(lock)) GOTO(out_nolock, err = PTR_ERR(lock)); - err = ldlm_lvbo_init(lock->l_resource); + err = ldlm_lvbo_init(env, lock->l_resource); if (err < 0) { LDLM_ERROR(lock, "delayed lvb init failed (rc %d)", err); ldlm_lock_destroy_nolock(lock); @@ -489,15 +490,15 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, lock->l_req_extent = policy->l_extent; } - err = ldlm_lock_enqueue(ns, &lock, policy, flags); - if (unlikely(err != ELDLM_OK)) - GOTO(out, err); + err = ldlm_lock_enqueue(env, ns, &lock, policy, flags); + if (unlikely(err != ELDLM_OK)) + GOTO(out, err); - if (policy != NULL) - *policy = lock->l_policy_data; + if (policy != NULL) + *policy = lock->l_policy_data; - if (lock->l_completion_ast) - lock->l_completion_ast(lock, *flags, NULL); + if (lock->l_completion_ast) + lock->l_completion_ast(lock, *flags, NULL); LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created"); EXIT; @@ -564,12 +565,16 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, __u32 lvb_len, const struct lustre_handle *lockh, int rc) { - struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; - int is_replay = *flags & LDLM_FL_REPLAY; - struct ldlm_lock *lock; - struct ldlm_reply *reply; - int cleanup_phase = 1; - ENTRY; + struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; + const struct lu_env *env = NULL; + int is_replay = *flags & LDLM_FL_REPLAY; + struct ldlm_lock *lock; + struct ldlm_reply *reply; + int cleanup_phase = 1; + ENTRY; + + if (req && req->rq_svc_thread) + env = req->rq_svc_thread->t_env; lock = ldlm_handle2lock(lockh); /* ldlm_cli_enqueue is holding a reference on this lock. */ @@ -707,16 +712,16 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, } } - if (!is_replay) { - rc = ldlm_lock_enqueue(ns, &lock, NULL, flags); - if (lock->l_completion_ast != NULL) { - int err = lock->l_completion_ast(lock, *flags, NULL); - if (!rc) - rc = err; - if (rc) - cleanup_phase = 1; - } - } + if (!is_replay) { + rc = ldlm_lock_enqueue(env, ns, &lock, NULL, flags); + if (lock->l_completion_ast != NULL) { + int err = lock->l_completion_ast(lock, *flags, NULL); + if (!rc) + rc = err; + if (rc) + cleanup_phase = 1; + } + } if (lvb_len > 0 && lvb != NULL) { /* Copy the LVB here, and not earlier, because the completion diff --git a/lustre/lfsck/lfsck_engine.c b/lustre/lfsck/lfsck_engine.c index f49fda2..20866fc 100644 --- a/lustre/lfsck/lfsck_engine.c +++ b/lustre/lfsck/lfsck_engine.c @@ -1235,7 +1235,7 @@ again: } spin_unlock(<ds->ltd_lock); - rc = ptlrpc_set_wait(set); + rc = ptlrpc_set_wait(env, set); if (rc < 0) { ptlrpc_set_destroy(set); RETURN(rc); @@ -1330,7 +1330,7 @@ static int lfsck_assistant_notify_others(const struct lu_env *env, up_read(<ds->ltd_rw_sem); /* Sync up */ - rc = ptlrpc_set_wait(set); + rc = ptlrpc_set_wait(env, set); if (rc < 0) { ptlrpc_set_destroy(set); RETURN(rc); @@ -1462,7 +1462,7 @@ again: } spin_unlock(<ds->ltd_lock); - rc = ptlrpc_set_wait(set); + rc = ptlrpc_set_wait(env, set); if (rc < 0) { ptlrpc_set_destroy(set); RETURN(rc); @@ -1536,7 +1536,7 @@ again: break; } - rc1 = ptlrpc_set_wait(set); + rc1 = ptlrpc_set_wait(env, set); ptlrpc_set_destroy(set); RETURN(rc != 0 ? rc : rc1); diff --git a/lustre/lfsck/lfsck_layout.c b/lustre/lfsck/lfsck_layout.c index cdb46d8c..e866566 100644 --- a/lustre/lfsck/lfsck_layout.c +++ b/lustre/lfsck/lfsck_layout.c @@ -308,7 +308,7 @@ static void lfsck_layout_assistant_sync_failures(const struct lu_env *env, up_read(<ds->ltd_rw_sem); if (rc == 0 && atomic_read(&count) > 0) - rc = ptlrpc_set_wait(set); + rc = ptlrpc_set_wait(env, set); ptlrpc_set_destroy(set); @@ -2580,10 +2580,10 @@ static int lfsck_layout_slave_conditional_destroy(const struct lu_env *env, memset(policy, 0, sizeof(*policy)); policy->l_extent.end = OBD_OBJECT_EOF; ost_fid_build_resid(fid, resid); - rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_EXTENT, - policy, LCK_EX, &flags, ldlm_blocking_ast, - ldlm_completion_ast, NULL, NULL, 0, - LVB_T_NONE, NULL, &lh); + rc = ldlm_cli_enqueue_local(env, lfsck->li_namespace, resid, + LDLM_EXTENT, policy, LCK_EX, &flags, + ldlm_blocking_ast, ldlm_completion_ast, + NULL, NULL, 0, LVB_T_NONE, NULL, &lh); if (rc != ELDLM_OK) GOTO(put, rc = -EIO); @@ -4752,7 +4752,7 @@ lfsck_layout_slave_query_master(const struct lu_env *env, } spin_unlock(&llsd->llsd_lock); - rc = ptlrpc_set_wait(set); + rc = ptlrpc_set_wait(env, set); ptlrpc_set_destroy(set); GOTO(log, rc = (rc1 != 0 ? rc1 : rc)); @@ -4830,7 +4830,7 @@ lfsck_layout_slave_notify_master(const struct lu_env *env, } spin_unlock(&llsd->llsd_lock); - ptlrpc_set_wait(set); + ptlrpc_set_wait(env, set); ptlrpc_set_destroy(set); RETURN_EXIT; diff --git a/lustre/lfsck/lfsck_lib.c b/lustre/lfsck/lfsck_lib.c index 46ae40b..82c3168 100644 --- a/lustre/lfsck/lfsck_lib.c +++ b/lustre/lfsck/lfsck_lib.c @@ -380,7 +380,7 @@ static int __lfsck_ibits_lock(const struct lu_env *env, rc = dt_object_lock(env, obj, lh, einfo, policy); } else { - rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, + rc = ldlm_cli_enqueue_local(env, lfsck->li_namespace, resid, LDLM_IBITS, policy, mode, &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, @@ -2347,7 +2347,7 @@ static int lfsck_stop_notify(const struct lu_env *env, ltd->ltd_index, lad->lad_name, rc); lfsck_tgt_put(ltd); } else { - rc = ptlrpc_set_wait(set); + rc = ptlrpc_set_wait(env, set); } ptlrpc_set_destroy(set); @@ -2484,7 +2484,7 @@ again: goto again; } - rc = ptlrpc_set_wait(set); + rc = ptlrpc_set_wait(env, set); ptlrpc_set_destroy(set); RETURN(rc); @@ -2946,7 +2946,7 @@ static int lfsck_stop_all(const struct lu_env *env, } up_read(<ds->ltd_rw_sem); - rc = ptlrpc_set_wait(set); + rc = ptlrpc_set_wait(env, set); ptlrpc_set_destroy(set); if (rc == 0) @@ -3037,7 +3037,7 @@ again: RETURN(rc); } - rc = ptlrpc_set_wait(set); + rc = ptlrpc_set_wait(env, set); ptlrpc_set_destroy(set); if (rc == 0) diff --git a/lustre/lfsck/lfsck_namespace.c b/lustre/lfsck/lfsck_namespace.c index 7f01acb..0459052 100644 --- a/lustre/lfsck/lfsck_namespace.c +++ b/lustre/lfsck/lfsck_namespace.c @@ -6577,7 +6577,7 @@ static void lfsck_namespace_assistant_sync_failures(const struct lu_env *env, } up_read(<ds->ltd_rw_sem); - rc = ptlrpc_set_wait(set); + rc = ptlrpc_set_wait(env, set); ptlrpc_set_destroy(set); GOTO(out, rc); diff --git a/lustre/lod/lod_dev.c b/lustre/lod/lod_dev.c index 6949283..810bf35 100644 --- a/lustre/lod/lod_dev.c +++ b/lustre/lod/lod_dev.c @@ -2145,7 +2145,7 @@ static int lod_obd_set_info_async(const struct lu_env *env, if (no_set) { - rc2 = ptlrpc_set_wait(set); + rc2 = ptlrpc_set_wait(env, set); if (rc2 == 0 && rc == 0) rc = rc2; ptlrpc_set_destroy(set); diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index dc896ef..0b34a43 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -5705,7 +5705,7 @@ static int lod_object_lock(const struct lu_env *env, dlmflags |= LDLM_FL_COS_INCOMPAT; LASSERT(ns != NULL); - rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, + rc = ldlm_cli_enqueue_local(env, ns, res_id, LDLM_IBITS, policy, einfo->ei_mode, &dlmflags, blocking, completion, NULL, diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 9bdeab9..71b2829 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -945,7 +945,7 @@ static int lov_statfs(const struct lu_env *env, struct obd_export *exp, GOTO(out_set, rc); } - rc = ptlrpc_set_wait(rqset); + rc = ptlrpc_set_wait(env, rqset); out_set: if (rc < 0) @@ -1253,7 +1253,7 @@ static int lov_set_info_async(const struct lu_env *env, struct obd_export *exp, lov_tgts_putref(obddev); if (no_set) { - err = ptlrpc_set_wait(set); + err = ptlrpc_set_wait(env, set); if (rc == 0) rc = err; ptlrpc_set_destroy(set); diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 080dc19..ace6f1f 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -2887,7 +2887,7 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; union ldlm_policy_data *policy = &info->mti_policy; struct ldlm_res_id *res_id = &info->mti_res_id; - __u64 dlmflags = 0; + __u64 dlmflags = 0, *cookie = NULL; int rc; ENTRY; @@ -2919,10 +2919,12 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, } } - fid_build_reg_res_name(mdt_object_fid(o), res_id); dlmflags |= LDLM_FL_ATOMIC_CB; + if (info->mti_exp) + cookie = &info->mti_exp->exp_handle.h_cookie; + /* * Take PDO lock on whole directory and build correct @res_id for lock * on part of directory. @@ -2943,10 +2945,9 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, /* at least one of them should be set */ LASSERT(policy->l_inodebits.bits | policy->l_inodebits.try_bits); - rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, - policy, res_id, dlmflags, - info->mti_exp == NULL ? NULL : - &info->mti_exp->exp_handle.h_cookie); + rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_pdo_lh, + lh->mlh_pdo_mode, policy, res_id, + dlmflags, cookie); if (unlikely(rc != 0)) GOTO(out_unlock, rc); } @@ -2966,10 +2967,9 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, * going to be sent to client. If it is - mdt_intent_policy() path will * fix it up and turn FL_LOCAL flag off. */ - rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy, - res_id, LDLM_FL_LOCAL_ONLY | dlmflags, - info->mti_exp == NULL ? NULL : - &info->mti_exp->exp_handle.h_cookie); + rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, + policy, res_id, LDLM_FL_LOCAL_ONLY | dlmflags, + cookie); out_unlock: if (rc != 0) mdt_object_unlock(info, o, lh, 1); @@ -4043,9 +4043,12 @@ static void mdt_ptlrpc_stats_update(struct ptlrpc_request *req, LDLM_GLIMPSE_ENQUEUE : LDLM_IBITS_ENQUEUE)); } -static int mdt_intent_policy(struct ldlm_namespace *ns, - struct ldlm_lock **lockp, void *req_cookie, - enum ldlm_mode mode, __u64 flags, void *data) +static int mdt_intent_policy(const struct lu_env *env, + struct ldlm_namespace *ns, + struct ldlm_lock **lockp, + void *req_cookie, + enum ldlm_mode mode, + __u64 flags, void *data) { struct tgt_session_info *tsi; struct mdt_thread_info *info; @@ -4059,7 +4062,7 @@ static int mdt_intent_policy(struct ldlm_namespace *ns, LASSERT(req != NULL); - tsi = tgt_ses_info(req->rq_svc_thread->t_env); + tsi = tgt_ses_info(env); info = tsi2mdt_info(tsi); LASSERT(info != NULL); diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index f78a67e..c84aa6a 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -406,7 +406,8 @@ struct mdt_thread_info { /* * Object attributes. */ - struct md_attr mti_attr; + struct md_attr mti_attr; + struct md_attr mti_attr2; /* mdt_lvb.c */ /* * Body for "habeo corpus" operations. */ @@ -1137,7 +1138,8 @@ static int mdt_dom_glimpse_ast(struct ldlm_lock *lock, void *reqp) } /* Issues dlm lock on passed @ns, @f stores it lock handle into @lh. */ -static inline int mdt_fid_lock(struct ldlm_namespace *ns, +static inline int mdt_fid_lock(const struct lu_env *env, + struct ldlm_namespace *ns, struct lustre_handle *lh, enum ldlm_mode mode, union ldlm_policy_data *policy, const struct ldlm_res_id *res_id, @@ -1149,7 +1151,7 @@ static inline int mdt_fid_lock(struct ldlm_namespace *ns, LASSERT(ns != NULL); LASSERT(lh != NULL); - rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy, + rc = ldlm_cli_enqueue_local(env, ns, res_id, LDLM_IBITS, policy, mode, &flags, mdt_blocking_ast, ldlm_completion_ast, glimpse ? mdt_dom_glimpse_ast : NULL, @@ -1196,8 +1198,9 @@ int mdt_lsom_update(struct mdt_thread_info *info, struct mdt_object *obj, /* mdt_lvb.c */ extern struct ldlm_valblock_ops mdt_lvbo; int mdt_dom_lvb_is_valid(struct ldlm_resource *res); -int mdt_dom_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock, - struct ptlrpc_request *req, bool increase_only); +int mdt_dom_lvbo_update(const struct lu_env *env, struct ldlm_resource *res, + struct ldlm_lock *lock, struct ptlrpc_request *req, + bool increase_only); void mdt_enable_cos(struct mdt_device *dev, bool enable); int mdt_cos_is_enabled(struct mdt_device *); diff --git a/lustre/mdt/mdt_io.c b/lustre/mdt/mdt_io.c index c5dc89f..2d6f802 100644 --- a/lustre/mdt/mdt_io.c +++ b/lustre/mdt/mdt_io.c @@ -1026,7 +1026,7 @@ int mdt_dom_object_size(const struct lu_env *env, struct mdt_device *mdt, /* Update lvbo data if DoM lock returned or if LVB is not yet valid. */ if (dom_lock || !mdt_dom_lvb_is_valid(res)) - mdt_dom_lvbo_update(res, NULL, NULL, false); + mdt_dom_lvbo_update(env, res, NULL, NULL, false); mdt_lvb2body(res, mb); ldlm_resource_putref(res); @@ -1124,7 +1124,7 @@ int mdt_glimpse_enqueue(struct mdt_thread_info *mti, struct ldlm_namespace *ns, fill_mbo: /* LVB can be without valid data in case of DOM */ if (!mdt_dom_lvb_is_valid(res)) - mdt_dom_lvbo_update(res, lock, NULL, false); + mdt_dom_lvbo_update(mti->mti_env, res, lock, NULL, false); mdt_lvb2body(res, mbo); RETURN(rc); } @@ -1225,10 +1225,10 @@ void mdt_dom_discard_data(struct mdt_thread_info *info, /* Tell the clients that the object is gone now and that they should * throw away any cached pages. */ - rc = ldlm_cli_enqueue_local(mdt->mdt_namespace, res_id, LDLM_IBITS, - policy, LCK_PW, &flags, ldlm_blocking_ast, - ldlm_completion_ast, NULL, NULL, 0, - LVB_T_NONE, NULL, &dom_lh); + rc = ldlm_cli_enqueue_local(info->mti_env, mdt->mdt_namespace, res_id, + LDLM_IBITS, policy, LCK_PW, &flags, + ldlm_blocking_ast, ldlm_completion_ast, + NULL, NULL, 0, LVB_T_NONE, NULL, &dom_lh); /* We only care about the side-effects, just drop the lock. */ if (rc == ELDLM_OK) diff --git a/lustre/mdt/mdt_lvb.c b/lustre/mdt/mdt_lvb.c index 6859d42..9ee1989 100644 --- a/lustre/mdt/mdt_lvb.c +++ b/lustre/mdt/mdt_lvb.c @@ -34,7 +34,7 @@ #include "mdt_internal.h" /* Called with res->lr_lvb_sem held */ -static int mdt_lvbo_init(struct ldlm_resource *res) +static int mdt_lvbo_init(const struct lu_env *env, struct ldlm_resource *res) { if (IS_LQUOTA_RES(res)) { struct mdt_device *mdt; @@ -96,7 +96,7 @@ int mdt_dom_disk_lvbo_update(const struct lu_env *env, struct mdt_object *mo, if (!mdt_object_exists(mo) || mdt_object_remote(mo)) RETURN(-ENOENT); - ma = &info->mti_attr; + ma = &info->mti_attr2; ma->ma_valid = 0; ma->ma_need = MA_INODE; rc = mo_attr_get(env, mdt_object_child(mo), ma); @@ -140,15 +140,15 @@ int mdt_dom_disk_lvbo_update(const struct lu_env *env, struct mdt_object *mo, RETURN(rc); } -int mdt_dom_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock, - struct ptlrpc_request *req, bool increase_only) +int mdt_dom_lvbo_update(const struct lu_env *env, struct ldlm_resource *res, + struct ldlm_lock *lock, struct ptlrpc_request *req, + bool increase_only) { struct obd_export *exp = lock ? lock->l_export : NULL; struct mdt_device *mdt; struct mdt_object *mo; struct mdt_thread_info *info; struct ost_lvb *lvb; - struct lu_env env; struct lu_fid *fid; int rc = 0; @@ -173,18 +173,21 @@ int mdt_dom_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock, if (mdt == NULL) RETURN(-ENOENT); - rc = lu_env_init(&env, LCT_MD_THREAD); - if (rc) - RETURN(rc); - - info = lu_context_key_get(&env.le_ctx, &mdt_thread_key); - if (info == NULL) - GOTO(out_env, rc = -ENOMEM); - - memset(info, 0, sizeof *info); - info->mti_env = &env; - info->mti_exp = req ? req->rq_export : NULL; - info->mti_mdt = mdt; + LASSERT(env); + info = lu_context_key_get(&env->le_ctx, &mdt_thread_key); + if (!info) { + rc = lu_env_refill_by_tags((struct lu_env *)env, + LCT_MD_THREAD, 0); + if (rc) + GOTO(out_env, rc); + info = lu_context_key_get(&env->le_ctx, &mdt_thread_key); + if (!info) + GOTO(out_env, rc = -ENOMEM); + } + if (!info->mti_exp) + info->mti_exp = req ? req->rq_export : NULL; + if (!info->mti_mdt) + info->mti_mdt = mdt; fid = &info->mti_tmp_fid2; fid_extract_from_res_name(fid, &res->lr_name); @@ -238,19 +241,19 @@ int mdt_dom_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock, disk_update: /* Update the LVB from the disk inode */ - mo = mdt_object_find(&env, mdt, fid); + mo = mdt_object_find(env, mdt, fid); if (IS_ERR(mo)) GOTO(out_env, rc = PTR_ERR(mo)); - rc = mdt_dom_disk_lvbo_update(&env, mo, res, !!increase_only); - mdt_object_put(&env, mo); + rc = mdt_dom_disk_lvbo_update(env, mo, res, !!increase_only); + mdt_object_put(env, mo); out_env: - lu_env_fini(&env); return rc; } -static int mdt_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock, - struct ptlrpc_request *req, int increase_only) +static int mdt_lvbo_update(const struct lu_env *env, struct ldlm_resource *res, + struct ldlm_lock *lock, struct ptlrpc_request *req, + int increase_only) { ENTRY; @@ -272,7 +275,8 @@ static int mdt_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock, * by MDT for DOM objects only. */ if (lock == NULL || ldlm_has_dom(lock)) - return mdt_dom_lvbo_update(res, lock, req, !!increase_only); + return mdt_dom_lvbo_update(env, res, lock, req, + !!increase_only); return 0; } @@ -302,9 +306,9 @@ static int mdt_lvbo_size(struct ldlm_lock *lock) return 0; } -static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen) +static int mdt_lvbo_fill(const struct lu_env *env, struct ldlm_lock *lock, + void *lvb, int lvblen) { - struct lu_env env; struct mdt_thread_info *info; struct mdt_device *mdt; struct lu_fid *fid; @@ -324,6 +328,23 @@ static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen) RETURN(rc); } + info = lu_context_key_get(&env->le_ctx, &mdt_thread_key); + if (!info) { + rc = lu_env_refill_by_tags((struct lu_env *)env, + LCT_MD_THREAD, 0); + if (rc) + GOTO(out, rc); + info = lu_context_key_get(&env->le_ctx, &mdt_thread_key); + if (!info) + GOTO(out, rc = -ENOMEM); + } + if (!info->mti_env) + info->mti_env = env; + if (!info->mti_exp) + info->mti_exp = lock->l_export; + if (!info->mti_mdt) + info->mti_mdt = mdt; + /* LVB for DoM lock is needed only for glimpse, * don't fill DoM data if there is layout lock */ if (ldlm_has_dom(lock)) { @@ -331,7 +352,8 @@ static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen) int lvb_len = sizeof(struct ost_lvb); if (!mdt_dom_lvb_is_valid(res)) - mdt_dom_lvbo_update(lock->l_resource, lock, NULL, 0); + mdt_dom_lvbo_update(env, lock->l_resource, + lock, NULL, 0); if (lvb_len > lvblen) lvb_len = lvblen; @@ -340,47 +362,30 @@ static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen) memcpy(lvb, res->lr_lvb_data, lvb_len); unlock_res(res); - RETURN(lvb_len); + GOTO(out, rc = lvb_len); } /* Only fill layout if layout lock is granted */ if (!ldlm_has_layout(lock) || lock->l_granted_mode != lock->l_req_mode) - RETURN(0); - - /* XXX create an env to talk to mdt stack. We should get this env from - * ptlrpc_thread->t_env. */ - rc = lu_env_init(&env, LCT_MD_THREAD); - /* Likely ENOMEM */ - if (rc) - RETURN(rc); - - info = lu_context_key_get(&env.le_ctx, &mdt_thread_key); - /* Likely ENOMEM */ - if (info == NULL) - GOTO(out, rc = -ENOMEM); - - memset(info, 0, sizeof *info); - info->mti_env = &env; - info->mti_exp = lock->l_export; - info->mti_mdt = mdt; + GOTO(out, rc = 0); /* XXX get fid by resource id. why don't include fid in ldlm_resource */ fid = &info->mti_tmp_fid2; fid_extract_from_res_name(fid, &lock->l_resource->lr_name); - obj = mdt_object_find(&env, info->mti_mdt, fid); + obj = mdt_object_find(env, info->mti_mdt, fid); if (IS_ERR(obj)) GOTO(out, rc = PTR_ERR(obj)); if (!mdt_object_exists(obj) || mdt_object_remote(obj)) - GOTO(out, rc = -ENOENT); + GOTO(out_put, rc = -ENOENT); child = mdt_object_child(obj); /* get the length of lsm */ - rc = mo_xattr_get(&env, child, &LU_BUF_NULL, XATTR_NAME_LOV); + rc = mo_xattr_get(env, child, &LU_BUF_NULL, XATTR_NAME_LOV); if (rc < 0) - GOTO(out, rc); + GOTO(out_put, rc); if (rc > 0) { struct lu_buf *lmm = NULL; if (lvblen < rc) { @@ -400,20 +405,20 @@ static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen) "%d (max_mdsize %d): rc = %d\n", mdt_obd_name(mdt), lvblen, rc, info->mti_mdt->mdt_max_mdsize, -ERANGE); - GOTO(out, rc = -ERANGE); + GOTO(out_put, rc = -ERANGE); } lmm = &info->mti_buf; lmm->lb_buf = lvb; lmm->lb_len = rc; - rc = mo_xattr_get(&env, child, lmm, XATTR_NAME_LOV); + rc = mo_xattr_get(env, child, lmm, XATTR_NAME_LOV); if (rc < 0) - GOTO(out, rc); + GOTO(out_put, rc); } -out: +out_put: if (obj != NULL && !IS_ERR(obj)) - mdt_object_put(&env, obj); - lu_env_fini(&env); + mdt_object_put(env, obj); +out: RETURN(rc < 0 ? 0 : rc); } diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 895fb90..e76c5c3 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -1243,8 +1243,9 @@ static int mdt_pdir_hash_lock(struct mdt_thread_info *info, * going to be sent to client. If it is - mdt_intent_policy() path will * fix it up and turn FL_LOCAL flag off. */ - rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy, - res, dlmflags, &info->mti_exp->exp_handle.h_cookie); + rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, + policy, res, dlmflags, + &info->mti_exp->exp_handle.h_cookie); return rc; } @@ -1284,12 +1285,13 @@ static int mdt_rename_lock(struct mdt_thread_info *info, memset(policy, 0, sizeof *policy); policy->l_inodebits.bits = MDS_INODELOCK_UPDATE; flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB; - rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy, - LCK_EX, &flags, ldlm_blocking_ast, - ldlm_completion_ast, NULL, NULL, 0, - LVB_T_NONE, - &info->mti_exp->exp_handle.h_cookie, - lh); + rc = ldlm_cli_enqueue_local(info->mti_env, ns, res_id, + LDLM_IBITS, policy, LCK_EX, &flags, + ldlm_blocking_ast, + ldlm_completion_ast, NULL, NULL, 0, + LVB_T_NONE, + &info->mti_exp->exp_handle.h_cookie, + lh); RETURN(rc); } RETURN(rc); @@ -1355,8 +1357,9 @@ static int mdt_revoke_remote_lookup_lock(struct mdt_thread_info *info, fid_build_reg_res_name(mdt_object_fid(obj), res); memset(policy, 0, sizeof(*policy)); policy->l_inodebits.bits = MDS_INODELOCK_LOOKUP; - rc = mdt_fid_lock(info->mti_mdt->mdt_namespace, &lh->mlh_reg_lh, - LCK_EX, policy, res, dlmflags, NULL); + rc = mdt_fid_lock(info->mti_env, info->mti_mdt->mdt_namespace, + &lh->mlh_reg_lh, LCK_EX, policy, res, + dlmflags, NULL); } if (rc != ELDLM_OK) diff --git a/lustre/mgs/mgs_handler.c b/lustre/mgs/mgs_handler.c index 21587d2..af08318 100644 --- a/lustre/mgs/mgs_handler.c +++ b/lustre/mgs/mgs_handler.c @@ -270,7 +270,7 @@ void mgs_revoke_lock(struct mgs_device *mgs, struct fs_db *fsdb, int type) if (!rc) { LASSERT(cp != NULL); - rc = ldlm_cli_enqueue_local(mgs->mgs_obd->obd_namespace, + rc = ldlm_cli_enqueue_local(NULL, mgs->mgs_obd->obd_namespace, &res_id, LDLM_PLAIN, NULL, LCK_EX, &flags, ldlm_blocking_ast, cp, NULL, fsdb, 0, LVB_T_NONE, NULL, diff --git a/lustre/ofd/ofd_dev.c b/lustre/ofd/ofd_dev.c index 4509fe7..bdbbe76 100644 --- a/lustre/ofd/ofd_dev.c +++ b/lustre/ofd/ofd_dev.c @@ -958,7 +958,8 @@ int ofd_fiemap_get(const struct lu_env *env, struct ofd_device *ofd, } -static int ofd_lock_unlock_region(struct ldlm_namespace *ns, +static int ofd_lock_unlock_region(const struct lu_env *env, + struct ldlm_namespace *ns, struct ldlm_res_id *res_id, unsigned long long begin, unsigned long long end) @@ -969,7 +970,7 @@ static int ofd_lock_unlock_region(struct ldlm_namespace *ns, LASSERT(begin <= end); - rc = tgt_extent_lock(ns, res_id, begin, end, &lh, LCK_PR, &flags); + rc = tgt_extent_lock(env, ns, res_id, begin, end, &lh, LCK_PR, &flags); if (rc != 0) return rc; @@ -997,7 +998,8 @@ static int ofd_lock_unlock_region(struct ldlm_namespace *ns, * \retval 0 if successful * \retval negative value on error */ -static int lock_zero_regions(struct ldlm_namespace *ns, +static int lock_zero_regions(const struct lu_env *env, + struct ldlm_namespace *ns, struct ldlm_res_id *res_id, struct fiemap *fiemap) { @@ -1013,7 +1015,7 @@ static int lock_zero_regions(struct ldlm_namespace *ns, if (fiemap_start[i].fe_logical > begin) { CDEBUG(D_OTHER, "ost lock [%llu,%llu]\n", begin, fiemap_start[i].fe_logical); - rc = ofd_lock_unlock_region(ns, res_id, begin, + rc = ofd_lock_unlock_region(env, ns, res_id, begin, fiemap_start[i].fe_logical); if (rc) RETURN(rc); @@ -1025,7 +1027,7 @@ static int lock_zero_regions(struct ldlm_namespace *ns, if (begin < (fiemap->fm_start + fiemap->fm_length)) { CDEBUG(D_OTHER, "ost lock [%llu,%llu]\n", begin, fiemap->fm_start + fiemap->fm_length); - rc = ofd_lock_unlock_region(ns, res_id, begin, + rc = ofd_lock_unlock_region(env, ns, res_id, begin, fiemap->fm_start + fiemap->fm_length); } @@ -1126,7 +1128,7 @@ static int ofd_get_info_hdl(struct tgt_session_info *tsi) if (fm_key->lfik_oa.o_valid & OBD_MD_FLFLAGS && fm_key->lfik_oa.o_flags & OBD_FL_SRVLOCK) { ost_fid_build_resid(fid, &fti->fti_resid); - rc = lock_zero_regions(ofd->ofd_namespace, + rc = lock_zero_regions(tsi->tsi_env, ofd->ofd_namespace, &fti->fti_resid, fiemap); if (rc == 0) rc = ofd_fiemap_get(tsi->tsi_env, ofd, fid, @@ -1218,7 +1220,8 @@ static int ofd_getattr_hdl(struct tgt_session_info *tsi) if (unlikely(tsi->tsi_ost_body->oa.o_flags & OBD_FL_FLUSH)) lock_mode = LCK_PW; - rc = tgt_extent_lock(tsi->tsi_tgt->lut_obd->obd_namespace, + rc = tgt_extent_lock(tsi->tsi_env, + tsi->tsi_tgt->lut_obd->obd_namespace, &tsi->tsi_resid, 0, OBD_OBJECT_EOF, &lh, lock_mode, &flags); if (rc != 0) @@ -1350,7 +1353,7 @@ out: res = ldlm_resource_get(ofd->ofd_namespace, NULL, &tsi->tsi_resid, LDLM_EXTENT, 0); if (!IS_ERR(res)) { - ldlm_res_lvbo_update(res, NULL, 0); + ldlm_res_lvbo_update(tsi->tsi_env, res, NULL, 0); ldlm_resource_putref(res); } } @@ -1975,8 +1978,8 @@ static int ofd_punch_hdl(struct tgt_session_info *tsi) oa->o_flags & OBD_FL_SRVLOCK; if (srvlock) { - rc = tgt_extent_lock(ns, &tsi->tsi_resid, start, end, &lh, - LCK_PW, &flags); + rc = tgt_extent_lock(tsi->tsi_env, ns, &tsi->tsi_resid, start, + end, &lh, LCK_PW, &flags); if (rc != 0) RETURN(rc); } @@ -2019,7 +2022,7 @@ out: if (!IS_ERR(res)) { struct ost_lvb *res_lvb; - ldlm_res_lvbo_update(res, NULL, 0); + ldlm_res_lvbo_update(tsi->tsi_env, res, NULL, 0); res_lvb = res->lr_lvb_data; repbody->oa.o_valid |= OBD_MD_FLBLOCKS; repbody->oa.o_blocks = res_lvb->lvb_blocks; @@ -2186,7 +2189,7 @@ static int ofd_ladvise_hdl(struct tgt_session_info *tsi) ioo.ioo_oid = body->oa.o_oi; ioo.ioo_bufcnt = 1; - rc = tgt_extent_lock(exp->exp_obd->obd_namespace, + rc = tgt_extent_lock(env, exp->exp_obd->obd_namespace, &tsi->tsi_resid, start, end - 1, &lockh, LCK_PR, &flags); if (rc != 0) diff --git a/lustre/ofd/ofd_dlm.c b/lustre/ofd/ofd_dlm.c index c7fc515..8e3fe93 100644 --- a/lustre/ofd/ofd_dlm.c +++ b/lustre/ofd/ofd_dlm.c @@ -212,9 +212,9 @@ out: * \retval ELDLM_LOCK_ABORTED in other cases except error * \retval negative errno on error */ -int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp, - void *req_cookie, enum ldlm_mode mode, __u64 flags, - void *data) +int ofd_intent_policy(const struct lu_env *env, struct ldlm_namespace *ns, + struct ldlm_lock **lockp, void *req_cookie, + enum ldlm_mode mode, __u64 flags, void *data) { struct ptlrpc_request *req = req_cookie; struct ldlm_lock *lock = *lockp; diff --git a/lustre/ofd/ofd_internal.h b/lustre/ofd/ofd_internal.h index df5b714..a76cb69 100644 --- a/lustre/ofd/ofd_internal.h +++ b/lustre/ofd/ofd_internal.h @@ -426,9 +426,9 @@ extern struct ldlm_valblock_ops ofd_lvbo; /* ofd_dlm.c */ extern struct kmem_cache *ldlm_glimpse_work_kmem; -int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp, - void *req_cookie, enum ldlm_mode mode, __u64 flags, - void *data); +int ofd_intent_policy(const struct lu_env *env, struct ldlm_namespace *ns, + struct ldlm_lock **lockp, void *req_cookie, + enum ldlm_mode mode, __u64 flags, void *data); static inline struct ofd_thread_info *ofd_info(const struct lu_env *env) { diff --git a/lustre/ofd/ofd_io.c b/lustre/ofd/ofd_io.c index 8e038ce..f61d24a 100644 --- a/lustre/ofd/ofd_io.c +++ b/lustre/ofd/ofd_io.c @@ -1360,7 +1360,7 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp, rs = ldlm_resource_get(ns, NULL, &info->fti_resid, LDLM_EXTENT, 0); if (!IS_ERR(rs)) { - ldlm_res_lvbo_update(rs, NULL, 1); + ldlm_res_lvbo_update(env, rs, NULL, 1); ldlm_resource_putref(rs); } } diff --git a/lustre/ofd/ofd_lvb.c b/lustre/ofd/ofd_lvb.c index 81f1e67..aecd1b6 100644 --- a/lustre/ofd/ofd_lvb.c +++ b/lustre/ofd/ofd_lvb.c @@ -82,17 +82,18 @@ static int ofd_lvbo_free(struct ldlm_resource *res) * \retval 0 on successful setup * \retval negative value on error */ -static int ofd_lvbo_init(struct ldlm_resource *res) +static int ofd_lvbo_init(const struct lu_env *env, struct ldlm_resource *res) { struct ost_lvb *lvb; struct ofd_device *ofd; struct ofd_object *fo; struct ofd_thread_info *info; - struct lu_env env; int rc = 0; ENTRY; + LASSERT(env); + info = ofd_info(env); LASSERT(res); LASSERT(mutex_is_locked(&res->lr_lvb_mutex)); @@ -105,25 +106,20 @@ static int ofd_lvbo_init(struct ldlm_resource *res) if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_LVB)) RETURN(-ENOMEM); - rc = lu_env_init(&env, LCT_DT_THREAD); - if (rc) - RETURN(rc); - OBD_ALLOC_PTR(lvb); if (lvb == NULL) - GOTO(out_env, rc = -ENOMEM); + GOTO(out, rc = -ENOMEM); res->lr_lvb_data = lvb; res->lr_lvb_len = sizeof(*lvb); - info = ofd_info_init(&env, NULL); ost_fid_from_resid(&info->fti_fid, &res->lr_name, ofd->ofd_lut.lut_lsd.lsd_osd_index); - fo = ofd_object_find(&env, ofd, &info->fti_fid); + fo = ofd_object_find(env, ofd, &info->fti_fid); if (IS_ERR(fo)) GOTO(out_lvb, rc = PTR_ERR(fo)); - rc = ofd_attr_get(&env, fo, &info->fti_attr); + rc = ofd_attr_get(env, fo, &info->fti_attr); if (rc) GOTO(out_obj, rc); @@ -138,14 +134,15 @@ static int ofd_lvbo_init(struct ldlm_resource *res) PFID(&info->fti_fid), lvb->lvb_size, lvb->lvb_mtime, lvb->lvb_blocks); + info->fti_attr.la_valid = 0; + EXIT; out_obj: - ofd_object_put(&env, fo); + ofd_object_put(env, fo); out_lvb: if (rc != 0) OST_LVB_SET_ERR(lvb->lvb_blocks, rc); -out_env: - lu_env_fini(&env); +out: /* Don't free lvb data on lookup error */ return rc; } @@ -179,35 +176,32 @@ out_env: * \retval 0 on successful setup * \retval negative value on error */ -static int ofd_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock, - struct ptlrpc_request *req, int increase_only) +static int ofd_lvbo_update(const struct lu_env *env, struct ldlm_resource *res, + struct ldlm_lock *lock, struct ptlrpc_request *req, + int increase_only) { + struct ofd_thread_info *info; struct ofd_device *ofd; struct ofd_object *fo; - struct ofd_thread_info *info; struct ost_lvb *lvb; - struct lu_env env; int rc = 0; ENTRY; + LASSERT(env); + info = ofd_info(env); LASSERT(res != NULL); ofd = ldlm_res_to_ns(res)->ns_lvbp; LASSERT(ofd != NULL); - rc = lu_env_init(&env, LCT_DT_THREAD); - if (rc) - RETURN(rc); - - info = ofd_info_init(&env, NULL); fid_extract_from_res_name(&info->fti_fid, &res->lr_name); lvb = res->lr_lvb_data; if (lvb == NULL) { CERROR("%s: no LVB data for "DFID"\n", ofd_name(ofd), PFID(&info->fti_fid)); - GOTO(out_env, rc = 0); + GOTO(out, rc = 0); } /* Update the LVB from the network message */ @@ -279,11 +273,11 @@ disk_update: /* Update the LVB from the disk inode */ ost_fid_from_resid(&info->fti_fid, &res->lr_name, ofd->ofd_lut.lut_lsd.lsd_osd_index); - fo = ofd_object_find(&env, ofd, &info->fti_fid); + fo = ofd_object_find(env, ofd, &info->fti_fid); if (IS_ERR(fo)) - GOTO(out_env, rc = PTR_ERR(fo)); + GOTO(out, rc = PTR_ERR(fo)); - rc = ofd_attr_get(&env, fo, &info->fti_attr); + rc = ofd_attr_get(env, fo, &info->fti_attr); if (rc) GOTO(out_obj, rc); @@ -322,9 +316,8 @@ disk_update: unlock_res(res); out_obj: - ofd_object_put(&env, fo); -out_env: - lu_env_fini(&env); + ofd_object_put(env, fo); +out: return rc; } @@ -358,7 +351,8 @@ static int ofd_lvbo_size(struct ldlm_lock *lock) * * \retval size of LVB data written into \a buf buffer */ -static int ofd_lvbo_fill(struct ldlm_lock *lock, void *buf, int buflen) +static int ofd_lvbo_fill(const struct lu_env *env, struct ldlm_lock *lock, + void *buf, int buflen) { struct ldlm_resource *res = lock->l_resource; int lvb_len; diff --git a/lustre/ofd/ofd_obd.c b/lustre/ofd/ofd_obd.c index aa59c14..0d6e2f4 100644 --- a/lustre/ofd/ofd_obd.c +++ b/lustre/ofd/ofd_obd.c @@ -952,7 +952,7 @@ out: * to go... deadlock! */ res = ldlm_resource_get(ns, NULL, &info->fti_resid, LDLM_EXTENT, 0); if (!IS_ERR(res)) { - ldlm_res_lvbo_update(res, NULL, 0); + ldlm_res_lvbo_update(env, res, NULL, 0); ldlm_resource_putref(res); } } @@ -994,7 +994,7 @@ int ofd_destroy_by_fid(const struct lu_env *env, struct ofd_device *ofd, /* Tell the clients that the object is gone now and that they should * throw away any cached pages. */ ost_fid_build_resid(fid, &info->fti_resid); - rc = ldlm_cli_enqueue_local(ofd->ofd_namespace, &info->fti_resid, + rc = ldlm_cli_enqueue_local(env, ofd->ofd_namespace, &info->fti_resid, LDLM_EXTENT, &policy, LCK_PW, &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, 0, LVB_T_NONE, NULL, &lockh); diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index a167146..1ea3b9f 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -2322,11 +2322,12 @@ time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set) * error or otherwise be interrupted). * Returns 0 on success or error code otherwise. */ -int ptlrpc_set_wait(struct ptlrpc_request_set *set) +int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) { - struct list_head *tmp; - struct ptlrpc_request *req; - struct l_wait_info lwi; + struct list_head *tmp; + struct ptlrpc_request *req; + struct l_wait_info lwi; + struct lu_env _env; time64_t timeout; int rc; ENTRY; @@ -2344,6 +2345,18 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set) if (list_empty(&set->set_requests)) RETURN(0); + /* ideally we want env provide by the caller all the time, + * but at the moment that would mean a massive change in + * LDLM while benefits would be close to zero, so just + * initialize env here for those rare cases */ + if (!env) { + /* XXX: skip on the client side? */ + rc = lu_env_init(&_env, LCT_DT_THREAD); + if (rc) + RETURN(rc); + env = &_env; + } + do { timeout = ptlrpc_set_next_timeout(set); @@ -2371,7 +2384,8 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set) lwi = LWI_TIMEOUT(cfs_time_seconds(timeout? timeout : 1), ptlrpc_expired_set, set); - rc = l_wait_event(set->set_waitq, ptlrpc_check_set(NULL, set), &lwi); + rc = l_wait_event(set->set_waitq, + ptlrpc_check_set(env, set), &lwi); /* LU-769 - if we ignored the signal because it was already * pending when we started, we need to handle it now or we risk @@ -2422,6 +2436,9 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set) rc = req->rq_status; } + if (env && env == &_env) + lu_env_fini(&_env); + RETURN(rc); } EXPORT_SYMBOL(ptlrpc_set_wait); @@ -2921,13 +2938,13 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) /* for distributed debugging */ lustre_msg_set_status(req->rq_reqmsg, current_pid()); - /* add a ref for the set (see comment in ptlrpc_set_add_req) */ - ptlrpc_request_addref(req); - ptlrpc_set_add_req(set, req); - rc = ptlrpc_set_wait(set); - ptlrpc_set_destroy(set); + /* add a ref for the set (see comment in ptlrpc_set_add_req) */ + ptlrpc_request_addref(req); + ptlrpc_set_add_req(set, req); + rc = ptlrpc_set_wait(NULL, set); + ptlrpc_set_destroy(set); - RETURN(rc); + RETURN(rc); } EXPORT_SYMBOL(ptlrpc_queue_wait); diff --git a/lustre/ptlrpc/ptlrpcd.c b/lustre/ptlrpc/ptlrpcd.c index 1840e3a..b98d082 100644 --- a/lustre/ptlrpc/ptlrpcd.c +++ b/lustre/ptlrpc/ptlrpcd.c @@ -503,11 +503,11 @@ static int ptlrpcd(void *arg) */ } while (exit < 2); - /* - * Wait for inflight requests to drain. - */ + /* + * Wait for inflight requests to drain. + */ if (!list_empty(&set->set_requests)) - ptlrpc_set_wait(set); + ptlrpc_set_wait(&env, set); lu_context_fini(&env.le_ctx); lu_context_fini(env.le_ses); diff --git a/lustre/quota/qmt_lock.c b/lustre/quota/qmt_lock.c index 4a8aa81..f29db18 100644 --- a/lustre/quota/qmt_lock.c +++ b/lustre/quota/qmt_lock.c @@ -140,7 +140,7 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld, /* on success, pack lvb in reply */ lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB); lvb_len = ldlm_lvbo_size(*lockp); - lvb_len = ldlm_lvbo_fill(*lockp, lvb, lvb_len); + lvb_len = ldlm_lvbo_fill(env, *lockp, lvb, lvb_len); if (lvb_len < 0) GOTO(out, rc = lvb_len); diff --git a/lustre/target/tgt_handler.c b/lustre/target/tgt_handler.c index ed8c030..fe7f26e 100644 --- a/lustre/target/tgt_handler.c +++ b/lustre/target/tgt_handler.c @@ -1604,7 +1604,7 @@ int tgt_mdt_data_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id, LASSERT(ns != NULL); LASSERT(!lustre_handle_is_used(lh)); - rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, &policy, mode, + rc = ldlm_cli_enqueue_local(NULL, ns, res_id, LDLM_IBITS, &policy, mode, flags, ldlm_blocking_ast, ldlm_completion_ast, ldlm_glimpse_ast, NULL, 0, LVB_T_NONE, NULL, lh); @@ -1624,9 +1624,9 @@ EXPORT_SYMBOL(tgt_mdt_data_unlock); * Helper function for getting server side [start, start+count] DLM lock * if asked by client. */ -int tgt_extent_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id, - __u64 start, __u64 end, struct lustre_handle *lh, - int mode, __u64 *flags) +int tgt_extent_lock(const struct lu_env *env, struct ldlm_namespace *ns, + struct ldlm_res_id *res_id, __u64 start, __u64 end, + struct lustre_handle *lh, int mode, __u64 *flags) { union ldlm_policy_data policy; int rc; @@ -1649,8 +1649,8 @@ int tgt_extent_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id, else policy.l_extent.end = end | ~PAGE_MASK; - rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_EXTENT, &policy, mode, - flags, ldlm_blocking_ast, + rc = ldlm_cli_enqueue_local(env, ns, res_id, LDLM_EXTENT, &policy, + mode, flags, ldlm_blocking_ast, ldlm_completion_ast, ldlm_glimpse_ast, NULL, 0, LVB_T_NONE, NULL, lh); RETURN(rc == ELDLM_OK ? 0 : -EIO); @@ -1664,9 +1664,10 @@ void tgt_extent_unlock(struct lustre_handle *lh, enum ldlm_mode mode) } EXPORT_SYMBOL(tgt_extent_unlock); -static int tgt_brw_lock(struct obd_export *exp, struct ldlm_res_id *res_id, - struct obd_ioobj *obj, struct niobuf_remote *nb, - struct lustre_handle *lh, enum ldlm_mode mode) +static int tgt_brw_lock(const struct lu_env *env, struct obd_export *exp, + struct ldlm_res_id *res_id, struct obd_ioobj *obj, + struct niobuf_remote *nb, struct lustre_handle *lh, + enum ldlm_mode mode) { struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; __u64 flags = 0; @@ -1693,7 +1694,7 @@ static int tgt_brw_lock(struct obd_export *exp, struct ldlm_res_id *res_id, if (exp->exp_connect_data.ocd_connect_flags & OBD_CONNECT_IBITS) rc = tgt_mdt_data_lock(ns, res_id, lh, mode, &flags); else - rc = tgt_extent_lock(ns, res_id, nb[0].rnb_offset, + rc = tgt_extent_lock(env, ns, res_id, nb[0].rnb_offset, nb[nrbufs - 1].rnb_offset + nb[nrbufs - 1].rnb_len - 1, lh, mode, &flags); @@ -2158,8 +2159,8 @@ int tgt_brw_read(struct tgt_session_info *tsi) local_nb = tbc->local; - rc = tgt_brw_lock(exp, &tsi->tsi_resid, ioo, remote_nb, &lockh, - LCK_PR); + rc = tgt_brw_lock(tsi->tsi_env, exp, &tsi->tsi_resid, ioo, remote_nb, + &lockh, LCK_PR); if (rc != 0) RETURN(rc); @@ -2496,8 +2497,8 @@ int tgt_brw_write(struct tgt_session_info *tsi) local_nb = tbc->local; - rc = tgt_brw_lock(exp, &tsi->tsi_resid, ioo, remote_nb, &lockh, - LCK_PW); + rc = tgt_brw_lock(tsi->tsi_env, exp, &tsi->tsi_resid, ioo, remote_nb, + &lockh, LCK_PW); if (rc != 0) GOTO(out, rc); -- 1.8.3.1