* GPL HEADER END
*/
/*
- * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*/
/*
struct ldlm_lock *blwi_lock;
cfs_list_t blwi_head;
int blwi_count;
+ cfs_completion_t blwi_comp;
+ int blwi_mode;
+ int blwi_mem_pressure;
};
#ifdef __KERNEL__
if (lock->l_export == NULL)
return 0;
- cfs_spin_lock(&lock->l_export->exp_lock);
+ cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock);
cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc,
rq_exp_list) {
if (req->rq_ops->hpreq_lock_match) {
break;
}
}
- cfs_spin_unlock(&lock->l_export->exp_lock);
+ cfs_spin_unlock_bh(&lock->l_export->exp_rpc_lock);
RETURN(match);
}
RETURN_EXIT;
}
- cfs_spin_lock(&lock->l_export->exp_lock);
+ cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock);
cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc,
rq_exp_list) {
if (!req->rq_hp && req->rq_ops->hpreq_lock_match &&
req->rq_ops->hpreq_lock_match(req, lock))
ptlrpc_hpreq_reorder(req);
}
- cfs_spin_unlock(&lock->l_export->exp_lock);
+ cfs_spin_unlock_bh(&lock->l_export->exp_rpc_lock);
EXIT;
}
if (!ldlm_request_cancel(req, dlm_req, 0))
req->rq_status = ESTALE;
- if (ptlrpc_reply(req) != 0)
- LBUG();
-
- RETURN(0);
+ RETURN(ptlrpc_reply(req));
}
void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
+ /* Let Enqueue to call osc_lock_upcall() and initialize
+ * l_ast_data */
+ OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
+
ldlm_run_ast_work(&ast_list, LDLM_WORK_CP_AST);
LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
}
#ifdef __KERNEL__
-static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
- struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
- cfs_list_t *cancels, int count)
+static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi, int mode)
{
struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
- struct ldlm_bl_work_item *blwi;
ENTRY;
- if (cancels && count == 0)
- RETURN(0);
+ cfs_spin_lock(&blp->blp_lock);
+ if (blwi->blwi_lock && blwi->blwi_lock->l_flags & LDLM_FL_DISCARD_DATA) {
+ /* add LDLM_FL_DISCARD_DATA requests to the priority list */
+ cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
+ } else {
+ /* other blocking callbacks are added to the regular list */
+ cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_list);
+ }
+ cfs_spin_unlock(&blp->blp_lock);
- OBD_ALLOC(blwi, sizeof(*blwi));
- if (blwi == NULL)
- RETURN(-ENOMEM);
+ cfs_waitq_signal(&blp->blp_waitq);
+
+ /* can not use blwi->blwi_mode as blwi could be already freed in
+ LDLM_ASYNC mode */
+ if (mode == LDLM_SYNC)
+ cfs_wait_for_completion(&blwi->blwi_comp);
+
+ RETURN(0);
+}
+
+static inline void init_blwi(struct ldlm_bl_work_item *blwi,
+ struct ldlm_namespace *ns,
+ struct ldlm_lock_desc *ld,
+ cfs_list_t *cancels, int count,
+ struct ldlm_lock *lock,
+ int mode)
+{
+ cfs_init_completion(&blwi->blwi_comp);
+ CFS_INIT_LIST_HEAD(&blwi->blwi_head);
+
+ if (cfs_memory_pressure_get())
+ blwi->blwi_mem_pressure = 1;
blwi->blwi_ns = ns;
+ blwi->blwi_mode = mode;
if (ld != NULL)
blwi->blwi_ld = *ld;
if (count) {
} else {
blwi->blwi_lock = lock;
}
- cfs_spin_lock(&blp->blp_lock);
- if (lock && lock->l_flags & LDLM_FL_DISCARD_DATA) {
- /* add LDLM_FL_DISCARD_DATA requests to the priority list */
- cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
+}
+
+static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
+ struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
+ cfs_list_t *cancels, int count, int mode)
+{
+ ENTRY;
+
+ if (cancels && count == 0)
+ RETURN(0);
+
+ if (mode == LDLM_SYNC) {
+ /* if it is synchronous call do minimum mem alloc, as it could
+ * be triggered from kernel shrinker
+ */
+ struct ldlm_bl_work_item blwi;
+ memset(&blwi, 0, sizeof(blwi));
+ init_blwi(&blwi, ns, ld, cancels, count, lock, LDLM_SYNC);
+ RETURN(__ldlm_bl_to_thread(&blwi, LDLM_SYNC));
} else {
- /* other blocking callbacks are added to the regular list */
- cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_list);
- }
- cfs_waitq_signal(&blp->blp_waitq);
- cfs_spin_unlock(&blp->blp_lock);
+ struct ldlm_bl_work_item *blwi;
+ OBD_ALLOC(blwi, sizeof(*blwi));
+ if (blwi == NULL)
+ RETURN(-ENOMEM);
+ init_blwi(blwi, ns, ld, cancels, count, lock, LDLM_ASYNC);
- RETURN(0);
+ RETURN(__ldlm_bl_to_thread(blwi, LDLM_ASYNC));
+ }
}
+
#endif
int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
struct ldlm_lock *lock)
{
#ifdef __KERNEL__
- RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0));
+ RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LDLM_ASYNC));
#else
RETURN(-ENOSYS);
#endif
}
int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
- cfs_list_t *cancels, int count)
+ cfs_list_t *cancels, int count, int mode)
{
#ifdef __KERNEL__
- RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count));
+ RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count, mode));
#else
RETURN(-ENOSYS);
#endif
return rc;
}
+static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
+ const char *msg, int rc,
+ struct lustre_handle *handle)
+{
+ DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
+ "%s: [nid %s] [rc %d] [lock "LPX64"]",
+ msg, libcfs_id2str(req->rq_peer), rc,
+ handle ? handle->cookie : 0);
+ if (req->rq_no_reply)
+ CWARN("No reply was sent, maybe cause bug 21636.\n");
+ else if (rc)
+ CWARN("Send reply failed, maybe cause bug 21636.\n");
+}
+
/* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
req_capsule_init(&req->rq_pill, req, RCL_SERVER);
if (req->rq_export == NULL) {
- ldlm_callback_reply(req, -ENOTCONN);
+ rc = ldlm_callback_reply(req, -ENOTCONN);
+ ldlm_callback_errmsg(req, "Operate on unconnected server",
+ rc, NULL);
RETURN(0);
}
dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
if (dlm_req == NULL) {
- ldlm_callback_reply(req, -EPROTO);
+ rc = ldlm_callback_reply(req, -EPROTO);
+ ldlm_callback_errmsg(req, "Operate without parameter", rc,
+ NULL);
RETURN(0);
}
if (!lock) {
CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
"disappeared\n", dlm_req->lock_handle[0].cookie);
- ldlm_callback_reply(req, -EINVAL);
+ rc = ldlm_callback_reply(req, -EINVAL);
+ ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
+ &dlm_req->lock_handle[0]);
RETURN(0);
}
dlm_req->lock_handle[0].cookie);
unlock_res_and_lock(lock);
LDLM_LOCK_RELEASE(lock);
- ldlm_callback_reply(req, -EINVAL);
+ rc = ldlm_callback_reply(req, -EINVAL);
+ ldlm_callback_errmsg(req, "Operate on stale lock", rc,
+ &dlm_req->lock_handle[0]);
RETURN(0);
}
/* BL_AST locks are not needed in lru.
case LDLM_BL_CALLBACK:
CDEBUG(D_INODE, "blocking ast\n");
req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
- if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK))
- ldlm_callback_reply(req, 0);
+ if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) {
+ rc = ldlm_callback_reply(req, 0);
+ if (req->rq_no_reply || rc)
+ ldlm_callback_errmsg(req, "Normal process", rc,
+ &dlm_req->lock_handle[0]);
+ }
if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
break;
if (req->rq_export == NULL) {
struct ldlm_request *dlm_req;
- CERROR("operation %d from %s with bad export cookie "LPU64"\n",
- lustre_msg_get_opc(req->rq_reqmsg),
- libcfs_id2str(req->rq_peer),
+ CERROR("%s from %s arrived at %lu with bad export cookie "
+ LPU64"\n",
+ ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)),
+ libcfs_nid2str(req->rq_peer.nid),
+ req->rq_arrival_time.tv_sec,
lustre_msg_get_handle(req->rq_reqmsg)->cookie);
if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_CANCEL) {
/* added by ldlm_cleanup() */
break;
}
+ if (blwi->blwi_mem_pressure)
+ cfs_memory_pressure_set();
if (blwi->blwi_count) {
+ int count;
/* The special case when we cancel locks in lru
* asynchronously, we pass the list of locks here.
- * Thus lock is marked LDLM_FL_CANCELING, and already
- * canceled locally. */
- ldlm_cli_cancel_list(&blwi->blwi_head,
- blwi->blwi_count, NULL, 0);
+ * Thus locks are marked LDLM_FL_CANCELING, but NOT
+ * canceled locally yet. */
+ count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
+ blwi->blwi_count,
+ LCF_BL_AST);
+ ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL, 0);
} else {
ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
blwi->blwi_lock);
}
- OBD_FREE(blwi, sizeof(*blwi));
+ if (blwi->blwi_mem_pressure)
+ cfs_memory_pressure_clr();
+
+ if (blwi->blwi_mode == LDLM_ASYNC)
+ OBD_FREE(blwi, sizeof(*blwi));
+ else
+ cfs_complete(&blwi->blwi_comp);
}
cfs_atomic_dec(&blp->blp_busy_threads);
void ldlm_destroy_export(struct obd_export *exp)
{
ENTRY;
- cfs_hash_destroy(exp->exp_lock_hash);
+ cfs_hash_putref(exp->exp_lock_hash);
exp->exp_lock_hash = NULL;
EXIT;
}
EXPORT_SYMBOL(ldlm_namespace_foreach_res);
EXPORT_SYMBOL(ldlm_resource_iterate);
EXPORT_SYMBOL(ldlm_cancel_resource_local);
+EXPORT_SYMBOL(ldlm_cli_cancel_list_local);
EXPORT_SYMBOL(ldlm_cli_cancel_list);
/* ldlm_lockd.c */