* GPL HEADER END
*/
/*
- * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
*/
/*
* This file is part of Lustre, http://www.lustre.org/
static cfs_semaphore_t ldlm_ref_sem;
static int ldlm_refcount;
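+/* Args stashed in a callback request's rq_async_args so that
+ * ldlm_cb_interpret() can recover both the shared AST-set argument
+ * and the lock; each use CLASSERTs that it fits in rq_async_args. */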
+struct ldlm_cb_async_args {
+ struct ldlm_cb_set_arg *ca_set_arg;
+ struct ldlm_lock *ca_lock;
+};
+
/* LDLM state */
static struct ldlm_state *ldlm_state;
struct ldlm_lock *blwi_lock;
cfs_list_t blwi_head;
int blwi_count;
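+ /* blwi_mode: LDLM_SYNC items are stack-allocated by the caller,
+ * which then waits on blwi_comp; LDLM_ASYNC items are freed by
+ * the blocking thread once handled */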
+ cfs_completion_t blwi_comp;
+ int blwi_mode;
+ int blwi_mem_pressure;
};
#ifdef __KERNEL__
if (lock->l_export == NULL)
return 0;
- cfs_spin_lock(&lock->l_export->exp_lock);
+ cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock);
cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc,
rq_exp_list) {
if (req->rq_ops->hpreq_lock_match) {
break;
}
}
- cfs_spin_unlock(&lock->l_export->exp_lock);
+ cfs_spin_unlock_bh(&lock->l_export->exp_rpc_lock);
RETURN(match);
}
/* This is called from within a timer interrupt and cannot schedule */
static void waiting_locks_callback(unsigned long unused)
{
- struct ldlm_lock *lock, *last = NULL;
+ struct ldlm_lock *lock;
repeat:
cfs_spin_lock_bh(&waiting_locks_spinlock);
LDLM_LOCK_RELEASE(lock);
continue;
}
- lock->l_resource->lr_namespace->ns_timeouts++;
+ ldlm_lock_to_ns(lock)->ns_timeouts++;
LDLM_ERROR(lock, "lock callback timer expired after %lds: "
"evicting client at %s ",
cfs_time_current_sec() - lock->l_last_activity,
libcfs_nid2str(
lock->l_export->exp_connection->c_peer.nid));
- last = lock;
-
/* no need to take an extra ref on the lock since it was in
* the waiting_locks_list and ldlm_add_waiting_lock()
* already grabbed a ref */
ldlm_failed_ast(lock, rc, ast_type);
}
} else if (rc) {
- if (rc == -EINVAL)
+ if (rc == -EINVAL) {
+ struct ldlm_resource *res = lock->l_resource;
LDLM_DEBUG(lock, "client (nid %s) returned %d"
" from %s AST - normal race",
libcfs_nid2str(peer.nid),
req->rq_repmsg ?
lustre_msg_get_status(req->rq_repmsg) : -1,
ast_type);
- else
+ if (res) {
+ /* update lvbo to return proper attributes.
+ * see bug 23174 */
+ ldlm_resource_getref(res);
+ ldlm_res_lvbo_update(res, NULL, 1);
+ ldlm_resource_putref(res);
+ }
+
+ } else {
LDLM_ERROR(lock, "client (nid %s) returned %d "
"from %s AST", libcfs_nid2str(peer.nid),
(req->rq_repmsg != NULL) ?
lustre_msg_get_status(req->rq_repmsg) : 0,
ast_type);
+ }
ldlm_lock_cancel(lock);
/* Server-side AST functions are called from ldlm_reprocess_all,
* which needs to be told to please restart its reprocessing. */
static int ldlm_cb_interpret(const struct lu_env *env,
struct ptlrpc_request *req, void *data, int rc)
{
- struct ldlm_cb_set_arg *arg;
- struct ldlm_lock *lock;
+ struct ldlm_cb_async_args *ca = data;
+ struct ldlm_lock *lock = ca->ca_lock;
+ struct ldlm_cb_set_arg *arg = ca->ca_set_arg;
ENTRY;
- LASSERT(data != NULL);
-
- arg = req->rq_async_args.pointer_arg[0];
- lock = req->rq_async_args.pointer_arg[1];
LASSERT(lock != NULL);
if (rc != 0) {
- /* If client canceled the lock but the cancel has not
- * been received yet, we need to update lvbo to have the
- * proper attributes cached. */
- if (rc == -EINVAL && arg->type == LDLM_BL_CALLBACK)
- ldlm_res_lvbo_update(lock->l_resource, NULL, 1);
rc = ldlm_handle_ast_error(lock, req, rc,
arg->type == LDLM_BL_CALLBACK
? "blocking" : "completion");
+ if (rc == -ERESTART)
+ cfs_atomic_inc(&arg->restart);
}
-
LDLM_LOCK_RELEASE(lock);
- if (rc == -ERESTART)
- cfs_atomic_set(&arg->restart, 1);
-
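+ /* wake up the AST issuer once the number of in-flight AST
+ * RPCs drops below its threshold */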
+ if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold)
+ cfs_waitq_signal(&arg->waitq);
RETURN(0);
}
-static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req,
+static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req,
struct ldlm_cb_set_arg *arg,
struct ldlm_lock *lock,
int instant_cancel)
rc = ptl_send_rpc(req, 1);
ptlrpc_req_finished(req);
if (rc == 0)
- /* If we cancelled the lock, we need to restart
- * ldlm_reprocess_queue */
- cfs_atomic_set(&arg->restart, 1);
+ cfs_atomic_inc(&arg->restart);
} else {
LDLM_LOCK_GET(lock);
- ptlrpc_set_add_req(arg->set, req);
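+ /* send the AST through ptlrpcd instead of a private request
+ * set; arg->rpcs counts in-flight RPCs so the issuer can
+ * throttle and wait for them to drain */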
+ ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+ cfs_atomic_inc(&arg->rpcs);
}
RETURN(rc);
RETURN_EXIT;
}
- cfs_spin_lock(&lock->l_export->exp_lock);
+ cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock);
cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc,
rq_exp_list) {
if (!req->rq_hp && req->rq_ops->hpreq_lock_match &&
req->rq_ops->hpreq_lock_match(req, lock))
ptlrpc_hpreq_reorder(req);
}
- cfs_spin_unlock(&lock->l_export->exp_lock);
+ cfs_spin_unlock_bh(&lock->l_export->exp_rpc_lock);
EXIT;
}
struct ldlm_lock_desc *desc,
void *data, int flag)
{
+ struct ldlm_cb_async_args *ca;
struct ldlm_cb_set_arg *arg = data;
struct ldlm_request *body;
struct ptlrpc_request *req;
if (req == NULL)
RETURN(-ENOMEM);
- req->rq_async_args.pointer_arg[0] = arg;
- req->rq_async_args.pointer_arg[1] = lock;
+ CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
+ ca = ptlrpc_req_async_args(req);
+ ca->ca_set_arg = arg;
+ ca->ca_lock = lock;
+
req->rq_interpret_reply = ldlm_cb_interpret;
req->rq_no_resend = 1;
lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
- rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
+ rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);
RETURN(rc);
}
struct ldlm_cb_set_arg *arg = data;
struct ldlm_request *body;
struct ptlrpc_request *req;
+ struct ldlm_cb_async_args *ca;
long total_enqueue_wait;
int instant_cancel = 0;
int rc = 0;
if (req == NULL)
RETURN(-ENOMEM);
- lock_res_and_lock(lock);
- if (lock->l_resource->lr_lvb_len)
+ /* server namespace, doesn't need lock */
+ if (lock->l_resource->lr_lvb_len) {
req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT,
lock->l_resource->lr_lvb_len);
- unlock_res_and_lock(lock);
+ }
rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK);
if (rc) {
RETURN(rc);
}
- req->rq_async_args.pointer_arg[0] = arg;
- req->rq_async_args.pointer_arg[1] = lock;
+ CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
+ ca = ptlrpc_req_async_args(req);
+ ca->ca_set_arg = arg;
+ ca->ca_lock = lock;
+
req->rq_interpret_reply = ldlm_cb_interpret;
req->rq_no_resend = 1;
body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
if (lock->l_resource->lr_lvb_len) {
void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
- lock_res_and_lock(lock);
+ lock_res(lock->l_resource);
memcpy(lvb, lock->l_resource->lr_lvb_data,
lock->l_resource->lr_lvb_len);
- unlock_res_and_lock(lock);
+ unlock_res(lock->l_resource);
}
LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
/* Server-side enqueue wait time estimate, used in
__ldlm_add_waiting_lock to set future enqueue timers */
if (total_enqueue_wait < ldlm_get_enq_timeout(lock))
- at_measured(&lock->l_resource->lr_namespace->ns_at_estimate,
+ at_measured(ldlm_lock_to_ns_at(lock),
total_enqueue_wait);
else
/* bz18618. Don't add lock enqueue time we spend waiting for a
LDLM_DEBUG(lock, "lock completed after %lus; estimate was %ds. "
"It is likely that a previous callback timed out.",
total_enqueue_wait,
- at_get(&lock->l_resource->lr_namespace->ns_at_estimate));
+ at_get(ldlm_lock_to_ns_at(lock)));
ptlrpc_request_set_replen(req);
lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
- rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
+ rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);
RETURN(rc);
}
body->lock_handle[0] = lock->l_remote_handle;
ldlm_lock2desc(lock, &body->lock_desc);
- lock_res_and_lock(lock);
+ /* server namespace, doesn't need lock */
req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
lock->l_resource->lr_lvb_len);
- unlock_res_and_lock(lock);
res = lock->l_resource;
ptlrpc_request_set_replen(req);
LDLM_GL_CALLBACK - LDLM_FIRST_OPC);
rc = ptlrpc_queue_wait(req);
- if (rc == -ELDLM_NO_LOCK_DATA)
+ /* Update the LVB from disk if the AST failed (this is a legal race)
+ *
+ * - Glimpse callback of local lock just returns -ELDLM_NO_LOCK_DATA.
+ * - Glimpse callback of remote lock might return -ELDLM_NO_LOCK_DATA
+ * when the inode is cleared; see LU-274.
+ */
+ if (rc == -ELDLM_NO_LOCK_DATA) {
LDLM_DEBUG(lock, "lost race - client has a lock but no inode");
- else if (rc != 0)
+ ldlm_res_lvbo_update(res, NULL, 1);
+ } else if (rc != 0) {
rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
- else
+ } else {
rc = ldlm_res_lvbo_update(res, req, 1);
+ }
ptlrpc_req_finished(req);
if (rc == -ERESTART)
LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
- /* Don't enqueue a lock onto the export if it has already
- * been evicted. Cancel it now instead. (bug 3822) */
- if (req->rq_export->exp_failed) {
- LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
+ /* Don't enqueue a lock onto the export if it has been disconnected
+ * due to eviction (bug 3822) or server umount (bug 24324).
+ * Cancel it now instead. */
+ if (req->rq_export->exp_disconnected) {
+ LDLM_ERROR(lock, "lock on disconnected export %p",
+ req->rq_export);
GOTO(out, rc = -ENOTCONN);
}
* local_lock_enqueue by the policy function. */
cookie = req;
} else {
- lock_res_and_lock(lock);
+ /* based on the assumption that the LVB size never changes
+ * during the resource lifetime; otherwise it would need
+ * resource->lr_lock protection */
if (lock->l_resource->lr_lvb_len) {
req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
RCL_SERVER,
lock->l_resource->lr_lvb_len);
}
- unlock_res_and_lock(lock);
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
GOTO(out, rc = -ENOMEM);
}
if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
- lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
+ ldlm_convert_policy_to_local(
+ dlm_req->lock_desc.l_resource.lr_type,
+ &dlm_req->lock_desc.l_policy_data,
+ &lock->l_policy_data);
if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
lock->l_req_extent = lock->l_policy_data.l_extent;
dlm_rep->lock_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;
lock->l_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;
- /* Don't move a pending lock onto the export if it has already
- * been evicted. Cancel it now instead. (bug 5683) */
- if (unlikely(req->rq_export->exp_failed ||
+ /* Don't move a pending lock onto the export if the export has already
+ * been disconnected due to eviction (bug 5683) or server umount
+ * (bug 24324). Cancel the lock now instead. */
+ if (unlikely(req->rq_export->exp_disconnected ||
OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT))) {
LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
rc = -ENOTCONN;
LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
"(err=%d, rc=%d)", err, rc);
- lock_res_and_lock(lock);
if (rc == 0) {
if (lock->l_resource->lr_lvb_len > 0) {
+ /* the MDT path won't handle lr_lvb_data, so the
+ * lock/unlock must stay contained within this
+ * if block */
void *lvb;
lvb = req_capsule_server_get(&req->rq_pill,
&RMF_DLM_LVB);
LASSERTF(lvb != NULL, "req %p, lock %p\n",
req, lock);
-
+ lock_res(lock->l_resource);
memcpy(lvb, lock->l_resource->lr_lvb_data,
lock->l_resource->lr_lvb_len);
+ unlock_res(lock->l_resource);
}
} else {
+ lock_res_and_lock(lock);
ldlm_resource_unlink_lock(lock);
ldlm_lock_destroy_nolock(lock);
+ unlock_res_and_lock(lock);
}
- unlock_res_and_lock(lock);
if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
ldlm_reprocess_all(lock->l_resource);
if (!ldlm_request_cancel(req, dlm_req, 0))
req->rq_status = ESTALE;
- if (ptlrpc_reply(req) != 0)
- LBUG();
-
- RETURN(0);
+ RETURN(ptlrpc_reply(req));
}
void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
}
if (lock->l_resource->lr_type != LDLM_PLAIN) {
- lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
+ ldlm_convert_policy_to_local(
+ dlm_req->lock_desc.l_resource.lr_type,
+ &dlm_req->lock_desc.l_policy_data,
+ &lock->l_policy_data);
LDLM_DEBUG(lock, "completion AST, new policy data");
}
LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
- ldlm_run_ast_work(&ast_list, LDLM_WORK_CP_AST);
+ /* Let Enqueue call osc_lock_upcall() and initialize
+ * l_ast_data */
+ OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
+
+ ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
lock);
}
#ifdef __KERNEL__
-static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
- struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
- cfs_list_t *cancels, int count)
+static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi, int mode)
{
struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
- struct ldlm_bl_work_item *blwi;
ENTRY;
- if (cancels && count == 0)
- RETURN(0);
+ cfs_spin_lock(&blp->blp_lock);
+ if (blwi->blwi_lock && blwi->blwi_lock->l_flags & LDLM_FL_DISCARD_DATA) {
+ /* add LDLM_FL_DISCARD_DATA requests to the priority list */
+ cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
+ } else {
+ /* other blocking callbacks are added to the regular list */
+ cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_list);
+ }
+ cfs_spin_unlock(&blp->blp_lock);
- OBD_ALLOC(blwi, sizeof(*blwi));
- if (blwi == NULL)
- RETURN(-ENOMEM);
+ cfs_waitq_signal(&blp->blp_waitq);
+
+ /* can not use blwi->blwi_mode as blwi could be already freed in
+ LDLM_ASYNC mode */
+ if (mode == LDLM_SYNC)
+ cfs_wait_for_completion(&blwi->blwi_comp);
+
+ RETURN(0);
+}
+
+static inline void init_blwi(struct ldlm_bl_work_item *blwi,
+ struct ldlm_namespace *ns,
+ struct ldlm_lock_desc *ld,
+ cfs_list_t *cancels, int count,
+ struct ldlm_lock *lock,
+ int mode)
+{
+ cfs_init_completion(&blwi->blwi_comp);
+ CFS_INIT_LIST_HEAD(&blwi->blwi_head);
+
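+ /* record the caller's memory-pressure state; the blocking
+ * thread re-applies it around this work item */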
+ if (cfs_memory_pressure_get())
+ blwi->blwi_mem_pressure = 1;
blwi->blwi_ns = ns;
+ blwi->blwi_mode = mode;
if (ld != NULL)
blwi->blwi_ld = *ld;
if (count) {
} else {
blwi->blwi_lock = lock;
}
- cfs_spin_lock(&blp->blp_lock);
- if (lock && lock->l_flags & LDLM_FL_DISCARD_DATA) {
- /* add LDLM_FL_DISCARD_DATA requests to the priority list */
- cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
+}
+
+static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
+ struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
+ cfs_list_t *cancels, int count, int mode)
+{
+ ENTRY;
+
+ if (cancels && count == 0)
+ RETURN(0);
+
+ if (mode == LDLM_SYNC) {
+ /* if it is a synchronous call, do minimal memory allocation,
+ * as it could be triggered from the kernel shrinker
+ */
+ struct ldlm_bl_work_item blwi;
+ memset(&blwi, 0, sizeof(blwi));
+ init_blwi(&blwi, ns, ld, cancels, count, lock, LDLM_SYNC);
+ RETURN(__ldlm_bl_to_thread(&blwi, LDLM_SYNC));
} else {
- /* other blocking callbacks are added to the regular list */
- cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_list);
- }
- cfs_waitq_signal(&blp->blp_waitq);
- cfs_spin_unlock(&blp->blp_lock);
+ struct ldlm_bl_work_item *blwi;
+ OBD_ALLOC(blwi, sizeof(*blwi));
+ if (blwi == NULL)
+ RETURN(-ENOMEM);
+ init_blwi(blwi, ns, ld, cancels, count, lock, LDLM_ASYNC);
- RETURN(0);
+ RETURN(__ldlm_bl_to_thread(blwi, LDLM_ASYNC));
+ }
}
+
#endif
int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
struct ldlm_lock *lock)
{
#ifdef __KERNEL__
- RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0));
+ RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LDLM_ASYNC));
#else
RETURN(-ENOSYS);
#endif
}
int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
- cfs_list_t *cancels, int count)
+ cfs_list_t *cancels, int count, int mode)
{
#ifdef __KERNEL__
- RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count));
+ RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count, mode));
#else
RETURN(-ENOSYS);
#endif
}
static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
- const char *msg, int rc)
+ const char *msg, int rc,
+ struct lustre_handle *handle)
{
- CWARN("%s: [pid %d] [xid x"LPU64"] [nid %s] [opc %d] [rc %d].\n",
- msg, lustre_msg_get_status(req->rq_reqmsg),
- req->rq_xid, libcfs_id2str(req->rq_peer),
- lustre_msg_get_opc(req->rq_reqmsg), rc);
+ DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
+ "%s: [nid %s] [rc %d] [lock "LPX64"]",
+ msg, libcfs_id2str(req->rq_peer), rc,
+ handle ? handle->cookie : 0);
if (req->rq_no_reply)
CWARN("No reply was sent, maybe cause bug 21636.\n");
else if (rc)
if (req->rq_export == NULL) {
rc = ldlm_callback_reply(req, -ENOTCONN);
- ldlm_callback_errmsg(req, "Operate on unconnected server", rc);
+ ldlm_callback_errmsg(req, "Operate on unconnected server",
+ rc, NULL);
RETURN(0);
}
dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
if (dlm_req == NULL) {
rc = ldlm_callback_reply(req, -EPROTO);
- ldlm_callback_errmsg(req, "Operate without parameter", rc);
+ ldlm_callback_errmsg(req, "Operate without parameter", rc,
+ NULL);
RETURN(0);
}
CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
"disappeared\n", dlm_req->lock_handle[0].cookie);
rc = ldlm_callback_reply(req, -EINVAL);
- ldlm_callback_errmsg(req, "Operate with invalid parameter", rc);
+ ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
+ &dlm_req->lock_handle[0]);
RETURN(0);
}
unlock_res_and_lock(lock);
LDLM_LOCK_RELEASE(lock);
rc = ldlm_callback_reply(req, -EINVAL);
- ldlm_callback_errmsg(req, "Operate on stale lock", rc);
+ ldlm_callback_errmsg(req, "Operate on stale lock", rc,
+ &dlm_req->lock_handle[0]);
RETURN(0);
}
/* BL_AST locks are not needed in lru.
if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) {
rc = ldlm_callback_reply(req, 0);
if (req->rq_no_reply || rc)
- ldlm_callback_errmsg(req, "Normal process", rc);
+ ldlm_callback_errmsg(req, "Normal process", rc,
+ &dlm_req->lock_handle[0]);
}
if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
if (req->rq_export == NULL) {
struct ldlm_request *dlm_req;
- CERROR("operation %d from %s with bad export cookie "LPU64"\n",
- lustre_msg_get_opc(req->rq_reqmsg),
- libcfs_id2str(req->rq_peer),
+ CERROR("%s from %s arrived at %lu with bad export cookie "
+ LPU64"\n",
+ ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)),
+ libcfs_nid2str(req->rq_peer.nid),
+ req->rq_arrival_time.tv_sec,
lustre_msg_get_handle(req->rq_reqmsg)->cookie);
if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_CANCEL) {
RETURN(0);
}
-void ldlm_revoke_lock_cb(void *obj, void *data)
+int ldlm_revoke_lock_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
+ cfs_hlist_node_t *hnode, void *data)
{
cfs_list_t *rpc_list = data;
- struct ldlm_lock *lock = obj;
+ struct ldlm_lock *lock = cfs_hash_object(hs, hnode);
lock_res_and_lock(lock);
if (lock->l_req_mode != lock->l_granted_mode) {
unlock_res_and_lock(lock);
- return;
+ return 0;
}
LASSERT(lock->l_resource);
if (lock->l_resource->lr_type != LDLM_IBITS &&
lock->l_resource->lr_type != LDLM_PLAIN) {
unlock_res_and_lock(lock);
- return;
+ return 0;
}
if (lock->l_flags & LDLM_FL_AST_SENT) {
unlock_res_and_lock(lock);
- return;
+ return 0;
}
LASSERT(lock->l_blocking_ast);
LDLM_LOCK_GET(lock);
unlock_res_and_lock(lock);
+ return 0;
}
void ldlm_revoke_export_locks(struct obd_export *exp)
CFS_INIT_LIST_HEAD(&rpc_list);
cfs_hash_for_each_empty(exp->exp_lock_hash,
ldlm_revoke_lock_cb, &rpc_list);
- ldlm_run_ast_work(&rpc_list, LDLM_WORK_REVOKE_AST);
+ ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
+ LDLM_WORK_REVOKE_AST);
EXIT;
}
int rc;
cfs_init_completion(&bltd.bltd_comp);
- rc = cfs_kernel_thread(ldlm_bl_thread_main, &bltd, 0);
+ rc = cfs_create_thread(ldlm_bl_thread_main, &bltd, 0);
if (rc < 0) {
CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %d\n",
cfs_atomic_read(&blp->blp_num_threads), rc);
while (1) {
struct l_wait_info lwi = { 0 };
struct ldlm_bl_work_item *blwi = NULL;
+ int busy;
blwi = ldlm_bl_get_work(blp);
if (blwi == NULL) {
- int busy;
-
cfs_atomic_dec(&blp->blp_busy_threads);
l_wait_event_exclusive(blp->blp_waitq,
(blwi = ldlm_bl_get_work(blp)) != NULL,
&lwi);
busy = cfs_atomic_inc_return(&blp->blp_busy_threads);
-
- if (blwi->blwi_ns == NULL)
- /* added by ldlm_cleanup() */
- break;
-
- /* Not fatal if racy and have a few too many threads */
- if (unlikely(busy < blp->blp_max_threads &&
- busy >= cfs_atomic_read(&blp->blp_num_threads)))
- /* discard the return value, we tried */
- ldlm_bl_thread_start(blp);
} else {
- if (blwi->blwi_ns == NULL)
- /* added by ldlm_cleanup() */
- break;
+ busy = cfs_atomic_read(&blp->blp_busy_threads);
}
+ if (blwi->blwi_ns == NULL)
+ /* added by ldlm_cleanup() */
+ break;
+
+ /* Not fatal if racy and have a few too many threads */
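+ /* (no extra threads are started for items flagged with
+ * blwi_mem_pressure) */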
+ if (unlikely(busy < blp->blp_max_threads &&
+ busy >= cfs_atomic_read(&blp->blp_num_threads) &&
+ !blwi->blwi_mem_pressure))
+ /* discard the return value, we tried */
+ ldlm_bl_thread_start(blp);
+
+ if (blwi->blwi_mem_pressure)
+ cfs_memory_pressure_set();
+
if (blwi->blwi_count) {
+ int count;
/* The special case when we cancel locks in lru
* asynchronously, we pass the list of locks here.
- * Thus lock is marked LDLM_FL_CANCELING, and already
- * canceled locally. */
- ldlm_cli_cancel_list(&blwi->blwi_head,
- blwi->blwi_count, NULL, 0);
+ * Thus the locks are marked LDLM_FL_CANCELING, but are
+ * NOT yet canceled locally. */
+ count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
+ blwi->blwi_count,
+ LCF_BL_AST);
+ ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL, 0);
} else {
ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
blwi->blwi_lock);
}
- OBD_FREE(blwi, sizeof(*blwi));
+ if (blwi->blwi_mem_pressure)
+ cfs_memory_pressure_clr();
+
+ if (blwi->blwi_mode == LDLM_ASYNC)
+ OBD_FREE(blwi, sizeof(*blwi));
+ else
+ cfs_complete(&blwi->blwi_comp);
}
cfs_atomic_dec(&blp->blp_busy_threads);
* Export handle<->lock hash operations.
*/
static unsigned
-ldlm_export_lock_hash(cfs_hash_t *hs, void *key, unsigned mask)
+ldlm_export_lock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
{
return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
}
ldlm_export_lock_key(cfs_hlist_node_t *hnode)
{
struct ldlm_lock *lock;
- ENTRY;
lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
- RETURN(&lock->l_remote_handle);
+ return &lock->l_remote_handle;
+}
+
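+/* hs_keycpy lets the hash update a lock's remote handle in place,
+ * backing the CFS_HASH_REHASH_KEY flag used for exp_lock_hash */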
+static void
+ldlm_export_lock_keycpy(cfs_hlist_node_t *hnode, void *key)
+{
+ struct ldlm_lock *lock;
+
+ lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
+ lock->l_remote_handle = *(struct lustre_handle *)key;
}
static int
-ldlm_export_lock_compare(void *key, cfs_hlist_node_t *hnode)
+ldlm_export_lock_keycmp(const void *key, cfs_hlist_node_t *hnode)
{
- ENTRY;
- RETURN(lustre_handle_equal(ldlm_export_lock_key(hnode), key));
+ return lustre_handle_equal(ldlm_export_lock_key(hnode), key);
}
static void *
-ldlm_export_lock_get(cfs_hlist_node_t *hnode)
+ldlm_export_lock_object(cfs_hlist_node_t *hnode)
+{
+ return cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
+}
+
+static void
+ldlm_export_lock_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
{
struct ldlm_lock *lock;
- ENTRY;
lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
LDLM_LOCK_GET(lock);
-
- RETURN(lock);
}
-static void *
-ldlm_export_lock_put(cfs_hlist_node_t *hnode)
+static void
+ldlm_export_lock_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
{
struct ldlm_lock *lock;
- ENTRY;
lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
LDLM_LOCK_RELEASE(lock);
-
- RETURN(lock);
}
static cfs_hash_ops_t ldlm_export_lock_ops = {
- .hs_hash = ldlm_export_lock_hash,
- .hs_key = ldlm_export_lock_key,
- .hs_compare = ldlm_export_lock_compare,
- .hs_get = ldlm_export_lock_get,
- .hs_put = ldlm_export_lock_put
+ .hs_hash = ldlm_export_lock_hash,
+ .hs_key = ldlm_export_lock_key,
+ .hs_keycmp = ldlm_export_lock_keycmp,
+ .hs_keycpy = ldlm_export_lock_keycpy,
+ .hs_object = ldlm_export_lock_object,
+ .hs_get = ldlm_export_lock_get,
+ .hs_put = ldlm_export_lock_put,
+ .hs_put_locked = ldlm_export_lock_put,
};
int ldlm_init_export(struct obd_export *exp)
exp->exp_lock_hash =
cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
- HASH_EXP_LOCK_CUR_BITS, HASH_EXP_LOCK_MAX_BITS,
- &ldlm_export_lock_ops, CFS_HASH_REHASH);
+ HASH_EXP_LOCK_CUR_BITS,
+ HASH_EXP_LOCK_MAX_BITS,
+ HASH_EXP_LOCK_BKT_BITS, 0,
+ CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
+ &ldlm_export_lock_ops,
+ CFS_HASH_DEFAULT | CFS_HASH_REHASH_KEY |
+ CFS_HASH_NBLK_CHANGE);
if (!exp->exp_lock_hash)
RETURN(-ENOMEM);
void ldlm_destroy_export(struct obd_export *exp)
{
ENTRY;
- cfs_hash_destroy(exp->exp_lock_hash);
+ cfs_hash_putref(exp->exp_lock_hash);
exp->exp_lock_hash = NULL;
EXIT;
}
GOTO(out_thread, rc);
}
- rc = ptlrpc_start_threads(NULL, ldlm_state->ldlm_cancel_service);
+ rc = ptlrpc_start_threads(ldlm_state->ldlm_cancel_service);
if (rc)
GOTO(out_thread, rc);
- rc = ptlrpc_start_threads(NULL, ldlm_state->ldlm_cb_service);
+ rc = ptlrpc_start_threads(ldlm_state->ldlm_cb_service);
if (rc)
GOTO(out_thread, rc);
cfs_spin_lock_init(&waiting_locks_spinlock);
cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);
- rc = cfs_kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FILES);
+ rc = cfs_create_thread(expired_lock_main, NULL, CFS_DAEMON_FLAGS);
if (rc < 0) {
CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
GOTO(out_thread, rc);
RETURN(0);
}
-int __init ldlm_init(void)
+int ldlm_init(void)
{
cfs_init_mutex(&ldlm_ref_sem);
cfs_init_mutex(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
return 0;
}
-void __exit ldlm_exit(void)
+void ldlm_exit(void)
{
int rc;
if (ldlm_refcount)
EXPORT_SYMBOL(ldlm_replay_locks);
EXPORT_SYMBOL(ldlm_resource_foreach);
EXPORT_SYMBOL(ldlm_namespace_foreach);
-EXPORT_SYMBOL(ldlm_namespace_foreach_res);
EXPORT_SYMBOL(ldlm_resource_iterate);
EXPORT_SYMBOL(ldlm_cancel_resource_local);
+EXPORT_SYMBOL(ldlm_cli_cancel_list_local);
EXPORT_SYMBOL(ldlm_cli_cancel_list);
/* ldlm_lockd.c */