struct ldlm_lock *blwi_lock;
cfs_list_t blwi_head;
int blwi_count;
+ cfs_completion_t blwi_comp;
+ cfs_atomic_t blwi_ref_count;
};
#ifdef __KERNEL__
+/* Take an additional reference on a blocking-AST work item; paired
+ * with ldlm_bl_work_item_put(). */
+static inline void ldlm_bl_work_item_get(struct ldlm_bl_work_item *item)
+{
+        cfs_atomic_inc(&item->blwi_ref_count);
+}
+
+/* Drop a reference on a blocking-AST work item and free it once the
+ * last reference is gone.  Stack-allocated (SYNC) items hold an extra
+ * reference so they are never freed here. */
+static inline void ldlm_bl_work_item_put(struct ldlm_bl_work_item *item)
+{
+        if (!cfs_atomic_dec_and_test(&item->blwi_ref_count))
+                return;
+        OBD_FREE(item, sizeof(*item));
+}
static inline int have_expired_locks(void)
{
}
#ifdef __KERNEL__
-static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
+static int __ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_bl_work_item *blwi,
struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
- cfs_list_t *cancels, int count)
+ cfs_list_t *cancels, int count, int mode)
{
struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
- struct ldlm_bl_work_item *blwi;
ENTRY;
- if (cancels && count == 0)
+ if (cancels && count == 0) {
+ if (mode == LDLM_ASYNC)
+ OBD_FREE(blwi, sizeof(*blwi));
RETURN(0);
+ }
- OBD_ALLOC(blwi, sizeof(*blwi));
- if (blwi == NULL)
- RETURN(-ENOMEM);
+ cfs_init_completion(&blwi->blwi_comp);
+ cfs_atomic_set(&blwi->blwi_ref_count, 1);
blwi->blwi_ns = ns;
if (ld != NULL)
} else {
blwi->blwi_lock = lock;
}
+
cfs_spin_lock(&blp->blp_lock);
if (lock && lock->l_flags & LDLM_FL_DISCARD_DATA) {
/* add LDLM_FL_DISCARD_DATA requests to the priority list */
/* other blocking callbacks are added to the regular list */
cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_list);
}
- cfs_waitq_signal(&blp->blp_waitq);
cfs_spin_unlock(&blp->blp_lock);
+ if (mode == LDLM_SYNC) {
+ /* keep ref count as object is on this stack for SYNC call */
+ ldlm_bl_work_item_get(blwi);
+ cfs_waitq_signal(&blp->blp_waitq);
+ cfs_wait_for_completion(&blwi->blwi_comp);
+ } else {
+ cfs_waitq_signal(&blp->blp_waitq);
+ }
+
RETURN(0);
}
+
+/**
+ * Queue a blocking AST or a batch of cancels to a blocking thread.
+ *
+ * \param ns      namespace the lock(s) belong to
+ * \param ld      lock descriptor from the blocking AST, may be NULL
+ * \param lock    single lock to hand off, or NULL when \a cancels is used
+ * \param cancels list of locks to cancel, or NULL
+ * \param count   number of locks on \a cancels
+ * \param mode    LDLM_SYNC to wait for processing, LDLM_ASYNC to just queue
+ *
+ * \retval 0       on success
+ * \retval -ENOMEM if the async work item cannot be allocated
+ */
+static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
+                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
+                             cfs_list_t *cancels, int count, int mode)
+{
+        ENTRY;
+
+        if (mode == LDLM_SYNC) {
+                /* if it is synchronous call do minimum mem alloc, as it could
+                 * be triggered from kernel shrinker
+                 */
+                struct ldlm_bl_work_item blwi;
+                memset(&blwi, 0, sizeof(blwi));
+                /* __ldlm_bl_to_thread() takes the extra reference needed
+                 * because this object lives on our stack */
+                RETURN(__ldlm_bl_to_thread(ns, &blwi, ld, lock, cancels, count, mode));
+        } else {
+                struct ldlm_bl_work_item *blwi;
+                OBD_ALLOC(blwi, sizeof(*blwi));
+                if (blwi == NULL)
+                        RETURN(-ENOMEM);
+
+                /* freed by ldlm_bl_work_item_put() when processing is done */
+                RETURN(__ldlm_bl_to_thread(ns, blwi, ld, lock, cancels, count, mode));
+        }
+}
+
#endif
int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
struct ldlm_lock *lock)
{
#ifdef __KERNEL__
- RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0));
+ /* single-lock blocking ASTs are always queued asynchronously; only
+ * cancel batches (ldlm_bl_to_thread_list) may run in SYNC mode */
+ RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LDLM_ASYNC));
#else
RETURN(-ENOSYS);
#endif
}
int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
- cfs_list_t *cancels, int count)
+ cfs_list_t *cancels, int count, int mode)
{
#ifdef __KERNEL__
- RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count));
+ RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count, mode));
#else
RETURN(-ENOSYS);
#endif
int rc = -ENOSYS;
ENTRY;
- DEBUG_REQ(D_ERROR, req, "%s: handle setinfo\n", obd->obd_name);
+ DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);
req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
return rc;
}
+/* Emit a console warning for a callback request that could not be
+ * processed normally, identifying the sender (pid, xid, nid), the
+ * opcode, the reply rc, and the lock handle involved (0 when no
+ * handle applies). */
+static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
+ const char *msg, int rc,
+ struct lustre_handle *handle)
+{
+ CWARN("%s: [pid %d] [xid x"LPU64"] [nid %s] [opc %d] [rc %d] "
+ "[lock "LPX64"].\n",
+ msg, lustre_msg_get_status(req->rq_reqmsg),
+ req->rq_xid, libcfs_id2str(req->rq_peer),
+ lustre_msg_get_opc(req->rq_reqmsg), rc,
+ handle ? handle->cookie : 0);
+ /* a lost or failed callback reply can leave the peer's lock stuck
+ * (see bug 21636), so call that out explicitly */
+ if (req->rq_no_reply)
+ CWARN("No reply was sent, maybe cause bug 21636.\n");
+ else if (rc)
+ CWARN("Send reply failed, maybe cause bug 21636.\n");
+}
+
/* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
req_capsule_init(&req->rq_pill, req, RCL_SERVER);
if (req->rq_export == NULL) {
- ldlm_callback_reply(req, -ENOTCONN);
+ rc = ldlm_callback_reply(req, -ENOTCONN);
+ ldlm_callback_errmsg(req, "Operate on unconnected server",
+ rc, NULL);
RETURN(0);
}
dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
if (dlm_req == NULL) {
- ldlm_callback_reply(req, -EPROTO);
+ rc = ldlm_callback_reply(req, -EPROTO);
+ ldlm_callback_errmsg(req, "Operate without parameter", rc,
+ NULL);
RETURN(0);
}
if (!lock) {
CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
"disappeared\n", dlm_req->lock_handle[0].cookie);
- ldlm_callback_reply(req, -EINVAL);
+ rc = ldlm_callback_reply(req, -EINVAL);
+ ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
+ &dlm_req->lock_handle[0]);
RETURN(0);
}
dlm_req->lock_handle[0].cookie);
unlock_res_and_lock(lock);
LDLM_LOCK_RELEASE(lock);
- ldlm_callback_reply(req, -EINVAL);
+ rc = ldlm_callback_reply(req, -EINVAL);
+ ldlm_callback_errmsg(req, "Operate on stale lock", rc,
+ &dlm_req->lock_handle[0]);
RETURN(0);
}
/* BL_AST locks are not needed in lru.
case LDLM_BL_CALLBACK:
CDEBUG(D_INODE, "blocking ast\n");
req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
- if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK))
- ldlm_callback_reply(req, 0);
+ if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) {
+ rc = ldlm_callback_reply(req, 0);
+ if (req->rq_no_reply || rc)
+ ldlm_callback_errmsg(req, "Normal process", rc,
+ &dlm_req->lock_handle[0]);
+ }
if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
break;
ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
blwi->blwi_lock);
}
- OBD_FREE(blwi, sizeof(*blwi));
+ cfs_complete(&blwi->blwi_comp);
+ ldlm_bl_work_item_put(blwi);
}
cfs_atomic_dec(&blp->blp_busy_threads);
void ldlm_destroy_export(struct obd_export *exp)
{
ENTRY;
- cfs_hash_destroy(exp->exp_lock_hash);
+ /* drop our reference rather than destroying outright: the hash is
+ * freed only when the last reference holder releases it */
+ cfs_hash_putref(exp->exp_lock_hash);
exp->exp_lock_hash = NULL;
EXIT;
}