to save on env allocation.
Benchmarks made by Shuichi Ihara demonstrated 13% improvement
for small I/Os: 564k vs 639k IOPS. the details can be found
LU-11164.
Change-Id: I797e3d7e19ef408993004a2b872842d655240525
Signed-off-by: Alex Zhuravlev <bzzz@whamcloud.com>
Reviewed-on: https://review.whamcloud.com/32832
Tested-by: Jenkins
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Patrick Farrell <paf@cray.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
int tgt_mdt_data_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
struct lustre_handle *lh, int mode, __u64 *flags);
void tgt_mdt_data_unlock(struct lustre_handle *lh, enum ldlm_mode mode);
-int tgt_extent_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
- __u64 start, __u64 end, struct lustre_handle *lh,
- int mode, __u64 *flags);
+int tgt_extent_lock(const struct lu_env *env, struct ldlm_namespace *ns,
+ struct ldlm_res_id *res_id, __u64 start, __u64 end,
+ struct lustre_handle *lh, int mode, __u64 *flags);
void tgt_extent_unlock(struct lustre_handle *lh, enum ldlm_mode mode);
int tgt_brw_read(struct tgt_session_info *tsi);
int tgt_brw_write(struct tgt_session_info *tsi);
struct completion pl_kobj_unregister;
};
-typedef int (*ldlm_res_policy)(struct ldlm_namespace *, struct ldlm_lock **,
- void *req_cookie, enum ldlm_mode mode,
- __u64 flags, void *data);
+typedef int (*ldlm_res_policy)(const struct lu_env *env,
+ struct ldlm_namespace *,
+ struct ldlm_lock **, void *req_cookie,
+ enum ldlm_mode mode, __u64 flags, void *data);
typedef int (*ldlm_cancel_cbt)(struct ldlm_lock *lock);
* of ldlm_[res_]lvbo_[init,update,fill]() functions.
*/
struct ldlm_valblock_ops {
- int (*lvbo_init)(struct ldlm_resource *res);
- int (*lvbo_update)(struct ldlm_resource *res, struct ldlm_lock *lock,
- struct ptlrpc_request *r, int increase);
+ int (*lvbo_init)(const struct lu_env *env, struct ldlm_resource *res);
+ int (*lvbo_update)(const struct lu_env *env, struct ldlm_resource *res,
+ struct ldlm_lock *lock, struct ptlrpc_request *r,
+ int increase);
int (*lvbo_free)(struct ldlm_resource *res);
/* Return size of lvb data appropriate RPC size can be reserved */
int (*lvbo_size)(struct ldlm_lock *lock);
/* Called to fill in lvb data to RPC buffer @buf */
- int (*lvbo_fill)(struct ldlm_lock *lock, void *buf, int buflen);
+ int (*lvbo_fill)(const struct lu_env *env, struct ldlm_lock *lock,
+ void *buf, int buflen);
};
/**
return &lock->l_resource->lr_ns_bucket->nsb_at_estimate;
}
-static inline int ldlm_lvbo_init(struct ldlm_resource *res)
+static inline int ldlm_lvbo_init(const struct lu_env *env,
+ struct ldlm_resource *res)
{
struct ldlm_namespace *ns = ldlm_res_to_ns(res);
int rc = 0;
mutex_unlock(&res->lr_lvb_mutex);
return 0;
}
- rc = ns->ns_lvbo->lvbo_init(res);
+ rc = ns->ns_lvbo->lvbo_init(env, res);
if (rc < 0) {
CDEBUG(D_DLMTRACE, "lvbo_init failed for resource : rc = %d\n",
rc);
return 0;
}
-static inline int ldlm_lvbo_fill(struct ldlm_lock *lock, void *buf, int len)
+static inline int ldlm_lvbo_fill(const struct lu_env *env,
+ struct ldlm_lock *lock, void *buf, int len)
{
struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
int rc;
if (ns->ns_lvbo != NULL) {
LASSERT(ns->ns_lvbo->lvbo_fill != NULL);
/* init lvb now if not already */
- rc = ldlm_lvbo_init(lock->l_resource);
+ rc = ldlm_lvbo_init(env, lock->l_resource);
if (rc < 0) {
CERROR("lock %p: delayed lvb init failed (rc %d)",
lock, rc);
return rc;
}
- return ns->ns_lvbo->lvbo_fill(lock, buf, len);
+ return ns->ns_lvbo->lvbo_fill(env, lock, buf, len);
}
return 0;
}
* Update Lock Value Block Operations (LVBO) on a resource taking into account
* data from request \a r
*/
-static inline int ldlm_lvbo_update(struct ldlm_resource *res,
+static inline int ldlm_lvbo_update(const struct lu_env *env,
+ struct ldlm_resource *res,
struct ldlm_lock *lock,
struct ptlrpc_request *req, int increase)
{
int rc;
/* delayed lvb init may be required */
- rc = ldlm_lvbo_init(res);
+ rc = ldlm_lvbo_init(env, res);
if (rc < 0) {
CERROR("delayed lvb init failed (rc %d)\n", rc);
return rc;
}
if (ns->ns_lvbo && ns->ns_lvbo->lvbo_update)
- return ns->ns_lvbo->lvbo_update(res, lock, req, increase);
+ return ns->ns_lvbo->lvbo_update(env, res, lock, req, increase);
return 0;
}
-static inline int ldlm_res_lvbo_update(struct ldlm_resource *res,
- struct ptlrpc_request *req, int increase)
+static inline int ldlm_res_lvbo_update(const struct lu_env *env,
+ struct ldlm_resource *res,
+ struct ptlrpc_request *req,
+ int increase)
{
- return ldlm_lvbo_update(res, NULL, req, increase);
+ return ldlm_lvbo_update(env, res, NULL, req, increase);
}
int ldlm_error2errno(enum ldlm_error error);
enum ldlm_mode mode, __u64 *flags, void *lvb,
__u32 lvb_len,
const struct lustre_handle *lockh, int rc);
-int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
+int ldlm_cli_enqueue_local(const struct lu_env *env,
+ struct ldlm_namespace *ns,
const struct ldlm_res_id *res_id,
enum ldlm_type type, union ldlm_policy_data *policy,
enum ldlm_mode mode, __u64 *flags,
struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
void *arg);
int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set);
-int ptlrpc_set_wait(struct ptlrpc_request_set *);
+int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *);
void ptlrpc_mark_interrupted(struct ptlrpc_request *req);
void ptlrpc_set_destroy(struct ptlrpc_request_set *);
void ptlrpc_set_add_req(struct ptlrpc_request_set *, struct ptlrpc_request *);
enum ldlm_type type, enum ldlm_mode mode,
const struct ldlm_callback_suite *cbs,
void *data, __u32 lvb_len, enum lvb_type lvb_type);
-enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *, struct ldlm_lock **,
+enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
+ struct ldlm_namespace *,
+ struct ldlm_lock **,
void *cookie, __u64 *flags);
void ldlm_lock_addref_internal(struct ldlm_lock *, enum ldlm_mode mode);
void ldlm_lock_addref_internal_nolock(struct ldlm_lock *, enum ldlm_mode mode);
void ldlm_discard_bl_list(struct list_head *bl_list);
#endif
int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
- ldlm_desc_ast_t ast_type);
+ ldlm_desc_ast_t ast_type);
int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq);
int ldlm_lock_remove_from_lru_check(struct ldlm_lock *lock, ktime_t last_use);
#define ldlm_lock_remove_from_lru(lock) \
* set, skip all the enqueueing and delegate lock processing to intent policy
* function.
*/
-enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns,
+enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
+ struct ldlm_namespace *ns,
struct ldlm_lock **lockp,
void *cookie, __u64 *flags)
{
/* policies are not executed on the client or during replay */
if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
&& !local && ns->ns_policy) {
- rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
- NULL);
+ rc = ns->ns_policy(env, ns, lockp, cookie, lock->l_req_mode,
+ *flags, NULL);
if (rc == ELDLM_LOCK_REPLACED) {
/* The lock that was returned has already been granted,
* and placed into lockp. If it's not the same as the
* one.
*/
int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
- ldlm_desc_ast_t ast_type)
+ ldlm_desc_ast_t ast_type)
{
struct ldlm_cb_set_arg *arg;
set_producer_func work_ast_lock;
if (arg->set == NULL)
GOTO(out, rc = -ENOMEM);
- ptlrpc_set_wait(arg->set);
+ ptlrpc_set_wait(NULL, arg->set);
ptlrpc_set_destroy(arg->set);
rc = atomic_read(&arg->restart) ? -ERESTART : 0;
EXPORT_SYMBOL(ldlm_lock_set_data);
struct export_cl_data {
+ const struct lu_env *ecl_env;
struct obd_export *ecl_exp;
int ecl_loop;
};
res = ldlm_resource_getref(lock->l_resource);
- ldlm_lvbo_update(res, lock, NULL, 1);
+ ldlm_lvbo_update(ecl->ecl_env, res, lock, NULL, 1);
ldlm_lock_cancel(lock);
if (!exp->exp_obd->obd_stopping)
ldlm_reprocess_all(res);
*/
int ldlm_export_cancel_blocked_locks(struct obd_export *exp)
{
+ struct lu_env env;
struct export_cl_data ecl = {
.ecl_exp = exp,
.ecl_loop = 0,
};
+ int rc;
+
+ rc = lu_env_init(&env, LCT_DT_THREAD);
+ if (rc)
+ RETURN(rc);
+ ecl.ecl_env = &env;
while (!list_empty(&exp->exp_bl_list)) {
struct ldlm_lock *lock;
LDLM_LOCK_RELEASE(lock);
}
+ lu_env_fini(&env);
+
CDEBUG(D_DLMTRACE, "Export %p, canceled %d locks, "
"left on hash table %d.\n", exp, ecl.ecl_loop,
atomic_read(&exp->exp_lock_hash->hs_count));
*/
int ldlm_export_cancel_locks(struct obd_export *exp)
{
- struct export_cl_data ecl = {
- .ecl_exp = exp,
- .ecl_loop = 0,
- };
+ struct export_cl_data ecl;
+ struct lu_env env;
+ int rc;
+
+ rc = lu_env_init(&env, LCT_DT_THREAD);
+ if (rc)
+ RETURN(rc);
+ ecl.ecl_env = &env;
+ ecl.ecl_exp = exp;
+ ecl.ecl_loop = 0;
cfs_hash_for_each_empty(exp->exp_lock_hash,
ldlm_cancel_locks_for_export_cb, &ecl);
exp->exp_obd->obd_stopping)
ldlm_reprocess_recovery_done(exp->exp_obd->obd_namespace);
+ lu_env_fini(&env);
+
return ecl.ecl_loop;
}
/**
* Perform lock cleanup if AST reply came with error.
*/
-static int ldlm_handle_ast_error(struct ldlm_lock *lock,
+static int ldlm_handle_ast_error(const struct lu_env *env,
+ struct ldlm_lock *lock,
struct ptlrpc_request *req, int rc,
const char *ast_type)
{
/* update lvbo to return proper attributes.
* see bug 23174 */
ldlm_resource_getref(res);
- ldlm_lvbo_update(res, lock, NULL, 1);
+ ldlm_lvbo_update(env, res, lock, NULL, 1);
ldlm_resource_putref(res);
}
ldlm_lock_cancel(lock);
} else if (rc == -ELDLM_NO_LOCK_DATA) {
LDLM_DEBUG(lock, "lost race - client has a lock but no "
"inode");
- ldlm_lvbo_update(lock->l_resource, lock, NULL, 1);
+ ldlm_lvbo_update(env, lock->l_resource, lock, NULL, 1);
} else if (rc != 0) {
- rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
+ rc = ldlm_handle_ast_error(env, lock, req,
+ rc, "glimpse");
} else {
- rc = ldlm_lvbo_update(lock->l_resource, lock, req, 1);
+ rc = ldlm_lvbo_update(env, lock->l_resource,
+ lock, req, 1);
}
break;
case LDLM_BL_CALLBACK:
if (rc != 0)
- rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
+ rc = ldlm_handle_ast_error(env, lock, req,
+ rc, "blocking");
break;
case LDLM_CP_CALLBACK:
if (rc != 0)
- rc = ldlm_handle_ast_error(lock, req, rc, "completion");
+ rc = ldlm_handle_ast_error(env, lock, req,
+ rc, "completion");
break;
default:
LDLM_ERROR(lock, "invalid opcode for lock callback %d",
ldlm_lock2desc(lock, &body->lock_desc);
if (lvb_len > 0) {
void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
+ const struct lu_env *env = NULL;
- lvb_len = ldlm_lvbo_fill(lock, lvb, lvb_len);
+ if (req->rq_svc_thread)
+ env = req->rq_svc_thread->t_env;
+
+ lvb_len = ldlm_lvbo_fill(env, lock, lvb, lvb_len);
if (lvb_len < 0) {
/* We still need to send the RPC to wake up the blocked
* enqueue thread on the client.
void *cookie = NULL;
int rc = 0;
struct ldlm_resource *res = NULL;
+ const struct lu_env *env = req->rq_svc_thread->t_env;
ENTRY;
LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
res = lock->l_resource;
if (!(flags & LDLM_FL_REPLAY)) {
/* non-replayed lock, delayed lvb init may need to be done */
- rc = ldlm_lvbo_init(res);
+ rc = ldlm_lvbo_init(env, res);
if (rc < 0) {
LDLM_DEBUG(lock, "delayed lvb init failed (rc %d)", rc);
GOTO(out, rc);
req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
RCL_SERVER, ldlm_lvbo_size(lock));
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
- GOTO(out, rc = -ENOMEM);
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
+ GOTO(out, rc = -ENOMEM);
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- GOTO(out, rc);
- }
+ rc = req_capsule_server_pack(&req->rq_pill);
+ if (rc)
+ GOTO(out, rc);
+ }
- err = ldlm_lock_enqueue(ns, &lock, cookie, &flags);
+ err = ldlm_lock_enqueue(env, ns, &lock, cookie, &flags);
if (err) {
if ((int)err < 0)
rc = (int)err;
GOTO(out, err);
}
- dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+ dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
- ldlm_lock2desc(lock, &dlm_rep->lock_desc);
- ldlm_lock2handle(lock, &dlm_rep->lock_handle);
+ ldlm_lock2desc(lock, &dlm_rep->lock_desc);
+ ldlm_lock2handle(lock, &dlm_rep->lock_handle);
if (lock && lock->l_resource->lr_type == LDLM_EXTENT)
OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 6);
/* non-replayed lock, delayed lvb init may
* need to be occur now */
if ((buflen > 0) && !(flags & LDLM_FL_REPLAY)) {
- buflen = ldlm_lvbo_fill(lock, buf,
+ buflen = ldlm_lvbo_fill(env, lock, buf,
buflen);
if (buflen >= 0)
req_capsule_shrink(
const struct ldlm_request *dlm_req,
int first, enum lustre_at_flags flags)
{
+ const struct lu_env *env = req->rq_svc_thread->t_env;
struct ldlm_resource *res, *pres = NULL;
struct ldlm_lock *lock;
int i, count, done = 0;
LDLM_RESOURCE_DELREF(pres);
ldlm_resource_putref(pres);
}
- if (res != NULL) {
- ldlm_resource_getref(res);
- LDLM_RESOURCE_ADDREF(res);
+ if (res != NULL) {
+ ldlm_resource_getref(res);
+ LDLM_RESOURCE_ADDREF(res);
if (!ldlm_is_discard_data(lock))
- ldlm_lvbo_update(res, lock, NULL, 1);
- }
- pres = res;
- }
+ ldlm_lvbo_update(env, res, lock,
+ NULL, 1);
+ }
+ pres = res;
+ }
if ((flags & LATF_STATS) && ldlm_is_ast_sent(lock) &&
lock->l_blast_sent != 0) {
/**
* Enqueue a local lock (typically on a server).
*/
-int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
+int ldlm_cli_enqueue_local(const struct lu_env *env,
+ struct ldlm_namespace *ns,
const struct ldlm_res_id *res_id,
enum ldlm_type type, union ldlm_policy_data *policy,
enum ldlm_mode mode, __u64 *flags,
if (IS_ERR(lock))
GOTO(out_nolock, err = PTR_ERR(lock));
- err = ldlm_lvbo_init(lock->l_resource);
+ err = ldlm_lvbo_init(env, lock->l_resource);
if (err < 0) {
LDLM_ERROR(lock, "delayed lvb init failed (rc %d)", err);
ldlm_lock_destroy_nolock(lock);
lock->l_req_extent = policy->l_extent;
}
- err = ldlm_lock_enqueue(ns, &lock, policy, flags);
- if (unlikely(err != ELDLM_OK))
- GOTO(out, err);
+ err = ldlm_lock_enqueue(env, ns, &lock, policy, flags);
+ if (unlikely(err != ELDLM_OK))
+ GOTO(out, err);
- if (policy != NULL)
- *policy = lock->l_policy_data;
+ if (policy != NULL)
+ *policy = lock->l_policy_data;
- if (lock->l_completion_ast)
- lock->l_completion_ast(lock, *flags, NULL);
+ if (lock->l_completion_ast)
+ lock->l_completion_ast(lock, *flags, NULL);
LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
EXIT;
__u32 lvb_len, const struct lustre_handle *lockh,
int rc)
{
- struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
- int is_replay = *flags & LDLM_FL_REPLAY;
- struct ldlm_lock *lock;
- struct ldlm_reply *reply;
- int cleanup_phase = 1;
- ENTRY;
+ struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
+ const struct lu_env *env = NULL;
+ int is_replay = *flags & LDLM_FL_REPLAY;
+ struct ldlm_lock *lock;
+ struct ldlm_reply *reply;
+ int cleanup_phase = 1;
+ ENTRY;
+
+ if (req && req->rq_svc_thread)
+ env = req->rq_svc_thread->t_env;
lock = ldlm_handle2lock(lockh);
/* ldlm_cli_enqueue is holding a reference on this lock. */
}
}
- if (!is_replay) {
- rc = ldlm_lock_enqueue(ns, &lock, NULL, flags);
- if (lock->l_completion_ast != NULL) {
- int err = lock->l_completion_ast(lock, *flags, NULL);
- if (!rc)
- rc = err;
- if (rc)
- cleanup_phase = 1;
- }
- }
+ if (!is_replay) {
+ rc = ldlm_lock_enqueue(env, ns, &lock, NULL, flags);
+ if (lock->l_completion_ast != NULL) {
+ int err = lock->l_completion_ast(lock, *flags, NULL);
+ if (!rc)
+ rc = err;
+ if (rc)
+ cleanup_phase = 1;
+ }
+ }
if (lvb_len > 0 && lvb != NULL) {
/* Copy the LVB here, and not earlier, because the completion
}
spin_unlock(<ds->ltd_lock);
- rc = ptlrpc_set_wait(set);
+ rc = ptlrpc_set_wait(env, set);
if (rc < 0) {
ptlrpc_set_destroy(set);
RETURN(rc);
up_read(<ds->ltd_rw_sem);
/* Sync up */
- rc = ptlrpc_set_wait(set);
+ rc = ptlrpc_set_wait(env, set);
if (rc < 0) {
ptlrpc_set_destroy(set);
RETURN(rc);
}
spin_unlock(<ds->ltd_lock);
- rc = ptlrpc_set_wait(set);
+ rc = ptlrpc_set_wait(env, set);
if (rc < 0) {
ptlrpc_set_destroy(set);
RETURN(rc);
break;
}
- rc1 = ptlrpc_set_wait(set);
+ rc1 = ptlrpc_set_wait(env, set);
ptlrpc_set_destroy(set);
RETURN(rc != 0 ? rc : rc1);
up_read(<ds->ltd_rw_sem);
if (rc == 0 && atomic_read(&count) > 0)
- rc = ptlrpc_set_wait(set);
+ rc = ptlrpc_set_wait(env, set);
ptlrpc_set_destroy(set);
memset(policy, 0, sizeof(*policy));
policy->l_extent.end = OBD_OBJECT_EOF;
ost_fid_build_resid(fid, resid);
- rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_EXTENT,
- policy, LCK_EX, &flags, ldlm_blocking_ast,
- ldlm_completion_ast, NULL, NULL, 0,
- LVB_T_NONE, NULL, &lh);
+ rc = ldlm_cli_enqueue_local(env, lfsck->li_namespace, resid,
+ LDLM_EXTENT, policy, LCK_EX, &flags,
+ ldlm_blocking_ast, ldlm_completion_ast,
+ NULL, NULL, 0, LVB_T_NONE, NULL, &lh);
if (rc != ELDLM_OK)
GOTO(put, rc = -EIO);
}
spin_unlock(&llsd->llsd_lock);
- rc = ptlrpc_set_wait(set);
+ rc = ptlrpc_set_wait(env, set);
ptlrpc_set_destroy(set);
GOTO(log, rc = (rc1 != 0 ? rc1 : rc));
}
spin_unlock(&llsd->llsd_lock);
- ptlrpc_set_wait(set);
+ ptlrpc_set_wait(env, set);
ptlrpc_set_destroy(set);
RETURN_EXIT;
rc = dt_object_lock(env, obj, lh, einfo, policy);
} else {
- rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid,
+ rc = ldlm_cli_enqueue_local(env, lfsck->li_namespace, resid,
LDLM_IBITS, policy, mode,
&flags, ldlm_blocking_ast,
ldlm_completion_ast, NULL, NULL,
ltd->ltd_index, lad->lad_name, rc);
lfsck_tgt_put(ltd);
} else {
- rc = ptlrpc_set_wait(set);
+ rc = ptlrpc_set_wait(env, set);
}
ptlrpc_set_destroy(set);
goto again;
}
- rc = ptlrpc_set_wait(set);
+ rc = ptlrpc_set_wait(env, set);
ptlrpc_set_destroy(set);
RETURN(rc);
}
up_read(<ds->ltd_rw_sem);
- rc = ptlrpc_set_wait(set);
+ rc = ptlrpc_set_wait(env, set);
ptlrpc_set_destroy(set);
if (rc == 0)
RETURN(rc);
}
- rc = ptlrpc_set_wait(set);
+ rc = ptlrpc_set_wait(env, set);
ptlrpc_set_destroy(set);
if (rc == 0)
}
up_read(<ds->ltd_rw_sem);
- rc = ptlrpc_set_wait(set);
+ rc = ptlrpc_set_wait(env, set);
ptlrpc_set_destroy(set);
GOTO(out, rc);
if (no_set) {
- rc2 = ptlrpc_set_wait(set);
+ rc2 = ptlrpc_set_wait(env, set);
if (rc2 == 0 && rc == 0)
rc = rc2;
ptlrpc_set_destroy(set);
dlmflags |= LDLM_FL_COS_INCOMPAT;
LASSERT(ns != NULL);
- rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS,
+ rc = ldlm_cli_enqueue_local(env, ns, res_id, LDLM_IBITS,
policy, einfo->ei_mode,
&dlmflags, blocking,
completion, NULL,
GOTO(out_set, rc);
}
- rc = ptlrpc_set_wait(rqset);
+ rc = ptlrpc_set_wait(env, rqset);
out_set:
if (rc < 0)
lov_tgts_putref(obddev);
if (no_set) {
- err = ptlrpc_set_wait(set);
+ err = ptlrpc_set_wait(env, set);
if (rc == 0)
rc = err;
ptlrpc_set_destroy(set);
struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
union ldlm_policy_data *policy = &info->mti_policy;
struct ldlm_res_id *res_id = &info->mti_res_id;
- __u64 dlmflags = 0;
+ __u64 dlmflags = 0, *cookie = NULL;
int rc;
ENTRY;
}
}
-
fid_build_reg_res_name(mdt_object_fid(o), res_id);
dlmflags |= LDLM_FL_ATOMIC_CB;
+ if (info->mti_exp)
+ cookie = &info->mti_exp->exp_handle.h_cookie;
+
/*
* Take PDO lock on whole directory and build correct @res_id for lock
* on part of directory.
/* at least one of them should be set */
LASSERT(policy->l_inodebits.bits |
policy->l_inodebits.try_bits);
- rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode,
- policy, res_id, dlmflags,
- info->mti_exp == NULL ? NULL :
- &info->mti_exp->exp_handle.h_cookie);
+ rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_pdo_lh,
+ lh->mlh_pdo_mode, policy, res_id,
+ dlmflags, cookie);
if (unlikely(rc != 0))
GOTO(out_unlock, rc);
}
* going to be sent to client. If it is - mdt_intent_policy() path will
* fix it up and turn FL_LOCAL flag off.
*/
- rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
- res_id, LDLM_FL_LOCAL_ONLY | dlmflags,
- info->mti_exp == NULL ? NULL :
- &info->mti_exp->exp_handle.h_cookie);
+ rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
+ policy, res_id, LDLM_FL_LOCAL_ONLY | dlmflags,
+ cookie);
out_unlock:
if (rc != 0)
mdt_object_unlock(info, o, lh, 1);
LDLM_GLIMPSE_ENQUEUE : LDLM_IBITS_ENQUEUE));
}
-static int mdt_intent_policy(struct ldlm_namespace *ns,
- struct ldlm_lock **lockp, void *req_cookie,
- enum ldlm_mode mode, __u64 flags, void *data)
+static int mdt_intent_policy(const struct lu_env *env,
+ struct ldlm_namespace *ns,
+ struct ldlm_lock **lockp,
+ void *req_cookie,
+ enum ldlm_mode mode,
+ __u64 flags, void *data)
{
struct tgt_session_info *tsi;
struct mdt_thread_info *info;
LASSERT(req != NULL);
- tsi = tgt_ses_info(req->rq_svc_thread->t_env);
+ tsi = tgt_ses_info(env);
info = tsi2mdt_info(tsi);
LASSERT(info != NULL);
/*
* Object attributes.
*/
- struct md_attr mti_attr;
+ struct md_attr mti_attr;
+ struct md_attr mti_attr2; /* mdt_lvb.c */
/*
* Body for "habeo corpus" operations.
*/
}
/* Issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
-static inline int mdt_fid_lock(struct ldlm_namespace *ns,
+static inline int mdt_fid_lock(const struct lu_env *env,
+ struct ldlm_namespace *ns,
struct lustre_handle *lh, enum ldlm_mode mode,
union ldlm_policy_data *policy,
const struct ldlm_res_id *res_id,
LASSERT(ns != NULL);
LASSERT(lh != NULL);
- rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
+ rc = ldlm_cli_enqueue_local(env, ns, res_id, LDLM_IBITS, policy,
mode, &flags, mdt_blocking_ast,
ldlm_completion_ast,
glimpse ? mdt_dom_glimpse_ast : NULL,
/* mdt_lvb.c */
extern struct ldlm_valblock_ops mdt_lvbo;
int mdt_dom_lvb_is_valid(struct ldlm_resource *res);
-int mdt_dom_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock,
- struct ptlrpc_request *req, bool increase_only);
+int mdt_dom_lvbo_update(const struct lu_env *env, struct ldlm_resource *res,
+ struct ldlm_lock *lock, struct ptlrpc_request *req,
+ bool increase_only);
void mdt_enable_cos(struct mdt_device *dev, bool enable);
int mdt_cos_is_enabled(struct mdt_device *);
/* Update lvbo data if DoM lock returned or if LVB is not yet valid. */
if (dom_lock || !mdt_dom_lvb_is_valid(res))
- mdt_dom_lvbo_update(res, NULL, NULL, false);
+ mdt_dom_lvbo_update(env, res, NULL, NULL, false);
mdt_lvb2body(res, mb);
ldlm_resource_putref(res);
fill_mbo:
/* LVB can be without valid data in case of DOM */
if (!mdt_dom_lvb_is_valid(res))
- mdt_dom_lvbo_update(res, lock, NULL, false);
+ mdt_dom_lvbo_update(mti->mti_env, res, lock, NULL, false);
mdt_lvb2body(res, mbo);
RETURN(rc);
}
/* Tell the clients that the object is gone now and that they should
* throw away any cached pages. */
- rc = ldlm_cli_enqueue_local(mdt->mdt_namespace, res_id, LDLM_IBITS,
- policy, LCK_PW, &flags, ldlm_blocking_ast,
- ldlm_completion_ast, NULL, NULL, 0,
- LVB_T_NONE, NULL, &dom_lh);
+ rc = ldlm_cli_enqueue_local(info->mti_env, mdt->mdt_namespace, res_id,
+ LDLM_IBITS, policy, LCK_PW, &flags,
+ ldlm_blocking_ast, ldlm_completion_ast,
+ NULL, NULL, 0, LVB_T_NONE, NULL, &dom_lh);
/* We only care about the side-effects, just drop the lock. */
if (rc == ELDLM_OK)
#include "mdt_internal.h"
/* Called with res->lr_lvb_sem held */
-static int mdt_lvbo_init(struct ldlm_resource *res)
+static int mdt_lvbo_init(const struct lu_env *env, struct ldlm_resource *res)
{
if (IS_LQUOTA_RES(res)) {
struct mdt_device *mdt;
if (!mdt_object_exists(mo) || mdt_object_remote(mo))
RETURN(-ENOENT);
- ma = &info->mti_attr;
+ ma = &info->mti_attr2;
ma->ma_valid = 0;
ma->ma_need = MA_INODE;
rc = mo_attr_get(env, mdt_object_child(mo), ma);
RETURN(rc);
}
-int mdt_dom_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock,
- struct ptlrpc_request *req, bool increase_only)
+int mdt_dom_lvbo_update(const struct lu_env *env, struct ldlm_resource *res,
+ struct ldlm_lock *lock, struct ptlrpc_request *req,
+ bool increase_only)
{
struct obd_export *exp = lock ? lock->l_export : NULL;
struct mdt_device *mdt;
struct mdt_object *mo;
struct mdt_thread_info *info;
struct ost_lvb *lvb;
- struct lu_env env;
struct lu_fid *fid;
int rc = 0;
if (mdt == NULL)
RETURN(-ENOENT);
- rc = lu_env_init(&env, LCT_MD_THREAD);
- if (rc)
- RETURN(rc);
-
- info = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
- if (info == NULL)
- GOTO(out_env, rc = -ENOMEM);
-
- memset(info, 0, sizeof *info);
- info->mti_env = &env;
- info->mti_exp = req ? req->rq_export : NULL;
- info->mti_mdt = mdt;
+ LASSERT(env);
+ info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+ if (!info) {
+ rc = lu_env_refill_by_tags((struct lu_env *)env,
+ LCT_MD_THREAD, 0);
+ if (rc)
+ GOTO(out_env, rc);
+ info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+ if (!info)
+ GOTO(out_env, rc = -ENOMEM);
+ }
+ if (!info->mti_exp)
+ info->mti_exp = req ? req->rq_export : NULL;
+ if (!info->mti_mdt)
+ info->mti_mdt = mdt;
fid = &info->mti_tmp_fid2;
fid_extract_from_res_name(fid, &res->lr_name);
disk_update:
/* Update the LVB from the disk inode */
- mo = mdt_object_find(&env, mdt, fid);
+ mo = mdt_object_find(env, mdt, fid);
if (IS_ERR(mo))
GOTO(out_env, rc = PTR_ERR(mo));
- rc = mdt_dom_disk_lvbo_update(&env, mo, res, !!increase_only);
- mdt_object_put(&env, mo);
+ rc = mdt_dom_disk_lvbo_update(env, mo, res, !!increase_only);
+ mdt_object_put(env, mo);
out_env:
- lu_env_fini(&env);
return rc;
}
-static int mdt_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock,
- struct ptlrpc_request *req, int increase_only)
+static int mdt_lvbo_update(const struct lu_env *env, struct ldlm_resource *res,
+ struct ldlm_lock *lock, struct ptlrpc_request *req,
+ int increase_only)
{
ENTRY;
* by MDT for DOM objects only.
*/
if (lock == NULL || ldlm_has_dom(lock))
- return mdt_dom_lvbo_update(res, lock, req, !!increase_only);
+ return mdt_dom_lvbo_update(env, res, lock, req,
+ !!increase_only);
return 0;
}
return 0;
}
-static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen)
+static int mdt_lvbo_fill(const struct lu_env *env, struct ldlm_lock *lock,
+ void *lvb, int lvblen)
{
- struct lu_env env;
struct mdt_thread_info *info;
struct mdt_device *mdt;
struct lu_fid *fid;
RETURN(rc);
}
+ info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+ if (!info) {
+ rc = lu_env_refill_by_tags((struct lu_env *)env,
+ LCT_MD_THREAD, 0);
+ if (rc)
+ GOTO(out, rc);
+ info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+ if (!info)
+ GOTO(out, rc = -ENOMEM);
+ }
+ if (!info->mti_env)
+ info->mti_env = env;
+ if (!info->mti_exp)
+ info->mti_exp = lock->l_export;
+ if (!info->mti_mdt)
+ info->mti_mdt = mdt;
+
/* LVB for DoM lock is needed only for glimpse,
* don't fill DoM data if there is layout lock */
if (ldlm_has_dom(lock)) {
int lvb_len = sizeof(struct ost_lvb);
if (!mdt_dom_lvb_is_valid(res))
- mdt_dom_lvbo_update(lock->l_resource, lock, NULL, 0);
+ mdt_dom_lvbo_update(env, lock->l_resource,
+ lock, NULL, 0);
if (lvb_len > lvblen)
lvb_len = lvblen;
memcpy(lvb, res->lr_lvb_data, lvb_len);
unlock_res(res);
- RETURN(lvb_len);
+ GOTO(out, rc = lvb_len);
}
/* Only fill layout if layout lock is granted */
if (!ldlm_has_layout(lock) || lock->l_granted_mode != lock->l_req_mode)
- RETURN(0);
-
- /* XXX create an env to talk to mdt stack. We should get this env from
- * ptlrpc_thread->t_env. */
- rc = lu_env_init(&env, LCT_MD_THREAD);
- /* Likely ENOMEM */
- if (rc)
- RETURN(rc);
-
- info = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
- /* Likely ENOMEM */
- if (info == NULL)
- GOTO(out, rc = -ENOMEM);
-
- memset(info, 0, sizeof *info);
- info->mti_env = &env;
- info->mti_exp = lock->l_export;
- info->mti_mdt = mdt;
+ GOTO(out, rc = 0);
/* XXX get fid by resource id. why don't include fid in ldlm_resource */
fid = &info->mti_tmp_fid2;
fid_extract_from_res_name(fid, &lock->l_resource->lr_name);
- obj = mdt_object_find(&env, info->mti_mdt, fid);
+ obj = mdt_object_find(env, info->mti_mdt, fid);
if (IS_ERR(obj))
GOTO(out, rc = PTR_ERR(obj));
if (!mdt_object_exists(obj) || mdt_object_remote(obj))
- GOTO(out, rc = -ENOENT);
+ GOTO(out_put, rc = -ENOENT);
child = mdt_object_child(obj);
/* get the length of lsm */
- rc = mo_xattr_get(&env, child, &LU_BUF_NULL, XATTR_NAME_LOV);
+ rc = mo_xattr_get(env, child, &LU_BUF_NULL, XATTR_NAME_LOV);
if (rc < 0)
- GOTO(out, rc);
+ GOTO(out_put, rc);
if (rc > 0) {
struct lu_buf *lmm = NULL;
if (lvblen < rc) {
"%d (max_mdsize %d): rc = %d\n",
mdt_obd_name(mdt), lvblen, rc,
info->mti_mdt->mdt_max_mdsize, -ERANGE);
- GOTO(out, rc = -ERANGE);
+ GOTO(out_put, rc = -ERANGE);
}
lmm = &info->mti_buf;
lmm->lb_buf = lvb;
lmm->lb_len = rc;
- rc = mo_xattr_get(&env, child, lmm, XATTR_NAME_LOV);
+ rc = mo_xattr_get(env, child, lmm, XATTR_NAME_LOV);
if (rc < 0)
- GOTO(out, rc);
+ GOTO(out_put, rc);
}
-out:
+out_put:
if (obj != NULL && !IS_ERR(obj))
- mdt_object_put(&env, obj);
- lu_env_fini(&env);
+ mdt_object_put(env, obj);
+out:
RETURN(rc < 0 ? 0 : rc);
}
* going to be sent to client. If it is - mdt_intent_policy() path will
* fix it up and turn FL_LOCAL flag off.
*/
- rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
- res, dlmflags, &info->mti_exp->exp_handle.h_cookie);
+ rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
+ policy, res, dlmflags,
+ &info->mti_exp->exp_handle.h_cookie);
return rc;
}
memset(policy, 0, sizeof *policy);
policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
- rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
- LCK_EX, &flags, ldlm_blocking_ast,
- ldlm_completion_ast, NULL, NULL, 0,
- LVB_T_NONE,
- &info->mti_exp->exp_handle.h_cookie,
- lh);
+ rc = ldlm_cli_enqueue_local(info->mti_env, ns, res_id,
+ LDLM_IBITS, policy, LCK_EX, &flags,
+ ldlm_blocking_ast,
+ ldlm_completion_ast, NULL, NULL, 0,
+ LVB_T_NONE,
+ &info->mti_exp->exp_handle.h_cookie,
+ lh);
RETURN(rc);
}
RETURN(rc);
fid_build_reg_res_name(mdt_object_fid(obj), res);
memset(policy, 0, sizeof(*policy));
policy->l_inodebits.bits = MDS_INODELOCK_LOOKUP;
- rc = mdt_fid_lock(info->mti_mdt->mdt_namespace, &lh->mlh_reg_lh,
- LCK_EX, policy, res, dlmflags, NULL);
+ rc = mdt_fid_lock(info->mti_env, info->mti_mdt->mdt_namespace,
+ &lh->mlh_reg_lh, LCK_EX, policy, res,
+ dlmflags, NULL);
}
if (rc != ELDLM_OK)
if (!rc) {
LASSERT(cp != NULL);
- rc = ldlm_cli_enqueue_local(mgs->mgs_obd->obd_namespace,
+ rc = ldlm_cli_enqueue_local(NULL, mgs->mgs_obd->obd_namespace,
&res_id, LDLM_PLAIN, NULL, LCK_EX,
&flags, ldlm_blocking_ast, cp,
NULL, fsdb, 0, LVB_T_NONE, NULL,
}
-static int ofd_lock_unlock_region(struct ldlm_namespace *ns,
+static int ofd_lock_unlock_region(const struct lu_env *env,
+ struct ldlm_namespace *ns,
struct ldlm_res_id *res_id,
unsigned long long begin,
unsigned long long end)
LASSERT(begin <= end);
- rc = tgt_extent_lock(ns, res_id, begin, end, &lh, LCK_PR, &flags);
+ rc = tgt_extent_lock(env, ns, res_id, begin, end, &lh, LCK_PR, &flags);
if (rc != 0)
return rc;
* \retval 0 if successful
* \retval negative value on error
*/
-static int lock_zero_regions(struct ldlm_namespace *ns,
+static int lock_zero_regions(const struct lu_env *env,
+ struct ldlm_namespace *ns,
struct ldlm_res_id *res_id,
struct fiemap *fiemap)
{
if (fiemap_start[i].fe_logical > begin) {
CDEBUG(D_OTHER, "ost lock [%llu,%llu]\n",
begin, fiemap_start[i].fe_logical);
- rc = ofd_lock_unlock_region(ns, res_id, begin,
+ rc = ofd_lock_unlock_region(env, ns, res_id, begin,
fiemap_start[i].fe_logical);
if (rc)
RETURN(rc);
if (begin < (fiemap->fm_start + fiemap->fm_length)) {
CDEBUG(D_OTHER, "ost lock [%llu,%llu]\n",
begin, fiemap->fm_start + fiemap->fm_length);
- rc = ofd_lock_unlock_region(ns, res_id, begin,
+ rc = ofd_lock_unlock_region(env, ns, res_id, begin,
fiemap->fm_start + fiemap->fm_length);
}
if (fm_key->lfik_oa.o_valid & OBD_MD_FLFLAGS &&
fm_key->lfik_oa.o_flags & OBD_FL_SRVLOCK) {
ost_fid_build_resid(fid, &fti->fti_resid);
- rc = lock_zero_regions(ofd->ofd_namespace,
+ rc = lock_zero_regions(tsi->tsi_env, ofd->ofd_namespace,
&fti->fti_resid, fiemap);
if (rc == 0)
rc = ofd_fiemap_get(tsi->tsi_env, ofd, fid,
if (unlikely(tsi->tsi_ost_body->oa.o_flags & OBD_FL_FLUSH))
lock_mode = LCK_PW;
- rc = tgt_extent_lock(tsi->tsi_tgt->lut_obd->obd_namespace,
+ rc = tgt_extent_lock(tsi->tsi_env,
+ tsi->tsi_tgt->lut_obd->obd_namespace,
&tsi->tsi_resid, 0, OBD_OBJECT_EOF, &lh,
lock_mode, &flags);
if (rc != 0)
res = ldlm_resource_get(ofd->ofd_namespace, NULL,
&tsi->tsi_resid, LDLM_EXTENT, 0);
if (!IS_ERR(res)) {
- ldlm_res_lvbo_update(res, NULL, 0);
+ ldlm_res_lvbo_update(tsi->tsi_env, res, NULL, 0);
ldlm_resource_putref(res);
}
}
oa->o_flags & OBD_FL_SRVLOCK;
if (srvlock) {
- rc = tgt_extent_lock(ns, &tsi->tsi_resid, start, end, &lh,
- LCK_PW, &flags);
+ rc = tgt_extent_lock(tsi->tsi_env, ns, &tsi->tsi_resid, start,
+ end, &lh, LCK_PW, &flags);
if (rc != 0)
RETURN(rc);
}
if (!IS_ERR(res)) {
struct ost_lvb *res_lvb;
- ldlm_res_lvbo_update(res, NULL, 0);
+ ldlm_res_lvbo_update(tsi->tsi_env, res, NULL, 0);
res_lvb = res->lr_lvb_data;
repbody->oa.o_valid |= OBD_MD_FLBLOCKS;
repbody->oa.o_blocks = res_lvb->lvb_blocks;
ioo.ioo_oid = body->oa.o_oi;
ioo.ioo_bufcnt = 1;
- rc = tgt_extent_lock(exp->exp_obd->obd_namespace,
+ rc = tgt_extent_lock(env, exp->exp_obd->obd_namespace,
&tsi->tsi_resid, start, end - 1,
&lockh, LCK_PR, &flags);
if (rc != 0)
* \retval ELDLM_LOCK_ABORTED in other cases except error
* \retval negative errno on error
*/
-int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
- void *req_cookie, enum ldlm_mode mode, __u64 flags,
- void *data)
+int ofd_intent_policy(const struct lu_env *env, struct ldlm_namespace *ns,
+ struct ldlm_lock **lockp, void *req_cookie,
+ enum ldlm_mode mode, __u64 flags, void *data)
{
struct ptlrpc_request *req = req_cookie;
struct ldlm_lock *lock = *lockp;
/* ofd_dlm.c */
extern struct kmem_cache *ldlm_glimpse_work_kmem;
-int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
- void *req_cookie, enum ldlm_mode mode, __u64 flags,
- void *data);
+int ofd_intent_policy(const struct lu_env *env, struct ldlm_namespace *ns,
+ struct ldlm_lock **lockp, void *req_cookie,
+ enum ldlm_mode mode, __u64 flags, void *data);
static inline struct ofd_thread_info *ofd_info(const struct lu_env *env)
{
rs = ldlm_resource_get(ns, NULL, &info->fti_resid,
LDLM_EXTENT, 0);
if (!IS_ERR(rs)) {
- ldlm_res_lvbo_update(rs, NULL, 1);
+ ldlm_res_lvbo_update(env, rs, NULL, 1);
ldlm_resource_putref(rs);
}
}
* \retval 0 on successful setup
* \retval negative value on error
*/
-static int ofd_lvbo_init(struct ldlm_resource *res)
+static int ofd_lvbo_init(const struct lu_env *env, struct ldlm_resource *res)
{
struct ost_lvb *lvb;
struct ofd_device *ofd;
struct ofd_object *fo;
struct ofd_thread_info *info;
- struct lu_env env;
int rc = 0;
ENTRY;
+ LASSERT(env);
+ info = ofd_info(env);
LASSERT(res);
LASSERT(mutex_is_locked(&res->lr_lvb_mutex));
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_LVB))
RETURN(-ENOMEM);
- rc = lu_env_init(&env, LCT_DT_THREAD);
- if (rc)
- RETURN(rc);
-
OBD_ALLOC_PTR(lvb);
if (lvb == NULL)
- GOTO(out_env, rc = -ENOMEM);
+ GOTO(out, rc = -ENOMEM);
res->lr_lvb_data = lvb;
res->lr_lvb_len = sizeof(*lvb);
- info = ofd_info_init(&env, NULL);
ost_fid_from_resid(&info->fti_fid, &res->lr_name,
ofd->ofd_lut.lut_lsd.lsd_osd_index);
- fo = ofd_object_find(&env, ofd, &info->fti_fid);
+ fo = ofd_object_find(env, ofd, &info->fti_fid);
if (IS_ERR(fo))
GOTO(out_lvb, rc = PTR_ERR(fo));
- rc = ofd_attr_get(&env, fo, &info->fti_attr);
+ rc = ofd_attr_get(env, fo, &info->fti_attr);
if (rc)
GOTO(out_obj, rc);
PFID(&info->fti_fid), lvb->lvb_size,
lvb->lvb_mtime, lvb->lvb_blocks);
+ info->fti_attr.la_valid = 0;
+
EXIT;
out_obj:
- ofd_object_put(&env, fo);
+ ofd_object_put(env, fo);
out_lvb:
if (rc != 0)
OST_LVB_SET_ERR(lvb->lvb_blocks, rc);
-out_env:
- lu_env_fini(&env);
+out:
/* Don't free lvb data on lookup error */
return rc;
}
* \retval 0 on successful setup
* \retval negative value on error
*/
-static int ofd_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock,
- struct ptlrpc_request *req, int increase_only)
+static int ofd_lvbo_update(const struct lu_env *env, struct ldlm_resource *res,
+ struct ldlm_lock *lock, struct ptlrpc_request *req,
+ int increase_only)
{
+ struct ofd_thread_info *info;
struct ofd_device *ofd;
struct ofd_object *fo;
- struct ofd_thread_info *info;
struct ost_lvb *lvb;
- struct lu_env env;
int rc = 0;
ENTRY;
+ LASSERT(env);
+ info = ofd_info(env);
LASSERT(res != NULL);
ofd = ldlm_res_to_ns(res)->ns_lvbp;
LASSERT(ofd != NULL);
- rc = lu_env_init(&env, LCT_DT_THREAD);
- if (rc)
- RETURN(rc);
-
- info = ofd_info_init(&env, NULL);
fid_extract_from_res_name(&info->fti_fid, &res->lr_name);
lvb = res->lr_lvb_data;
if (lvb == NULL) {
CERROR("%s: no LVB data for "DFID"\n",
ofd_name(ofd), PFID(&info->fti_fid));
- GOTO(out_env, rc = 0);
+ GOTO(out, rc = 0);
}
/* Update the LVB from the network message */
/* Update the LVB from the disk inode */
ost_fid_from_resid(&info->fti_fid, &res->lr_name,
ofd->ofd_lut.lut_lsd.lsd_osd_index);
- fo = ofd_object_find(&env, ofd, &info->fti_fid);
+ fo = ofd_object_find(env, ofd, &info->fti_fid);
if (IS_ERR(fo))
- GOTO(out_env, rc = PTR_ERR(fo));
+ GOTO(out, rc = PTR_ERR(fo));
- rc = ofd_attr_get(&env, fo, &info->fti_attr);
+ rc = ofd_attr_get(env, fo, &info->fti_attr);
if (rc)
GOTO(out_obj, rc);
unlock_res(res);
out_obj:
- ofd_object_put(&env, fo);
-out_env:
- lu_env_fini(&env);
+ ofd_object_put(env, fo);
+out:
return rc;
}
*
* \retval size of LVB data written into \a buf buffer
*/
-static int ofd_lvbo_fill(struct ldlm_lock *lock, void *buf, int buflen)
+static int ofd_lvbo_fill(const struct lu_env *env, struct ldlm_lock *lock,
+ void *buf, int buflen)
{
struct ldlm_resource *res = lock->l_resource;
int lvb_len;
* to go... deadlock! */
res = ldlm_resource_get(ns, NULL, &info->fti_resid, LDLM_EXTENT, 0);
if (!IS_ERR(res)) {
- ldlm_res_lvbo_update(res, NULL, 0);
+ ldlm_res_lvbo_update(env, res, NULL, 0);
ldlm_resource_putref(res);
}
}
/* Tell the clients that the object is gone now and that they should
* throw away any cached pages. */
ost_fid_build_resid(fid, &info->fti_resid);
- rc = ldlm_cli_enqueue_local(ofd->ofd_namespace, &info->fti_resid,
+ rc = ldlm_cli_enqueue_local(env, ofd->ofd_namespace, &info->fti_resid,
LDLM_EXTENT, &policy, LCK_PW, &flags,
ldlm_blocking_ast, ldlm_completion_ast,
NULL, NULL, 0, LVB_T_NONE, NULL, &lockh);
* error or otherwise be interrupted).
* Returns 0 on success or error code otherwise.
*/
-int ptlrpc_set_wait(struct ptlrpc_request_set *set)
+int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
{
- struct list_head *tmp;
- struct ptlrpc_request *req;
- struct l_wait_info lwi;
+ struct list_head *tmp;
+ struct ptlrpc_request *req;
+ struct l_wait_info lwi;
+ struct lu_env _env;
time64_t timeout;
int rc;
ENTRY;
if (list_empty(&set->set_requests))
RETURN(0);
+ /* ideally we want env provide by the caller all the time,
+ * but at the moment that would mean a massive change in
+ * LDLM while benefits would be close to zero, so just
+ * initialize env here for those rare cases */
+ if (!env) {
+ /* XXX: skip on the client side? */
+ rc = lu_env_init(&_env, LCT_DT_THREAD);
+ if (rc)
+ RETURN(rc);
+ env = &_env;
+ }
+
do {
timeout = ptlrpc_set_next_timeout(set);
lwi = LWI_TIMEOUT(cfs_time_seconds(timeout? timeout : 1),
ptlrpc_expired_set, set);
- rc = l_wait_event(set->set_waitq, ptlrpc_check_set(NULL, set), &lwi);
+ rc = l_wait_event(set->set_waitq,
+ ptlrpc_check_set(env, set), &lwi);
/* LU-769 - if we ignored the signal because it was already
* pending when we started, we need to handle it now or we risk
rc = req->rq_status;
}
+ if (env && env == &_env)
+ lu_env_fini(&_env);
+
RETURN(rc);
}
EXPORT_SYMBOL(ptlrpc_set_wait);
/* for distributed debugging */
lustre_msg_set_status(req->rq_reqmsg, current_pid());
- /* add a ref for the set (see comment in ptlrpc_set_add_req) */
- ptlrpc_request_addref(req);
- ptlrpc_set_add_req(set, req);
- rc = ptlrpc_set_wait(set);
- ptlrpc_set_destroy(set);
+ /* add a ref for the set (see comment in ptlrpc_set_add_req) */
+ ptlrpc_request_addref(req);
+ ptlrpc_set_add_req(set, req);
+ rc = ptlrpc_set_wait(NULL, set);
+ ptlrpc_set_destroy(set);
- RETURN(rc);
+ RETURN(rc);
}
EXPORT_SYMBOL(ptlrpc_queue_wait);
*/
} while (exit < 2);
- /*
- * Wait for inflight requests to drain.
- */
+ /*
+ * Wait for inflight requests to drain.
+ */
if (!list_empty(&set->set_requests))
- ptlrpc_set_wait(set);
+ ptlrpc_set_wait(&env, set);
lu_context_fini(&env.le_ctx);
lu_context_fini(env.le_ses);
/* on success, pack lvb in reply */
lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
lvb_len = ldlm_lvbo_size(*lockp);
- lvb_len = ldlm_lvbo_fill(*lockp, lvb, lvb_len);
+ lvb_len = ldlm_lvbo_fill(env, *lockp, lvb, lvb_len);
if (lvb_len < 0)
GOTO(out, rc = lvb_len);
LASSERT(ns != NULL);
LASSERT(!lustre_handle_is_used(lh));
- rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, &policy, mode,
+ rc = ldlm_cli_enqueue_local(NULL, ns, res_id, LDLM_IBITS, &policy, mode,
flags, ldlm_blocking_ast,
ldlm_completion_ast, ldlm_glimpse_ast,
NULL, 0, LVB_T_NONE, NULL, lh);
* Helper function for getting server side [start, start+count] DLM lock
* if asked by client.
*/
-int tgt_extent_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
- __u64 start, __u64 end, struct lustre_handle *lh,
- int mode, __u64 *flags)
+int tgt_extent_lock(const struct lu_env *env, struct ldlm_namespace *ns,
+ struct ldlm_res_id *res_id, __u64 start, __u64 end,
+ struct lustre_handle *lh, int mode, __u64 *flags)
{
union ldlm_policy_data policy;
int rc;
else
policy.l_extent.end = end | ~PAGE_MASK;
- rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_EXTENT, &policy, mode,
- flags, ldlm_blocking_ast,
+ rc = ldlm_cli_enqueue_local(env, ns, res_id, LDLM_EXTENT, &policy,
+ mode, flags, ldlm_blocking_ast,
ldlm_completion_ast, ldlm_glimpse_ast,
NULL, 0, LVB_T_NONE, NULL, lh);
RETURN(rc == ELDLM_OK ? 0 : -EIO);
}
EXPORT_SYMBOL(tgt_extent_unlock);
-static int tgt_brw_lock(struct obd_export *exp, struct ldlm_res_id *res_id,
- struct obd_ioobj *obj, struct niobuf_remote *nb,
- struct lustre_handle *lh, enum ldlm_mode mode)
+static int tgt_brw_lock(const struct lu_env *env, struct obd_export *exp,
+ struct ldlm_res_id *res_id, struct obd_ioobj *obj,
+ struct niobuf_remote *nb, struct lustre_handle *lh,
+ enum ldlm_mode mode)
{
struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
__u64 flags = 0;
if (exp->exp_connect_data.ocd_connect_flags & OBD_CONNECT_IBITS)
rc = tgt_mdt_data_lock(ns, res_id, lh, mode, &flags);
else
- rc = tgt_extent_lock(ns, res_id, nb[0].rnb_offset,
+ rc = tgt_extent_lock(env, ns, res_id, nb[0].rnb_offset,
nb[nrbufs - 1].rnb_offset +
nb[nrbufs - 1].rnb_len - 1,
lh, mode, &flags);
local_nb = tbc->local;
- rc = tgt_brw_lock(exp, &tsi->tsi_resid, ioo, remote_nb, &lockh,
- LCK_PR);
+ rc = tgt_brw_lock(tsi->tsi_env, exp, &tsi->tsi_resid, ioo, remote_nb,
+ &lockh, LCK_PR);
if (rc != 0)
RETURN(rc);
local_nb = tbc->local;
- rc = tgt_brw_lock(exp, &tsi->tsi_resid, ioo, remote_nb, &lockh,
- LCK_PW);
+ rc = tgt_brw_lock(tsi->tsi_env, exp, &tsi->tsi_resid, ioo, remote_nb,
+ &lockh, LCK_PW);
if (rc != 0)
GOTO(out, rc);