Whamcloud - gitweb
LU-11164 ldlm: pass env to lvbo methods 32/32832/17
authorAlex Zhuravlev <bzzz@whamcloud.com>
Thu, 19 Jul 2018 12:02:43 +0000 (16:02 +0400)
committerOleg Drokin <green@whamcloud.com>
Fri, 5 Oct 2018 22:26:48 +0000 (22:26 +0000)
to save on env allocation.

Benchmarks made by Shuichi Ihara demonstrated 13% improvement
for small I/Os: 564k vs 639k IOPS. the details can be found
LU-11164.

Change-Id: I797e3d7e19ef408993004a2b872842d655240525
Signed-off-by: Alex Zhuravlev <bzzz@whamcloud.com>
Reviewed-on: https://review.whamcloud.com/32832
Tested-by: Jenkins
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Patrick Farrell <paf@cray.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
30 files changed:
lustre/include/lu_target.h
lustre/include/lustre_dlm.h
lustre/include/lustre_net.h
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/lfsck/lfsck_engine.c
lustre/lfsck/lfsck_layout.c
lustre/lfsck/lfsck_lib.c
lustre/lfsck/lfsck_namespace.c
lustre/lod/lod_dev.c
lustre/lod/lod_object.c
lustre/lov/lov_obd.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_io.c
lustre/mdt/mdt_lvb.c
lustre/mdt/mdt_reint.c
lustre/mgs/mgs_handler.c
lustre/ofd/ofd_dev.c
lustre/ofd/ofd_dlm.c
lustre/ofd/ofd_internal.h
lustre/ofd/ofd_io.c
lustre/ofd/ofd_lvb.c
lustre/ofd/ofd_obd.c
lustre/ptlrpc/client.c
lustre/ptlrpc/ptlrpcd.c
lustre/quota/qmt_lock.c
lustre/target/tgt_handler.c

index 5b6c1ae..c698a8f 100644 (file)
@@ -430,9 +430,9 @@ void tgt_io_thread_done(struct ptlrpc_thread *thread);
 int tgt_mdt_data_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
                      struct lustre_handle *lh, int mode, __u64 *flags);
 void tgt_mdt_data_unlock(struct lustre_handle *lh, enum ldlm_mode mode);
-int tgt_extent_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
-                   __u64 start, __u64 end, struct lustre_handle *lh,
-                   int mode, __u64 *flags);
+int tgt_extent_lock(const struct lu_env *env, struct ldlm_namespace *ns,
+                   struct ldlm_res_id *res_id, __u64 start, __u64 end,
+                   struct lustre_handle *lh, int mode, __u64 *flags);
 void tgt_extent_unlock(struct lustre_handle *lh, enum ldlm_mode mode);
 int tgt_brw_read(struct tgt_session_info *tsi);
 int tgt_brw_write(struct tgt_session_info *tsi);
index 90ffb3e..5262a1d 100644 (file)
@@ -272,9 +272,10 @@ struct ldlm_pool {
        struct completion        pl_kobj_unregister;
 };
 
-typedef int (*ldlm_res_policy)(struct ldlm_namespace *, struct ldlm_lock **,
-                              void *req_cookie, enum ldlm_mode mode,
-                              __u64 flags, void *data);
+typedef int (*ldlm_res_policy)(const struct lu_env *env,
+                              struct ldlm_namespace *,
+                              struct ldlm_lock **, void *req_cookie,
+                              enum ldlm_mode mode, __u64 flags, void *data);
 
 typedef int (*ldlm_cancel_cbt)(struct ldlm_lock *lock);
 
@@ -292,14 +293,16 @@ typedef int (*ldlm_cancel_cbt)(struct ldlm_lock *lock);
  * of ldlm_[res_]lvbo_[init,update,fill]() functions.
  */
 struct ldlm_valblock_ops {
-       int (*lvbo_init)(struct ldlm_resource *res);
-       int (*lvbo_update)(struct ldlm_resource *res, struct ldlm_lock *lock,
-                          struct ptlrpc_request *r,  int increase);
+       int (*lvbo_init)(const struct lu_env *env, struct ldlm_resource *res);
+       int (*lvbo_update)(const struct lu_env *env, struct ldlm_resource *res,
+                          struct ldlm_lock *lock, struct ptlrpc_request *r,
+                          int increase);
        int (*lvbo_free)(struct ldlm_resource *res);
        /* Return size of lvb data appropriate RPC size can be reserved */
        int (*lvbo_size)(struct ldlm_lock *lock);
        /* Called to fill in lvb data to RPC buffer @buf */
-       int (*lvbo_fill)(struct ldlm_lock *lock, void *buf, int buflen);
+       int (*lvbo_fill)(const struct lu_env *env, struct ldlm_lock *lock,
+                        void *buf, int buflen);
 };
 
 /**
@@ -1076,7 +1079,8 @@ ldlm_lock_to_ns_at(struct ldlm_lock *lock)
         return &lock->l_resource->lr_ns_bucket->nsb_at_estimate;
 }
 
-static inline int ldlm_lvbo_init(struct ldlm_resource *res)
+static inline int ldlm_lvbo_init(const struct lu_env *env,
+                                struct ldlm_resource *res)
 {
        struct ldlm_namespace *ns = ldlm_res_to_ns(res);
        int rc = 0;
@@ -1091,7 +1095,7 @@ static inline int ldlm_lvbo_init(struct ldlm_resource *res)
                mutex_unlock(&res->lr_lvb_mutex);
                return 0;
        }
-       rc = ns->ns_lvbo->lvbo_init(res);
+       rc = ns->ns_lvbo->lvbo_init(env, res);
        if (rc < 0) {
                CDEBUG(D_DLMTRACE, "lvbo_init failed for resource : rc = %d\n",
                       rc);
@@ -1117,7 +1121,8 @@ static inline int ldlm_lvbo_size(struct ldlm_lock *lock)
        return 0;
 }
 
-static inline int ldlm_lvbo_fill(struct ldlm_lock *lock, void *buf, int len)
+static inline int ldlm_lvbo_fill(const struct lu_env *env,
+                                struct ldlm_lock *lock, void *buf, int len)
 {
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
        int rc;
@@ -1125,13 +1130,13 @@ static inline int ldlm_lvbo_fill(struct ldlm_lock *lock, void *buf, int len)
        if (ns->ns_lvbo != NULL) {
                LASSERT(ns->ns_lvbo->lvbo_fill != NULL);
                /* init lvb now if not already */
-               rc = ldlm_lvbo_init(lock->l_resource);
+               rc = ldlm_lvbo_init(env, lock->l_resource);
                if (rc < 0) {
                        CERROR("lock %p: delayed lvb init failed (rc %d)",
                               lock, rc);
                        return rc;
                }
-               return ns->ns_lvbo->lvbo_fill(lock, buf, len);
+               return ns->ns_lvbo->lvbo_fill(env, lock, buf, len);
        }
        return 0;
 }
@@ -1392,7 +1397,8 @@ ldlm_handle2lock_long(const struct lustre_handle *h, __u64 flags)
  * Update Lock Value Block Operations (LVBO) on a resource taking into account
  * data from request \a r
  */
-static inline int ldlm_lvbo_update(struct ldlm_resource *res,
+static inline int ldlm_lvbo_update(const struct lu_env *env,
+                                  struct ldlm_resource *res,
                                   struct ldlm_lock *lock,
                                   struct ptlrpc_request *req, int increase)
 {
@@ -1400,22 +1406,24 @@ static inline int ldlm_lvbo_update(struct ldlm_resource *res,
        int rc;
 
        /* delayed lvb init may be required */
-       rc = ldlm_lvbo_init(res);
+       rc = ldlm_lvbo_init(env, res);
        if (rc < 0) {
                CERROR("delayed lvb init failed (rc %d)\n", rc);
                return rc;
        }
 
        if (ns->ns_lvbo && ns->ns_lvbo->lvbo_update)
-               return ns->ns_lvbo->lvbo_update(res, lock, req, increase);
+               return ns->ns_lvbo->lvbo_update(env, res, lock, req, increase);
 
        return 0;
 }
 
-static inline int ldlm_res_lvbo_update(struct ldlm_resource *res,
-                                      struct ptlrpc_request *req, int increase)
+static inline int ldlm_res_lvbo_update(const struct lu_env *env,
+                                      struct ldlm_resource *res,
+                                      struct ptlrpc_request *req,
+                                      int increase)
 {
-       return ldlm_lvbo_update(res, NULL, req, increase);
+       return ldlm_lvbo_update(env, res, NULL, req, increase);
 }
 
 int ldlm_error2errno(enum ldlm_error error);
@@ -1613,7 +1621,8 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                          enum ldlm_mode mode, __u64 *flags, void *lvb,
                          __u32 lvb_len,
                          const struct lustre_handle *lockh, int rc);
-int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
+int ldlm_cli_enqueue_local(const struct lu_env *env,
+                          struct ldlm_namespace *ns,
                           const struct ldlm_res_id *res_id,
                           enum ldlm_type type, union ldlm_policy_data *policy,
                           enum ldlm_mode mode, __u64 *flags,
index 45c6d48..3274334 100644 (file)
@@ -2130,7 +2130,7 @@ struct ptlrpc_request_set *ptlrpc_prep_set(void);
 struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
                                             void *arg);
 int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set);
-int ptlrpc_set_wait(struct ptlrpc_request_set *);
+int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *);
 void ptlrpc_mark_interrupted(struct ptlrpc_request *req);
 void ptlrpc_set_destroy(struct ptlrpc_request_set *);
 void ptlrpc_set_add_req(struct ptlrpc_request_set *, struct ptlrpc_request *);
index 572b686..e2454e5 100644 (file)
@@ -145,7 +145,9 @@ ldlm_lock_create(struct ldlm_namespace *ns, const struct ldlm_res_id *,
                 enum ldlm_type type, enum ldlm_mode mode,
                 const struct ldlm_callback_suite *cbs,
                 void *data, __u32 lvb_len, enum lvb_type lvb_type);
-enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *, struct ldlm_lock **,
+enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
+                                 struct ldlm_namespace *,
+                                 struct ldlm_lock **,
                                  void *cookie, __u64 *flags);
 void ldlm_lock_addref_internal(struct ldlm_lock *, enum ldlm_mode mode);
 void ldlm_lock_addref_internal_nolock(struct ldlm_lock *, enum ldlm_mode mode);
@@ -162,7 +164,7 @@ int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
 void ldlm_discard_bl_list(struct list_head *bl_list);
 #endif
 int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
-                      ldlm_desc_ast_t ast_type);
+                     ldlm_desc_ast_t ast_type);
 int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq);
 int ldlm_lock_remove_from_lru_check(struct ldlm_lock *lock, ktime_t last_use);
 #define ldlm_lock_remove_from_lru(lock) \
index 44680dd..2b51f93 100644 (file)
@@ -1727,7 +1727,8 @@ restart:
  * set, skip all the enqueueing and delegate lock processing to intent policy
  * function.
  */
-enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns,
+enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
+                                 struct ldlm_namespace *ns,
                                  struct ldlm_lock **lockp,
                                  void *cookie, __u64 *flags)
 {
@@ -1741,8 +1742,8 @@ enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns,
         /* policies are not executed on the client or during replay */
         if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
             && !local && ns->ns_policy) {
-                rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
-                                   NULL);
+               rc = ns->ns_policy(env, ns, lockp, cookie, lock->l_req_mode,
+                                  *flags, NULL);
                 if (rc == ELDLM_LOCK_REPLACED) {
                         /* The lock that was returned has already been granted,
                          * and placed into lockp.  If it's not the same as the
@@ -2199,7 +2200,7 @@ int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
  * one.
  */
 int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
-                      ldlm_desc_ast_t ast_type)
+                     ldlm_desc_ast_t ast_type)
 {
        struct ldlm_cb_set_arg *arg;
        set_producer_func       work_ast_lock;
@@ -2245,7 +2246,7 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
        if (arg->set == NULL)
                GOTO(out, rc = -ENOMEM);
 
-       ptlrpc_set_wait(arg->set);
+       ptlrpc_set_wait(NULL, arg->set);
        ptlrpc_set_destroy(arg->set);
 
        rc = atomic_read(&arg->restart) ? -ERESTART : 0;
@@ -2455,6 +2456,7 @@ int ldlm_lock_set_data(const struct lustre_handle *lockh, void *data)
 EXPORT_SYMBOL(ldlm_lock_set_data);
 
 struct export_cl_data {
+       const struct lu_env     *ecl_env;
        struct obd_export       *ecl_exp;
        int                     ecl_loop;
 };
@@ -2467,7 +2469,7 @@ static void ldlm_cancel_lock_for_export(struct obd_export *exp,
 
        res = ldlm_resource_getref(lock->l_resource);
 
-       ldlm_lvbo_update(res, lock, NULL, 1);
+       ldlm_lvbo_update(ecl->ecl_env, res, lock, NULL, 1);
        ldlm_lock_cancel(lock);
        if (!exp->exp_obd->obd_stopping)
                ldlm_reprocess_all(res);
@@ -2507,10 +2509,17 @@ ldlm_cancel_locks_for_export_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
  */
 int ldlm_export_cancel_blocked_locks(struct obd_export *exp)
 {
+       struct lu_env env;
        struct export_cl_data   ecl = {
                .ecl_exp        = exp,
                .ecl_loop       = 0,
        };
+       int rc;
+
+       rc = lu_env_init(&env, LCT_DT_THREAD);
+       if (rc)
+               RETURN(rc);
+       ecl.ecl_env = &env;
 
        while (!list_empty(&exp->exp_bl_list)) {
                struct ldlm_lock *lock;
@@ -2533,6 +2542,8 @@ int ldlm_export_cancel_blocked_locks(struct obd_export *exp)
                LDLM_LOCK_RELEASE(lock);
        }
 
+       lu_env_fini(&env);
+
        CDEBUG(D_DLMTRACE, "Export %p, canceled %d locks, "
               "left on hash table %d.\n", exp, ecl.ecl_loop,
               atomic_read(&exp->exp_lock_hash->hs_count));
@@ -2547,10 +2558,16 @@ int ldlm_export_cancel_blocked_locks(struct obd_export *exp)
  */
 int ldlm_export_cancel_locks(struct obd_export *exp)
 {
-       struct export_cl_data   ecl = {
-               .ecl_exp        = exp,
-               .ecl_loop       = 0,
-       };
+       struct export_cl_data ecl;
+       struct lu_env env;
+       int rc;
+
+       rc = lu_env_init(&env, LCT_DT_THREAD);
+       if (rc)
+               RETURN(rc);
+       ecl.ecl_env = &env;
+       ecl.ecl_exp = exp;
+       ecl.ecl_loop = 0;
 
        cfs_hash_for_each_empty(exp->exp_lock_hash,
                                ldlm_cancel_locks_for_export_cb, &ecl);
@@ -2564,6 +2581,8 @@ int ldlm_export_cancel_locks(struct obd_export *exp)
            exp->exp_obd->obd_stopping)
                ldlm_reprocess_recovery_done(exp->exp_obd->obd_namespace);
 
+       lu_env_fini(&env);
+
        return ecl.ecl_loop;
 }
 
index c0b4edc..21a3c96 100644 (file)
@@ -633,7 +633,8 @@ static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
 /**
  * Perform lock cleanup if AST reply came with error.
  */
-static int ldlm_handle_ast_error(struct ldlm_lock *lock,
+static int ldlm_handle_ast_error(const struct lu_env *env,
+                                struct ldlm_lock *lock,
                                 struct ptlrpc_request *req, int rc,
                                 const char *ast_type)
 {
@@ -692,7 +693,7 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock,
                        /* update lvbo to return proper attributes.
                         * see bug 23174 */
                        ldlm_resource_getref(res);
-                       ldlm_lvbo_update(res, lock, NULL, 1);
+                       ldlm_lvbo_update(env, res, lock, NULL, 1);
                        ldlm_resource_putref(res);
                }
                ldlm_lock_cancel(lock);
@@ -727,20 +728,24 @@ static int ldlm_cb_interpret(const struct lu_env *env,
                } else if (rc == -ELDLM_NO_LOCK_DATA) {
                        LDLM_DEBUG(lock, "lost race - client has a lock but no "
                                   "inode");
-                       ldlm_lvbo_update(lock->l_resource, lock, NULL, 1);
+                       ldlm_lvbo_update(env, lock->l_resource, lock, NULL, 1);
                } else if (rc != 0) {
-                       rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
+                       rc = ldlm_handle_ast_error(env, lock, req,
+                                                  rc, "glimpse");
                } else {
-                       rc = ldlm_lvbo_update(lock->l_resource, lock, req, 1);
+                       rc = ldlm_lvbo_update(env, lock->l_resource,
+                                             lock, req, 1);
                }
                break;
        case LDLM_BL_CALLBACK:
                if (rc != 0)
-                       rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
+                       rc = ldlm_handle_ast_error(env, lock, req,
+                                                  rc, "blocking");
                break;
        case LDLM_CP_CALLBACK:
                if (rc != 0)
-                       rc = ldlm_handle_ast_error(lock, req, rc, "completion");
+                       rc = ldlm_handle_ast_error(env, lock, req,
+                                                  rc, "completion");
                break;
        default:
                LDLM_ERROR(lock, "invalid opcode for lock callback %d",
@@ -988,8 +993,12 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
         ldlm_lock2desc(lock, &body->lock_desc);
        if (lvb_len > 0) {
                void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
+               const struct lu_env *env = NULL;
 
-               lvb_len = ldlm_lvbo_fill(lock, lvb, lvb_len);
+               if (req->rq_svc_thread)
+                       env = req->rq_svc_thread->t_env;
+
+               lvb_len = ldlm_lvbo_fill(env, lock, lvb, lvb_len);
                if (lvb_len < 0) {
                        /* We still need to send the RPC to wake up the blocked
                         * enqueue thread on the client.
@@ -1174,6 +1183,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
        void *cookie = NULL;
        int rc = 0;
        struct ldlm_resource *res = NULL;
+       const struct lu_env *env = req->rq_svc_thread->t_env;
        ENTRY;
 
        LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
@@ -1254,7 +1264,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
        res = lock->l_resource;
        if (!(flags & LDLM_FL_REPLAY)) {
                /* non-replayed lock, delayed lvb init may need to be done */
-               rc = ldlm_lvbo_init(res);
+               rc = ldlm_lvbo_init(env, res);
                if (rc < 0) {
                        LDLM_DEBUG(lock, "delayed lvb init failed (rc %d)", rc);
                        GOTO(out, rc);
@@ -1302,25 +1312,25 @@ existing_lock:
                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
                                     RCL_SERVER, ldlm_lvbo_size(lock));
 
-                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
-                        GOTO(out, rc = -ENOMEM);
+               if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
+                       GOTO(out, rc = -ENOMEM);
 
-                rc = req_capsule_server_pack(&req->rq_pill);
-                if (rc)
-                        GOTO(out, rc);
-        }
+               rc = req_capsule_server_pack(&req->rq_pill);
+               if (rc)
+                       GOTO(out, rc);
+       }
 
-       err = ldlm_lock_enqueue(ns, &lock, cookie, &flags);
+       err = ldlm_lock_enqueue(env, ns, &lock, cookie, &flags);
        if (err) {
                if ((int)err < 0)
                        rc = (int)err;
                GOTO(out, err);
        }
 
-        dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+       dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
 
-        ldlm_lock2desc(lock, &dlm_rep->lock_desc);
-        ldlm_lock2handle(lock, &dlm_rep->lock_handle);
+       ldlm_lock2desc(lock, &dlm_rep->lock_desc);
+       ldlm_lock2handle(lock, &dlm_rep->lock_handle);
 
        if (lock && lock->l_resource->lr_type == LDLM_EXTENT)
                OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 6);
@@ -1410,7 +1420,7 @@ existing_lock:
                                /* non-replayed lock, delayed lvb init may
                                 * need to be occur now */
                                if ((buflen > 0) && !(flags & LDLM_FL_REPLAY)) {
-                                       buflen = ldlm_lvbo_fill(lock, buf,
+                                       buflen = ldlm_lvbo_fill(env, lock, buf,
                                                                buflen);
                                        if (buflen >= 0)
                                                req_capsule_shrink(
@@ -1550,6 +1560,7 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
                        const struct ldlm_request *dlm_req,
                        int first, enum lustre_at_flags flags)
 {
+       const struct lu_env *env = req->rq_svc_thread->t_env;
         struct ldlm_resource *res, *pres = NULL;
         struct ldlm_lock *lock;
         int i, count, done = 0;
@@ -1591,15 +1602,16 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
                                 LDLM_RESOURCE_DELREF(pres);
                                 ldlm_resource_putref(pres);
                         }
-                        if (res != NULL) {
-                                ldlm_resource_getref(res);
-                                LDLM_RESOURCE_ADDREF(res);
+                       if (res != NULL) {
+                               ldlm_resource_getref(res);
+                               LDLM_RESOURCE_ADDREF(res);
 
                                if (!ldlm_is_discard_data(lock))
-                                       ldlm_lvbo_update(res, lock, NULL, 1);
-                        }
-                        pres = res;
-                }
+                                       ldlm_lvbo_update(env, res, lock,
+                                                        NULL, 1);
+                       }
+                       pres = res;
+               }
 
                if ((flags & LATF_STATS) && ldlm_is_ast_sent(lock) &&
                    lock->l_blast_sent != 0) {
index fcb2b4a..78e2547 100644 (file)
@@ -431,7 +431,8 @@ int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp)
 /**
  * Enqueue a local lock (typically on a server).
  */
-int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
+int ldlm_cli_enqueue_local(const struct lu_env *env,
+                          struct ldlm_namespace *ns,
                           const struct ldlm_res_id *res_id,
                           enum ldlm_type type, union ldlm_policy_data *policy,
                           enum ldlm_mode mode, __u64 *flags,
@@ -461,7 +462,7 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
        if (IS_ERR(lock))
                GOTO(out_nolock, err = PTR_ERR(lock));
 
-       err = ldlm_lvbo_init(lock->l_resource);
+       err = ldlm_lvbo_init(env, lock->l_resource);
        if (err < 0) {
                LDLM_ERROR(lock, "delayed lvb init failed (rc %d)", err);
                ldlm_lock_destroy_nolock(lock);
@@ -489,15 +490,15 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
                lock->l_req_extent = policy->l_extent;
        }
 
-        err = ldlm_lock_enqueue(ns, &lock, policy, flags);
-        if (unlikely(err != ELDLM_OK))
-                GOTO(out, err);
+       err = ldlm_lock_enqueue(env, ns, &lock, policy, flags);
+       if (unlikely(err != ELDLM_OK))
+               GOTO(out, err);
 
-        if (policy != NULL)
-                *policy = lock->l_policy_data;
+       if (policy != NULL)
+               *policy = lock->l_policy_data;
 
-        if (lock->l_completion_ast)
-                lock->l_completion_ast(lock, *flags, NULL);
+       if (lock->l_completion_ast)
+               lock->l_completion_ast(lock, *flags, NULL);
 
         LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
         EXIT;
@@ -564,12 +565,16 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                          __u32 lvb_len, const struct lustre_handle *lockh,
                          int rc)
 {
-        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
-        int is_replay = *flags & LDLM_FL_REPLAY;
-        struct ldlm_lock *lock;
-        struct ldlm_reply *reply;
-        int cleanup_phase = 1;
-        ENTRY;
+       struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
+       const struct lu_env *env = NULL;
+       int is_replay = *flags & LDLM_FL_REPLAY;
+       struct ldlm_lock *lock;
+       struct ldlm_reply *reply;
+       int cleanup_phase = 1;
+       ENTRY;
+
+       if (req && req->rq_svc_thread)
+               env = req->rq_svc_thread->t_env;
 
         lock = ldlm_handle2lock(lockh);
         /* ldlm_cli_enqueue is holding a reference on this lock. */
@@ -707,16 +712,16 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                }
         }
 
-        if (!is_replay) {
-                rc = ldlm_lock_enqueue(ns, &lock, NULL, flags);
-                if (lock->l_completion_ast != NULL) {
-                        int err = lock->l_completion_ast(lock, *flags, NULL);
-                        if (!rc)
-                                rc = err;
-                        if (rc)
-                                cleanup_phase = 1;
-                }
-        }
+       if (!is_replay) {
+               rc = ldlm_lock_enqueue(env, ns, &lock, NULL, flags);
+               if (lock->l_completion_ast != NULL) {
+                       int err = lock->l_completion_ast(lock, *flags, NULL);
+                       if (!rc)
+                               rc = err;
+                       if (rc)
+                               cleanup_phase = 1;
+               }
+       }
 
        if (lvb_len > 0 && lvb != NULL) {
                /* Copy the LVB here, and not earlier, because the completion
index f49fda2..20866fc 100644 (file)
@@ -1235,7 +1235,7 @@ again:
        }
        spin_unlock(&ltds->ltd_lock);
 
-       rc = ptlrpc_set_wait(set);
+       rc = ptlrpc_set_wait(env, set);
        if (rc < 0) {
                ptlrpc_set_destroy(set);
                RETURN(rc);
@@ -1330,7 +1330,7 @@ static int lfsck_assistant_notify_others(const struct lu_env *env,
                up_read(&ltds->ltd_rw_sem);
 
                /* Sync up */
-               rc = ptlrpc_set_wait(set);
+               rc = ptlrpc_set_wait(env, set);
                if (rc < 0) {
                        ptlrpc_set_destroy(set);
                        RETURN(rc);
@@ -1462,7 +1462,7 @@ again:
                }
                spin_unlock(&ltds->ltd_lock);
 
-               rc = ptlrpc_set_wait(set);
+               rc = ptlrpc_set_wait(env, set);
                if (rc < 0) {
                        ptlrpc_set_destroy(set);
                        RETURN(rc);
@@ -1536,7 +1536,7 @@ again:
                break;
        }
 
-       rc1 = ptlrpc_set_wait(set);
+       rc1 = ptlrpc_set_wait(env, set);
        ptlrpc_set_destroy(set);
 
        RETURN(rc != 0 ? rc : rc1);
index cdb46d8..e866566 100644 (file)
@@ -308,7 +308,7 @@ static void lfsck_layout_assistant_sync_failures(const struct lu_env *env,
        up_read(&ltds->ltd_rw_sem);
 
        if (rc == 0 && atomic_read(&count) > 0)
-               rc = ptlrpc_set_wait(set);
+               rc = ptlrpc_set_wait(env, set);
 
        ptlrpc_set_destroy(set);
 
@@ -2580,10 +2580,10 @@ static int lfsck_layout_slave_conditional_destroy(const struct lu_env *env,
        memset(policy, 0, sizeof(*policy));
        policy->l_extent.end = OBD_OBJECT_EOF;
        ost_fid_build_resid(fid, resid);
-       rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_EXTENT,
-                                   policy, LCK_EX, &flags, ldlm_blocking_ast,
-                                   ldlm_completion_ast, NULL, NULL, 0,
-                                   LVB_T_NONE, NULL, &lh);
+       rc = ldlm_cli_enqueue_local(env, lfsck->li_namespace, resid,
+                                   LDLM_EXTENT, policy, LCK_EX, &flags,
+                                   ldlm_blocking_ast, ldlm_completion_ast,
+                                   NULL, NULL, 0, LVB_T_NONE, NULL, &lh);
        if (rc != ELDLM_OK)
                GOTO(put, rc = -EIO);
 
@@ -4752,7 +4752,7 @@ lfsck_layout_slave_query_master(const struct lu_env *env,
        }
        spin_unlock(&llsd->llsd_lock);
 
-       rc = ptlrpc_set_wait(set);
+       rc = ptlrpc_set_wait(env, set);
        ptlrpc_set_destroy(set);
 
        GOTO(log, rc = (rc1 != 0 ? rc1 : rc));
@@ -4830,7 +4830,7 @@ lfsck_layout_slave_notify_master(const struct lu_env *env,
        }
        spin_unlock(&llsd->llsd_lock);
 
-       ptlrpc_set_wait(set);
+       ptlrpc_set_wait(env, set);
        ptlrpc_set_destroy(set);
 
        RETURN_EXIT;
index 46ae40b..82c3168 100644 (file)
@@ -380,7 +380,7 @@ static int __lfsck_ibits_lock(const struct lu_env *env,
 
                rc = dt_object_lock(env, obj, lh, einfo, policy);
        } else {
-               rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid,
+               rc = ldlm_cli_enqueue_local(env, lfsck->li_namespace, resid,
                                            LDLM_IBITS, policy, mode,
                                            &flags, ldlm_blocking_ast,
                                            ldlm_completion_ast, NULL, NULL,
@@ -2347,7 +2347,7 @@ static int lfsck_stop_notify(const struct lu_env *env,
                               ltd->ltd_index, lad->lad_name, rc);
                        lfsck_tgt_put(ltd);
                } else {
-                       rc = ptlrpc_set_wait(set);
+                       rc = ptlrpc_set_wait(env, set);
                }
 
                ptlrpc_set_destroy(set);
@@ -2484,7 +2484,7 @@ again:
                goto again;
        }
 
-       rc = ptlrpc_set_wait(set);
+       rc = ptlrpc_set_wait(env, set);
        ptlrpc_set_destroy(set);
 
        RETURN(rc);
@@ -2946,7 +2946,7 @@ static int lfsck_stop_all(const struct lu_env *env,
        }
        up_read(&ltds->ltd_rw_sem);
 
-       rc = ptlrpc_set_wait(set);
+       rc = ptlrpc_set_wait(env, set);
        ptlrpc_set_destroy(set);
 
        if (rc == 0)
@@ -3037,7 +3037,7 @@ again:
                RETURN(rc);
        }
 
-       rc = ptlrpc_set_wait(set);
+       rc = ptlrpc_set_wait(env, set);
        ptlrpc_set_destroy(set);
 
        if (rc == 0)
index 7f01acb..0459052 100644 (file)
@@ -6577,7 +6577,7 @@ static void lfsck_namespace_assistant_sync_failures(const struct lu_env *env,
        }
        up_read(&ltds->ltd_rw_sem);
 
-       rc = ptlrpc_set_wait(set);
+       rc = ptlrpc_set_wait(env, set);
        ptlrpc_set_destroy(set);
 
        GOTO(out, rc);
index 6949283..810bf35 100644 (file)
@@ -2145,7 +2145,7 @@ static int lod_obd_set_info_async(const struct lu_env *env,
 
 
        if (no_set) {
-               rc2 = ptlrpc_set_wait(set);
+               rc2 = ptlrpc_set_wait(env, set);
                if (rc2 == 0 && rc == 0)
                        rc = rc2;
                ptlrpc_set_destroy(set);
index dc896ef..0b34a43 100644 (file)
@@ -5705,7 +5705,7 @@ static int lod_object_lock(const struct lu_env *env,
                                dlmflags |= LDLM_FL_COS_INCOMPAT;
 
                        LASSERT(ns != NULL);
-                       rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS,
+                       rc = ldlm_cli_enqueue_local(env, ns, res_id, LDLM_IBITS,
                                                    policy, einfo->ei_mode,
                                                    &dlmflags, blocking,
                                                    completion, NULL,
index 9bdeab9..71b2829 100644 (file)
@@ -945,7 +945,7 @@ static int lov_statfs(const struct lu_env *env, struct obd_export *exp,
                        GOTO(out_set, rc);
        }
 
-       rc = ptlrpc_set_wait(rqset);
+       rc = ptlrpc_set_wait(env, rqset);
 
 out_set:
        if (rc < 0)
@@ -1253,7 +1253,7 @@ static int lov_set_info_async(const struct lu_env *env, struct obd_export *exp,
 
        lov_tgts_putref(obddev);
        if (no_set) {
-               err = ptlrpc_set_wait(set);
+               err = ptlrpc_set_wait(env, set);
                if (rc == 0)
                        rc = err;
                ptlrpc_set_destroy(set);
index 080dc19..ace6f1f 100644 (file)
@@ -2887,7 +2887,7 @@ static int mdt_object_local_lock(struct mdt_thread_info *info,
        struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
        union ldlm_policy_data *policy = &info->mti_policy;
        struct ldlm_res_id *res_id = &info->mti_res_id;
-       __u64 dlmflags = 0;
+       __u64 dlmflags = 0, *cookie = NULL;
        int rc;
        ENTRY;
 
@@ -2919,10 +2919,12 @@ static int mdt_object_local_lock(struct mdt_thread_info *info,
                }
        }
 
-
        fid_build_reg_res_name(mdt_object_fid(o), res_id);
        dlmflags |= LDLM_FL_ATOMIC_CB;
 
+       if (info->mti_exp)
+               cookie = &info->mti_exp->exp_handle.h_cookie;
+
        /*
         * Take PDO lock on whole directory and build correct @res_id for lock
         * on part of directory.
@@ -2943,10 +2945,9 @@ static int mdt_object_local_lock(struct mdt_thread_info *info,
                        /* at least one of them should be set */
                        LASSERT(policy->l_inodebits.bits |
                                policy->l_inodebits.try_bits);
-                       rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode,
-                                         policy, res_id, dlmflags,
-                                         info->mti_exp == NULL ? NULL :
-                                         &info->mti_exp->exp_handle.h_cookie);
+                       rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_pdo_lh,
+                                         lh->mlh_pdo_mode, policy, res_id,
+                                         dlmflags, cookie);
                        if (unlikely(rc != 0))
                                GOTO(out_unlock, rc);
                 }
@@ -2966,10 +2967,9 @@ static int mdt_object_local_lock(struct mdt_thread_info *info,
          * going to be sent to client. If it is - mdt_intent_policy() path will
          * fix it up and turn FL_LOCAL flag off.
          */
-       rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
-                         res_id, LDLM_FL_LOCAL_ONLY | dlmflags,
-                         info->mti_exp == NULL ? NULL :
-                         &info->mti_exp->exp_handle.h_cookie);
+       rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
+                         policy, res_id, LDLM_FL_LOCAL_ONLY | dlmflags,
+                         cookie);
 out_unlock:
        if (rc != 0)
                mdt_object_unlock(info, o, lh, 1);
@@ -4043,9 +4043,12 @@ static void mdt_ptlrpc_stats_update(struct ptlrpc_request *req,
                                LDLM_GLIMPSE_ENQUEUE : LDLM_IBITS_ENQUEUE));
 }
 
-static int mdt_intent_policy(struct ldlm_namespace *ns,
-                            struct ldlm_lock **lockp, void *req_cookie,
-                            enum ldlm_mode mode, __u64 flags, void *data)
+static int mdt_intent_policy(const struct lu_env *env,
+                            struct ldlm_namespace *ns,
+                            struct ldlm_lock **lockp,
+                            void *req_cookie,
+                            enum ldlm_mode mode,
+                            __u64 flags, void *data)
 {
        struct tgt_session_info *tsi;
        struct mdt_thread_info  *info;
@@ -4059,7 +4062,7 @@ static int mdt_intent_policy(struct ldlm_namespace *ns,
 
        LASSERT(req != NULL);
 
-       tsi = tgt_ses_info(req->rq_svc_thread->t_env);
+       tsi = tgt_ses_info(env);
 
        info = tsi2mdt_info(tsi);
        LASSERT(info != NULL);
index f78a67e..c84aa6a 100644 (file)
@@ -406,7 +406,8 @@ struct mdt_thread_info {
         /*
          * Object attributes.
          */
-        struct md_attr             mti_attr;
+       struct md_attr             mti_attr;
+       struct md_attr             mti_attr2; /* mdt_lvb.c */
         /*
          * Body for "habeo corpus" operations.
          */
@@ -1137,7 +1138,8 @@ static int mdt_dom_glimpse_ast(struct ldlm_lock *lock, void *reqp)
 }
 
 /* Issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
-static inline int mdt_fid_lock(struct ldlm_namespace *ns,
+static inline int mdt_fid_lock(const struct lu_env *env,
+                              struct ldlm_namespace *ns,
                               struct lustre_handle *lh, enum ldlm_mode mode,
                               union ldlm_policy_data *policy,
                               const struct ldlm_res_id *res_id,
@@ -1149,7 +1151,7 @@ static inline int mdt_fid_lock(struct ldlm_namespace *ns,
        LASSERT(ns != NULL);
        LASSERT(lh != NULL);
 
-       rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
+       rc = ldlm_cli_enqueue_local(env, ns, res_id, LDLM_IBITS, policy,
                                    mode, &flags, mdt_blocking_ast,
                                    ldlm_completion_ast,
                                    glimpse ? mdt_dom_glimpse_ast : NULL,
@@ -1196,8 +1198,9 @@ int mdt_lsom_update(struct mdt_thread_info *info, struct mdt_object *obj,
 /* mdt_lvb.c */
 extern struct ldlm_valblock_ops mdt_lvbo;
 int mdt_dom_lvb_is_valid(struct ldlm_resource *res);
-int mdt_dom_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock,
-                       struct ptlrpc_request *req, bool increase_only);
+int mdt_dom_lvbo_update(const struct lu_env *env, struct ldlm_resource *res,
+                       struct ldlm_lock *lock, struct ptlrpc_request *req,
+                       bool increase_only);
 
 void mdt_enable_cos(struct mdt_device *dev, bool enable);
 int mdt_cos_is_enabled(struct mdt_device *);
index c5dc89f..2d6f802 100644 (file)
@@ -1026,7 +1026,7 @@ int mdt_dom_object_size(const struct lu_env *env, struct mdt_device *mdt,
 
        /* Update lvbo data if DoM lock returned or if LVB is not yet valid. */
        if (dom_lock || !mdt_dom_lvb_is_valid(res))
-               mdt_dom_lvbo_update(res, NULL, NULL, false);
+               mdt_dom_lvbo_update(env, res, NULL, NULL, false);
 
        mdt_lvb2body(res, mb);
        ldlm_resource_putref(res);
@@ -1124,7 +1124,7 @@ int mdt_glimpse_enqueue(struct mdt_thread_info *mti, struct ldlm_namespace *ns,
 fill_mbo:
        /* LVB can be without valid data in case of DOM */
        if (!mdt_dom_lvb_is_valid(res))
-               mdt_dom_lvbo_update(res, lock, NULL, false);
+               mdt_dom_lvbo_update(mti->mti_env, res, lock, NULL, false);
        mdt_lvb2body(res, mbo);
        RETURN(rc);
 }
@@ -1225,10 +1225,10 @@ void mdt_dom_discard_data(struct mdt_thread_info *info,
 
        /* Tell the clients that the object is gone now and that they should
         * throw away any cached pages. */
-       rc = ldlm_cli_enqueue_local(mdt->mdt_namespace, res_id, LDLM_IBITS,
-                                   policy, LCK_PW, &flags, ldlm_blocking_ast,
-                                   ldlm_completion_ast, NULL, NULL, 0,
-                                   LVB_T_NONE, NULL, &dom_lh);
+       rc = ldlm_cli_enqueue_local(info->mti_env, mdt->mdt_namespace, res_id,
+                                   LDLM_IBITS, policy, LCK_PW, &flags,
+                                   ldlm_blocking_ast, ldlm_completion_ast,
+                                   NULL, NULL, 0, LVB_T_NONE, NULL, &dom_lh);
 
        /* We only care about the side-effects, just drop the lock. */
        if (rc == ELDLM_OK)
index 6859d42..9ee1989 100644 (file)
@@ -34,7 +34,7 @@
 #include "mdt_internal.h"
 
 /* Called with res->lr_lvb_sem held */
-static int mdt_lvbo_init(struct ldlm_resource *res)
+static int mdt_lvbo_init(const struct lu_env *env, struct ldlm_resource *res)
 {
        if (IS_LQUOTA_RES(res)) {
                struct mdt_device       *mdt;
@@ -96,7 +96,7 @@ int mdt_dom_disk_lvbo_update(const struct lu_env *env, struct mdt_object *mo,
        if (!mdt_object_exists(mo) || mdt_object_remote(mo))
                RETURN(-ENOENT);
 
-       ma = &info->mti_attr;
+       ma = &info->mti_attr2;
        ma->ma_valid = 0;
        ma->ma_need = MA_INODE;
        rc = mo_attr_get(env, mdt_object_child(mo), ma);
@@ -140,15 +140,15 @@ int mdt_dom_disk_lvbo_update(const struct lu_env *env, struct mdt_object *mo,
        RETURN(rc);
 }
 
-int mdt_dom_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock,
-                       struct ptlrpc_request *req, bool increase_only)
+int mdt_dom_lvbo_update(const struct lu_env *env, struct ldlm_resource *res,
+                       struct ldlm_lock *lock, struct ptlrpc_request *req,
+                       bool increase_only)
 {
        struct obd_export *exp = lock ? lock->l_export : NULL;
        struct mdt_device *mdt;
        struct mdt_object *mo;
        struct mdt_thread_info *info;
        struct ost_lvb *lvb;
-       struct lu_env env;
        struct lu_fid *fid;
        int rc = 0;
 
@@ -173,18 +173,21 @@ int mdt_dom_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock,
        if (mdt == NULL)
                RETURN(-ENOENT);
 
-       rc = lu_env_init(&env, LCT_MD_THREAD);
-       if (rc)
-               RETURN(rc);
-
-       info = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
-       if (info == NULL)
-               GOTO(out_env, rc = -ENOMEM);
-
-       memset(info, 0, sizeof *info);
-       info->mti_env = &env;
-       info->mti_exp = req ? req->rq_export : NULL;
-       info->mti_mdt = mdt;
+       LASSERT(env);
+       info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+       if (!info) {
+               rc = lu_env_refill_by_tags((struct lu_env *)env,
+                                          LCT_MD_THREAD, 0);
+               if (rc)
+                       GOTO(out_env, rc);
+               info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+               if (!info)
+                       GOTO(out_env, rc = -ENOMEM);
+       }
+       if (!info->mti_exp)
+               info->mti_exp = req ? req->rq_export : NULL;
+       if (!info->mti_mdt)
+               info->mti_mdt = mdt;
 
        fid = &info->mti_tmp_fid2;
        fid_extract_from_res_name(fid, &res->lr_name);
@@ -238,19 +241,19 @@ int mdt_dom_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock,
 
 disk_update:
        /* Update the LVB from the disk inode */
-       mo = mdt_object_find(&env, mdt, fid);
+       mo = mdt_object_find(env, mdt, fid);
        if (IS_ERR(mo))
                GOTO(out_env, rc = PTR_ERR(mo));
 
-       rc = mdt_dom_disk_lvbo_update(&env, mo, res, !!increase_only);
-       mdt_object_put(&env, mo);
+       rc = mdt_dom_disk_lvbo_update(env, mo, res, !!increase_only);
+       mdt_object_put(env, mo);
 out_env:
-       lu_env_fini(&env);
        return rc;
 }
 
-static int mdt_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock,
-                          struct ptlrpc_request *req, int increase_only)
+static int mdt_lvbo_update(const struct lu_env *env, struct ldlm_resource *res,
+                          struct ldlm_lock *lock, struct ptlrpc_request *req,
+                          int increase_only)
 {
        ENTRY;
 
@@ -272,7 +275,8 @@ static int mdt_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock,
         * by MDT for DOM objects only.
         */
        if (lock == NULL || ldlm_has_dom(lock))
-               return mdt_dom_lvbo_update(res, lock, req, !!increase_only);
+               return mdt_dom_lvbo_update(env, res, lock, req,
+                                          !!increase_only);
        return 0;
 }
 
@@ -302,9 +306,9 @@ static int mdt_lvbo_size(struct ldlm_lock *lock)
        return 0;
 }
 
-static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen)
+static int mdt_lvbo_fill(const struct lu_env *env, struct ldlm_lock *lock,
+                        void *lvb, int lvblen)
 {
-       struct lu_env env;
        struct mdt_thread_info *info;
        struct mdt_device *mdt;
        struct lu_fid *fid;
@@ -324,6 +328,23 @@ static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen)
                RETURN(rc);
        }
 
+       info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+       if (!info) {
+               rc = lu_env_refill_by_tags((struct lu_env *)env,
+                                          LCT_MD_THREAD, 0);
+               if (rc)
+                       GOTO(out, rc);
+               info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+               if (!info)
+                       GOTO(out, rc = -ENOMEM);
+       }
+       if (!info->mti_env)
+               info->mti_env = env;
+       if (!info->mti_exp)
+               info->mti_exp = lock->l_export;
+       if (!info->mti_mdt)
+               info->mti_mdt = mdt;
+
        /* LVB for DoM lock is needed only for glimpse,
         * don't fill DoM data if there is layout lock */
        if (ldlm_has_dom(lock)) {
@@ -331,7 +352,8 @@ static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen)
                int lvb_len = sizeof(struct ost_lvb);
 
                if (!mdt_dom_lvb_is_valid(res))
-                       mdt_dom_lvbo_update(lock->l_resource, lock, NULL, 0);
+                       mdt_dom_lvbo_update(env, lock->l_resource,
+                                           lock, NULL, 0);
 
                if (lvb_len > lvblen)
                        lvb_len = lvblen;
@@ -340,47 +362,30 @@ static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen)
                memcpy(lvb, res->lr_lvb_data, lvb_len);
                unlock_res(res);
 
-               RETURN(lvb_len);
+               GOTO(out, rc = lvb_len);
        }
 
        /* Only fill layout if layout lock is granted */
        if (!ldlm_has_layout(lock) || lock->l_granted_mode != lock->l_req_mode)
-               RETURN(0);
-
-       /* XXX create an env to talk to mdt stack. We should get this env from
-        * ptlrpc_thread->t_env. */
-       rc = lu_env_init(&env, LCT_MD_THREAD);
-       /* Likely ENOMEM */
-       if (rc)
-               RETURN(rc);
-
-       info = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
-       /* Likely ENOMEM */
-       if (info == NULL)
-               GOTO(out, rc = -ENOMEM);
-
-       memset(info, 0, sizeof *info);
-       info->mti_env = &env;
-       info->mti_exp = lock->l_export;
-       info->mti_mdt = mdt;
+               GOTO(out, rc = 0);
 
        /* XXX get fid by resource id. why don't include fid in ldlm_resource */
        fid = &info->mti_tmp_fid2;
        fid_extract_from_res_name(fid, &lock->l_resource->lr_name);
 
-       obj = mdt_object_find(&env, info->mti_mdt, fid);
+       obj = mdt_object_find(env, info->mti_mdt, fid);
        if (IS_ERR(obj))
                GOTO(out, rc = PTR_ERR(obj));
 
        if (!mdt_object_exists(obj) || mdt_object_remote(obj))
-               GOTO(out, rc = -ENOENT);
+               GOTO(out_put, rc = -ENOENT);
 
        child = mdt_object_child(obj);
 
        /* get the length of lsm */
-       rc = mo_xattr_get(&env, child, &LU_BUF_NULL, XATTR_NAME_LOV);
+       rc = mo_xattr_get(env, child, &LU_BUF_NULL, XATTR_NAME_LOV);
        if (rc < 0)
-               GOTO(out, rc);
+               GOTO(out_put, rc);
        if (rc > 0) {
                struct lu_buf *lmm = NULL;
                if (lvblen < rc) {
@@ -400,20 +405,20 @@ static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen)
                                     "%d (max_mdsize %d): rc = %d\n",
                                     mdt_obd_name(mdt), lvblen, rc,
                                     info->mti_mdt->mdt_max_mdsize, -ERANGE);
-                       GOTO(out, rc = -ERANGE);
+                       GOTO(out_put, rc = -ERANGE);
                }
                lmm = &info->mti_buf;
                lmm->lb_buf = lvb;
                lmm->lb_len = rc;
-               rc = mo_xattr_get(&env, child, lmm, XATTR_NAME_LOV);
+               rc = mo_xattr_get(env, child, lmm, XATTR_NAME_LOV);
                if (rc < 0)
-                       GOTO(out, rc);
+                       GOTO(out_put, rc);
        }
 
-out:
+out_put:
        if (obj != NULL && !IS_ERR(obj))
-               mdt_object_put(&env, obj);
-       lu_env_fini(&env);
+               mdt_object_put(env, obj);
+out:
        RETURN(rc < 0 ? 0 : rc);
 }
 
index 895fb90..e76c5c3 100644 (file)
@@ -1243,8 +1243,9 @@ static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
         * going to be sent to client. If it is - mdt_intent_policy() path will
         * fix it up and turn FL_LOCAL flag off.
         */
-       rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
-                         res, dlmflags, &info->mti_exp->exp_handle.h_cookie);
+       rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
+                         policy, res, dlmflags,
+                         &info->mti_exp->exp_handle.h_cookie);
        return rc;
 }
 
@@ -1284,12 +1285,13 @@ static int mdt_rename_lock(struct mdt_thread_info *info,
                memset(policy, 0, sizeof *policy);
                policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
                flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
-               rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
-                                          LCK_EX, &flags, ldlm_blocking_ast,
-                                          ldlm_completion_ast, NULL, NULL, 0,
-                                          LVB_T_NONE,
-                                          &info->mti_exp->exp_handle.h_cookie,
-                                          lh);
+               rc = ldlm_cli_enqueue_local(info->mti_env, ns, res_id,
+                                           LDLM_IBITS, policy, LCK_EX, &flags,
+                                           ldlm_blocking_ast,
+                                           ldlm_completion_ast, NULL, NULL, 0,
+                                           LVB_T_NONE,
+                                           &info->mti_exp->exp_handle.h_cookie,
+                                           lh);
                RETURN(rc);
        }
        RETURN(rc);
@@ -1355,8 +1357,9 @@ static int mdt_revoke_remote_lookup_lock(struct mdt_thread_info *info,
                fid_build_reg_res_name(mdt_object_fid(obj), res);
                memset(policy, 0, sizeof(*policy));
                policy->l_inodebits.bits = MDS_INODELOCK_LOOKUP;
-               rc = mdt_fid_lock(info->mti_mdt->mdt_namespace, &lh->mlh_reg_lh,
-                                 LCK_EX, policy, res, dlmflags, NULL);
+               rc = mdt_fid_lock(info->mti_env, info->mti_mdt->mdt_namespace,
+                                 &lh->mlh_reg_lh, LCK_EX, policy, res,
+                                 dlmflags, NULL);
        }
 
        if (rc != ELDLM_OK)
index 21587d2..af08318 100644 (file)
@@ -270,7 +270,7 @@ void mgs_revoke_lock(struct mgs_device *mgs, struct fs_db *fsdb, int type)
 
        if (!rc) {
                LASSERT(cp != NULL);
-               rc = ldlm_cli_enqueue_local(mgs->mgs_obd->obd_namespace,
+               rc = ldlm_cli_enqueue_local(NULL, mgs->mgs_obd->obd_namespace,
                                            &res_id, LDLM_PLAIN, NULL, LCK_EX,
                                            &flags, ldlm_blocking_ast, cp,
                                            NULL, fsdb, 0, LVB_T_NONE, NULL,
index 4509fe7..bdbbe76 100644 (file)
@@ -958,7 +958,8 @@ int ofd_fiemap_get(const struct lu_env *env, struct ofd_device *ofd,
 }
 
 
-static int ofd_lock_unlock_region(struct ldlm_namespace *ns,
+static int ofd_lock_unlock_region(const struct lu_env *env,
+                                 struct ldlm_namespace *ns,
                                  struct ldlm_res_id *res_id,
                                  unsigned long long begin,
                                  unsigned long long end)
@@ -969,7 +970,7 @@ static int ofd_lock_unlock_region(struct ldlm_namespace *ns,
 
        LASSERT(begin <= end);
 
-       rc = tgt_extent_lock(ns, res_id, begin, end, &lh, LCK_PR, &flags);
+       rc = tgt_extent_lock(env, ns, res_id, begin, end, &lh, LCK_PR, &flags);
        if (rc != 0)
                return rc;
 
@@ -997,7 +998,8 @@ static int ofd_lock_unlock_region(struct ldlm_namespace *ns,
  * \retval             0 if successful
  * \retval             negative value on error
  */
-static int lock_zero_regions(struct ldlm_namespace *ns,
+static int lock_zero_regions(const struct lu_env *env,
+                            struct ldlm_namespace *ns,
                             struct ldlm_res_id *res_id,
                             struct fiemap *fiemap)
 {
@@ -1013,7 +1015,7 @@ static int lock_zero_regions(struct ldlm_namespace *ns,
                if (fiemap_start[i].fe_logical > begin) {
                        CDEBUG(D_OTHER, "ost lock [%llu,%llu]\n",
                               begin, fiemap_start[i].fe_logical);
-                       rc = ofd_lock_unlock_region(ns, res_id, begin,
+                       rc = ofd_lock_unlock_region(env, ns, res_id, begin,
                                                    fiemap_start[i].fe_logical);
                        if (rc)
                                RETURN(rc);
@@ -1025,7 +1027,7 @@ static int lock_zero_regions(struct ldlm_namespace *ns,
        if (begin < (fiemap->fm_start + fiemap->fm_length)) {
                CDEBUG(D_OTHER, "ost lock [%llu,%llu]\n",
                       begin, fiemap->fm_start + fiemap->fm_length);
-               rc = ofd_lock_unlock_region(ns, res_id, begin,
+               rc = ofd_lock_unlock_region(env, ns, res_id, begin,
                                fiemap->fm_start + fiemap->fm_length);
        }
 
@@ -1126,7 +1128,7 @@ static int ofd_get_info_hdl(struct tgt_session_info *tsi)
                if (fm_key->lfik_oa.o_valid & OBD_MD_FLFLAGS &&
                    fm_key->lfik_oa.o_flags & OBD_FL_SRVLOCK) {
                        ost_fid_build_resid(fid, &fti->fti_resid);
-                       rc = lock_zero_regions(ofd->ofd_namespace,
+                       rc = lock_zero_regions(tsi->tsi_env, ofd->ofd_namespace,
                                               &fti->fti_resid, fiemap);
                        if (rc == 0)
                                rc = ofd_fiemap_get(tsi->tsi_env, ofd, fid,
@@ -1218,7 +1220,8 @@ static int ofd_getattr_hdl(struct tgt_session_info *tsi)
                if (unlikely(tsi->tsi_ost_body->oa.o_flags & OBD_FL_FLUSH))
                        lock_mode = LCK_PW;
 
-               rc = tgt_extent_lock(tsi->tsi_tgt->lut_obd->obd_namespace,
+               rc = tgt_extent_lock(tsi->tsi_env,
+                                    tsi->tsi_tgt->lut_obd->obd_namespace,
                                     &tsi->tsi_resid, 0, OBD_OBJECT_EOF, &lh,
                                     lock_mode, &flags);
                if (rc != 0)
@@ -1350,7 +1353,7 @@ out:
                res = ldlm_resource_get(ofd->ofd_namespace, NULL,
                                        &tsi->tsi_resid, LDLM_EXTENT, 0);
                if (!IS_ERR(res)) {
-                       ldlm_res_lvbo_update(res, NULL, 0);
+                       ldlm_res_lvbo_update(tsi->tsi_env, res, NULL, 0);
                        ldlm_resource_putref(res);
                }
        }
@@ -1975,8 +1978,8 @@ static int ofd_punch_hdl(struct tgt_session_info *tsi)
                  oa->o_flags & OBD_FL_SRVLOCK;
 
        if (srvlock) {
-               rc = tgt_extent_lock(ns, &tsi->tsi_resid, start, end, &lh,
-                                    LCK_PW, &flags);
+               rc = tgt_extent_lock(tsi->tsi_env, ns, &tsi->tsi_resid, start,
+                                    end, &lh, LCK_PW, &flags);
                if (rc != 0)
                        RETURN(rc);
        }
@@ -2019,7 +2022,7 @@ out:
                if (!IS_ERR(res)) {
                        struct ost_lvb *res_lvb;
 
-                       ldlm_res_lvbo_update(res, NULL, 0);
+                       ldlm_res_lvbo_update(tsi->tsi_env, res, NULL, 0);
                        res_lvb = res->lr_lvb_data;
                        repbody->oa.o_valid |= OBD_MD_FLBLOCKS;
                        repbody->oa.o_blocks = res_lvb->lvb_blocks;
@@ -2186,7 +2189,7 @@ static int ofd_ladvise_hdl(struct tgt_session_info *tsi)
 
                        ioo.ioo_oid = body->oa.o_oi;
                        ioo.ioo_bufcnt = 1;
-                       rc = tgt_extent_lock(exp->exp_obd->obd_namespace,
+                       rc = tgt_extent_lock(env, exp->exp_obd->obd_namespace,
                                             &tsi->tsi_resid, start, end - 1,
                                             &lockh, LCK_PR, &flags);
                        if (rc != 0)
index c7fc515..8e3fe93 100644 (file)
@@ -212,9 +212,9 @@ out:
  * \retval             ELDLM_LOCK_ABORTED in other cases except error
  * \retval             negative errno on error
  */
-int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
-                     void *req_cookie, enum ldlm_mode mode, __u64 flags,
-                     void *data)
+int ofd_intent_policy(const struct lu_env *env, struct ldlm_namespace *ns,
+                     struct ldlm_lock **lockp, void *req_cookie,
+                     enum ldlm_mode mode, __u64 flags, void *data)
 {
        struct ptlrpc_request *req = req_cookie;
        struct ldlm_lock *lock = *lockp;
index df5b714..a76cb69 100644 (file)
@@ -426,9 +426,9 @@ extern struct ldlm_valblock_ops ofd_lvbo;
 /* ofd_dlm.c */
 extern struct kmem_cache *ldlm_glimpse_work_kmem;
 
-int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
-                     void *req_cookie, enum ldlm_mode mode, __u64 flags,
-                     void *data);
+int ofd_intent_policy(const struct lu_env *env, struct ldlm_namespace *ns,
+                     struct ldlm_lock **lockp, void *req_cookie,
+                     enum ldlm_mode mode, __u64 flags, void *data);
 
 static inline struct ofd_thread_info *ofd_info(const struct lu_env *env)
 {
index 8e038ce..f61d24a 100644 (file)
@@ -1360,7 +1360,7 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
                        rs = ldlm_resource_get(ns, NULL, &info->fti_resid,
                                               LDLM_EXTENT, 0);
                        if (!IS_ERR(rs)) {
-                               ldlm_res_lvbo_update(rs, NULL, 1);
+                               ldlm_res_lvbo_update(env, rs, NULL, 1);
                                ldlm_resource_putref(rs);
                        }
                }
index 81f1e67..aecd1b6 100644 (file)
@@ -82,17 +82,18 @@ static int ofd_lvbo_free(struct ldlm_resource *res)
  * \retval             0 on successful setup
  * \retval             negative value on error
  */
-static int ofd_lvbo_init(struct ldlm_resource *res)
+static int ofd_lvbo_init(const struct lu_env *env, struct ldlm_resource *res)
 {
        struct ost_lvb          *lvb;
        struct ofd_device       *ofd;
        struct ofd_object       *fo;
        struct ofd_thread_info  *info;
-       struct lu_env            env;
        int                      rc = 0;
 
        ENTRY;
 
+       LASSERT(env);
+       info = ofd_info(env);
        LASSERT(res);
        LASSERT(mutex_is_locked(&res->lr_lvb_mutex));
 
@@ -105,25 +106,20 @@ static int ofd_lvbo_init(struct ldlm_resource *res)
        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_LVB))
                RETURN(-ENOMEM);
 
-       rc = lu_env_init(&env, LCT_DT_THREAD);
-       if (rc)
-               RETURN(rc);
-
        OBD_ALLOC_PTR(lvb);
        if (lvb == NULL)
-               GOTO(out_env, rc = -ENOMEM);
+               GOTO(out, rc = -ENOMEM);
 
        res->lr_lvb_data = lvb;
        res->lr_lvb_len = sizeof(*lvb);
 
-       info = ofd_info_init(&env, NULL);
        ost_fid_from_resid(&info->fti_fid, &res->lr_name,
                           ofd->ofd_lut.lut_lsd.lsd_osd_index);
-       fo = ofd_object_find(&env, ofd, &info->fti_fid);
+       fo = ofd_object_find(env, ofd, &info->fti_fid);
        if (IS_ERR(fo))
                GOTO(out_lvb, rc = PTR_ERR(fo));
 
-       rc = ofd_attr_get(&env, fo, &info->fti_attr);
+       rc = ofd_attr_get(env, fo, &info->fti_attr);
        if (rc)
                GOTO(out_obj, rc);
 
@@ -138,14 +134,15 @@ static int ofd_lvbo_init(struct ldlm_resource *res)
               PFID(&info->fti_fid), lvb->lvb_size,
               lvb->lvb_mtime, lvb->lvb_blocks);
 
+       info->fti_attr.la_valid = 0;
+
        EXIT;
 out_obj:
-       ofd_object_put(&env, fo);
+       ofd_object_put(env, fo);
 out_lvb:
        if (rc != 0)
                OST_LVB_SET_ERR(lvb->lvb_blocks, rc);
-out_env:
-       lu_env_fini(&env);
+out:
        /* Don't free lvb data on lookup error */
        return rc;
 }
@@ -179,35 +176,32 @@ out_env:
  * \retval             0 on successful setup
  * \retval             negative value on error
  */
-static int ofd_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock,
-                          struct ptlrpc_request *req, int increase_only)
+static int ofd_lvbo_update(const struct lu_env *env, struct ldlm_resource *res,
+                          struct ldlm_lock *lock, struct ptlrpc_request *req,
+                          int increase_only)
 {
+       struct ofd_thread_info  *info;
        struct ofd_device       *ofd;
        struct ofd_object       *fo;
-       struct ofd_thread_info  *info;
        struct ost_lvb          *lvb;
-       struct lu_env            env;
        int                      rc = 0;
 
        ENTRY;
 
+       LASSERT(env);
+       info = ofd_info(env);
        LASSERT(res != NULL);
 
        ofd = ldlm_res_to_ns(res)->ns_lvbp;
        LASSERT(ofd != NULL);
 
-       rc = lu_env_init(&env, LCT_DT_THREAD);
-       if (rc)
-               RETURN(rc);
-
-       info = ofd_info_init(&env, NULL);
        fid_extract_from_res_name(&info->fti_fid, &res->lr_name);
 
        lvb = res->lr_lvb_data;
        if (lvb == NULL) {
                CERROR("%s: no LVB data for "DFID"\n",
                       ofd_name(ofd), PFID(&info->fti_fid));
-               GOTO(out_env, rc = 0);
+               GOTO(out, rc = 0);
        }
 
        /* Update the LVB from the network message */
@@ -279,11 +273,11 @@ disk_update:
        /* Update the LVB from the disk inode */
        ost_fid_from_resid(&info->fti_fid, &res->lr_name,
                           ofd->ofd_lut.lut_lsd.lsd_osd_index);
-       fo = ofd_object_find(&env, ofd, &info->fti_fid);
+       fo = ofd_object_find(env, ofd, &info->fti_fid);
        if (IS_ERR(fo))
-               GOTO(out_env, rc = PTR_ERR(fo));
+               GOTO(out, rc = PTR_ERR(fo));
 
-       rc = ofd_attr_get(&env, fo, &info->fti_attr);
+       rc = ofd_attr_get(env, fo, &info->fti_attr);
        if (rc)
                GOTO(out_obj, rc);
 
@@ -322,9 +316,8 @@ disk_update:
        unlock_res(res);
 
 out_obj:
-       ofd_object_put(&env, fo);
-out_env:
-       lu_env_fini(&env);
+       ofd_object_put(env, fo);
+out:
        return rc;
 }
 
@@ -358,7 +351,8 @@ static int ofd_lvbo_size(struct ldlm_lock *lock)
  *
  * \retval             size of LVB data written into \a buf buffer
  */
-static int ofd_lvbo_fill(struct ldlm_lock *lock, void *buf, int buflen)
+static int ofd_lvbo_fill(const struct lu_env *env, struct ldlm_lock *lock,
+                        void *buf, int buflen)
 {
        struct ldlm_resource *res = lock->l_resource;
        int lvb_len;
index aa59c14..0d6e2f4 100644 (file)
@@ -952,7 +952,7 @@ out:
                 * to go... deadlock! */
                res = ldlm_resource_get(ns, NULL, &info->fti_resid, LDLM_EXTENT, 0);
                if (!IS_ERR(res)) {
-                       ldlm_res_lvbo_update(res, NULL, 0);
+                       ldlm_res_lvbo_update(env, res, NULL, 0);
                        ldlm_resource_putref(res);
                }
        }
@@ -994,7 +994,7 @@ int ofd_destroy_by_fid(const struct lu_env *env, struct ofd_device *ofd,
        /* Tell the clients that the object is gone now and that they should
         * throw away any cached pages. */
        ost_fid_build_resid(fid, &info->fti_resid);
-       rc = ldlm_cli_enqueue_local(ofd->ofd_namespace, &info->fti_resid,
+       rc = ldlm_cli_enqueue_local(env, ofd->ofd_namespace, &info->fti_resid,
                                    LDLM_EXTENT, &policy, LCK_PW, &flags,
                                    ldlm_blocking_ast, ldlm_completion_ast,
                                    NULL, NULL, 0, LVB_T_NONE, NULL, &lockh);
index a167146..1ea3b9f 100644 (file)
@@ -2322,11 +2322,12 @@ time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
  * error or otherwise be interrupted).
  * Returns 0 on success or error code otherwise.
  */
-int ptlrpc_set_wait(struct ptlrpc_request_set *set)
+int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
 {
-       struct list_head            *tmp;
-        struct ptlrpc_request *req;
-        struct l_wait_info     lwi;
+       struct list_head *tmp;
+       struct ptlrpc_request *req;
+       struct l_wait_info lwi;
+       struct lu_env _env;
        time64_t timeout;
        int rc;
        ENTRY;
@@ -2344,6 +2345,18 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
        if (list_empty(&set->set_requests))
                 RETURN(0);
 
+       /* ideally we want env provide by the caller all the time,
+        * but at the moment that would mean a massive change in
+        * LDLM while benefits would be close to zero, so just
+        * initialize env here for those rare cases */
+       if (!env) {
+               /* XXX: skip on the client side? */
+               rc = lu_env_init(&_env, LCT_DT_THREAD);
+               if (rc)
+                       RETURN(rc);
+               env = &_env;
+       }
+
         do {
                 timeout = ptlrpc_set_next_timeout(set);
 
@@ -2371,7 +2384,8 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
                         lwi = LWI_TIMEOUT(cfs_time_seconds(timeout? timeout : 1),
                                           ptlrpc_expired_set, set);
 
-                rc = l_wait_event(set->set_waitq, ptlrpc_check_set(NULL, set), &lwi);
+               rc = l_wait_event(set->set_waitq,
+                                 ptlrpc_check_set(env, set), &lwi);
 
                 /* LU-769 - if we ignored the signal because it was already
                  * pending when we started, we need to handle it now or we risk
@@ -2422,6 +2436,9 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
                         rc = req->rq_status;
         }
 
+       if (env && env == &_env)
+               lu_env_fini(&_env);
+
        RETURN(rc);
 }
 EXPORT_SYMBOL(ptlrpc_set_wait);
@@ -2921,13 +2938,13 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
        /* for distributed debugging */
        lustre_msg_set_status(req->rq_reqmsg, current_pid());
 
-        /* add a ref for the set (see comment in ptlrpc_set_add_req) */
-        ptlrpc_request_addref(req);
-        ptlrpc_set_add_req(set, req);
-        rc = ptlrpc_set_wait(set);
-        ptlrpc_set_destroy(set);
+       /* add a ref for the set (see comment in ptlrpc_set_add_req) */
+       ptlrpc_request_addref(req);
+       ptlrpc_set_add_req(set, req);
+       rc = ptlrpc_set_wait(NULL, set);
+       ptlrpc_set_destroy(set);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 EXPORT_SYMBOL(ptlrpc_queue_wait);
 
index 1840e3a..b98d082 100644 (file)
@@ -503,11 +503,11 @@ static int ptlrpcd(void *arg)
                  */
         } while (exit < 2);
 
-        /*
-         * Wait for inflight requests to drain.
-         */
+       /*
+        * Wait for inflight requests to drain.
+        */
        if (!list_empty(&set->set_requests))
-                ptlrpc_set_wait(set);
+               ptlrpc_set_wait(&env, set);
        lu_context_fini(&env.le_ctx);
        lu_context_fini(env.le_ses);
 
index 4a8aa81..f29db18 100644 (file)
@@ -140,7 +140,7 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
        /* on success, pack lvb in reply */
        lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
        lvb_len = ldlm_lvbo_size(*lockp);
-       lvb_len = ldlm_lvbo_fill(*lockp, lvb, lvb_len);
+       lvb_len = ldlm_lvbo_fill(env, *lockp, lvb, lvb_len);
        if (lvb_len < 0)
                GOTO(out, rc = lvb_len);
 
index ed8c030..fe7f26e 100644 (file)
@@ -1604,7 +1604,7 @@ int tgt_mdt_data_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
        LASSERT(ns != NULL);
        LASSERT(!lustre_handle_is_used(lh));
 
-       rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, &policy, mode,
+       rc = ldlm_cli_enqueue_local(NULL, ns, res_id, LDLM_IBITS, &policy, mode,
                                    flags, ldlm_blocking_ast,
                                    ldlm_completion_ast, ldlm_glimpse_ast,
                                    NULL, 0, LVB_T_NONE, NULL, lh);
@@ -1624,9 +1624,9 @@ EXPORT_SYMBOL(tgt_mdt_data_unlock);
  * Helper function for getting server side [start, start+count] DLM lock
  * if asked by client.
  */
-int tgt_extent_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
-                   __u64 start, __u64 end, struct lustre_handle *lh,
-                   int mode, __u64 *flags)
+int tgt_extent_lock(const struct lu_env *env, struct ldlm_namespace *ns,
+                   struct ldlm_res_id *res_id, __u64 start, __u64 end,
+                   struct lustre_handle *lh, int mode, __u64 *flags)
 {
        union ldlm_policy_data policy;
        int rc;
@@ -1649,8 +1649,8 @@ int tgt_extent_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
        else
                policy.l_extent.end = end | ~PAGE_MASK;
 
-       rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_EXTENT, &policy, mode,
-                                   flags, ldlm_blocking_ast,
+       rc = ldlm_cli_enqueue_local(env, ns, res_id, LDLM_EXTENT, &policy,
+                                   mode, flags, ldlm_blocking_ast,
                                    ldlm_completion_ast, ldlm_glimpse_ast,
                                    NULL, 0, LVB_T_NONE, NULL, lh);
        RETURN(rc == ELDLM_OK ? 0 : -EIO);
@@ -1664,9 +1664,10 @@ void tgt_extent_unlock(struct lustre_handle *lh, enum ldlm_mode mode)
 }
 EXPORT_SYMBOL(tgt_extent_unlock);
 
-static int tgt_brw_lock(struct obd_export *exp, struct ldlm_res_id *res_id,
-                       struct obd_ioobj *obj, struct niobuf_remote *nb,
-                       struct lustre_handle *lh, enum ldlm_mode mode)
+static int tgt_brw_lock(const struct lu_env *env, struct obd_export *exp,
+                       struct ldlm_res_id *res_id, struct obd_ioobj *obj,
+                       struct niobuf_remote *nb, struct lustre_handle *lh,
+                       enum ldlm_mode mode)
 {
        struct ldlm_namespace   *ns = exp->exp_obd->obd_namespace;
        __u64                    flags = 0;
@@ -1693,7 +1694,7 @@ static int tgt_brw_lock(struct obd_export *exp, struct ldlm_res_id *res_id,
        if (exp->exp_connect_data.ocd_connect_flags & OBD_CONNECT_IBITS)
                rc = tgt_mdt_data_lock(ns, res_id, lh, mode, &flags);
        else
-               rc = tgt_extent_lock(ns, res_id, nb[0].rnb_offset,
+               rc = tgt_extent_lock(env, ns, res_id, nb[0].rnb_offset,
                                     nb[nrbufs - 1].rnb_offset +
                                     nb[nrbufs - 1].rnb_len - 1,
                                     lh, mode, &flags);
@@ -2158,8 +2159,8 @@ int tgt_brw_read(struct tgt_session_info *tsi)
 
        local_nb = tbc->local;
 
-       rc = tgt_brw_lock(exp, &tsi->tsi_resid, ioo, remote_nb, &lockh,
-                         LCK_PR);
+       rc = tgt_brw_lock(tsi->tsi_env, exp, &tsi->tsi_resid, ioo, remote_nb,
+                         &lockh, LCK_PR);
        if (rc != 0)
                RETURN(rc);
 
@@ -2496,8 +2497,8 @@ int tgt_brw_write(struct tgt_session_info *tsi)
 
        local_nb = tbc->local;
 
-       rc = tgt_brw_lock(exp, &tsi->tsi_resid, ioo, remote_nb, &lockh,
-                         LCK_PW);
+       rc = tgt_brw_lock(tsi->tsi_env, exp, &tsi->tsi_resid, ioo, remote_nb,
+                         &lockh, LCK_PW);
        if (rc != 0)
                GOTO(out, rc);