cfs_list_t blwi_head;
int blwi_count;
struct completion blwi_comp;
- int blwi_mode;
+ ldlm_cancel_flags_t blwi_flags;
int blwi_mem_pressure;
};
*/
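For orientation, the full blocking-thread work item is assumed to look roughly like the sketch below; only the blwi_mode to blwi_flags change above comes from this patch, and the remaining field names are recalled from the surrounding code, so they may not match the tree exactly.

struct ldlm_bl_work_item {
	cfs_list_t		 blwi_entry;	/* linkage into the pool list */
	struct ldlm_namespace	*blwi_ns;	/* namespace of the lock(s) */
	struct ldlm_lock_desc	 blwi_ld;	/* copied lock descriptor */
	struct ldlm_lock	*blwi_lock;	/* single-lock case */
	cfs_list_t		 blwi_head;	/* list-of-cancels case */
	int			 blwi_count;	/* number of cancels queued */
	struct completion	 blwi_comp;	/* signalled when done (sync) */
	ldlm_cancel_flags_t	 blwi_flags;	/* replaces int blwi_mode */
	int			 blwi_mem_pressure;
};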
static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
{
- struct ptlrpc_request *req;
- ENTRY;
+ struct ptlrpc_request *req;
+ ENTRY;
- if (lock->l_export == NULL) {
- LDLM_DEBUG(lock, "client lock: no-op");
- RETURN_EXIT;
- }
+ if (lock->l_export == NULL) {
+ LDLM_DEBUG(lock, "client lock: no-op");
+ RETURN_EXIT;
+ }
spin_lock_bh(&lock->l_export->exp_rpc_lock);
- cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
- rq_exp_list) {
- /* Do not process requests that were not yet added to there
- * incoming queue or were already removed from there for
- * processing */
- if (!req->rq_hp && !cfs_list_empty(&req->rq_list) &&
- req->rq_ops->hpreq_lock_match &&
- req->rq_ops->hpreq_lock_match(req, lock))
- ptlrpc_hpreq_reorder(req);
- }
+ cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
+ rq_exp_list) {
+ /* Do not process requests that were not yet added to their
+ * incoming queue or were already removed from it for
+ * processing. We evaluate ptlrpc_nrs_req_can_move() without
+ * holding svcpt->scp_req_lock, and then redo the check with
+ * the lock held once we need to obtain a reliable result.
+ */
+ if (ptlrpc_nrs_req_can_move(req) &&
+ req->rq_ops->hpreq_lock_match &&
+ req->rq_ops->hpreq_lock_match(req, lock))
+ ptlrpc_nrs_req_hp_move(req);
+ }
spin_unlock_bh(&lock->l_export->exp_rpc_lock);
EXIT;
}
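The new loop relies on a lockless-check-then-recheck pattern: ptlrpc_nrs_req_can_move() is first evaluated without svcpt->scp_req_lock as a cheap filter, and the actual move is expected to re-verify under the lock. A minimal sketch of that pattern, with illustrative names; the real logic is assumed to live inside ptlrpc_nrs_req_hp_move():

static void nrs_hp_move_sketch(struct ptlrpc_request *req)
{
	struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;

	spin_lock(&svcpt->scp_req_lock);
	/* redo the unlocked check now that the result is reliable */
	if (ptlrpc_nrs_req_can_move(req)) {
		/* requeue req on the high-priority NRS list here */
	}
	spin_unlock(&svcpt->scp_req_lock);
}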
void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
lvb_len = ldlm_lvbo_fill(lock, lvb, lvb_len);
- req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB,
- lvb_len, RCL_CLIENT);
+ if (lvb_len < 0) {
+ /* We still need to send the RPC to wake up the blocked
+ * enqueue thread on the client.
+ *
+ * For an old client there is no better way to report
+ * the failure: send a zero-sized LVB, and the client
+ * will fail out with -EPROTO. */
+ req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, 0,
+ RCL_CLIENT);
+ instant_cancel = 1;
+ } else {
+ req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len,
+ RCL_CLIENT);
+ }
}
LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
rc = ldlm_ast_fini(req, arg, lock, instant_cancel);
- RETURN(rc);
+ RETURN(lvb_len < 0 ? lvb_len : rc);
}
EXPORT_SYMBOL(ldlm_server_completion_ast);
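The lvb_len < 0 branch deliberately ships a zero-length LVB so that even an old client notices the failure. A hedged sketch of the client-side effect, assuming the reply-parsing path rejects an empty LVB field (function and parameter names are illustrative):

static int client_lvb_sketch(struct req_capsule *pill, int lvb_len)
{
	void *lvb = req_capsule_server_get(pill, &RMF_DLM_LVB);

	/* a zero-sized LVB field yields no usable data: an old client
	 * cannot distinguish causes, so it fails the enqueue */
	if (lvb == NULL || lvb_len == 0)
		return -EPROTO;
	return 0;
}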
GOTO(out, rc = -EFAULT);
}
- if (req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS) {
+ if (exp_connect_flags(req->rq_export) & OBD_CONNECT_IBITS) {
if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
LDLM_PLAIN)) {
DEBUG_REQ(D_ERROR, req,
/* INODEBITS_INTEROP: Perform conversion from plain lock to
* inodebits lock if client does not support them. */
- if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS) &&
+ if (!(exp_connect_flags(req->rq_export) & OBD_CONNECT_IBITS) &&
(dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN)) {
dlm_req->lock_desc.l_resource.lr_type = LDLM_IBITS;
dlm_req->lock_desc.l_policy_data.l_inodebits.bits =
buflen = req_capsule_get_size(&req->rq_pill,
&RMF_DLM_LVB, RCL_SERVER);
buflen = ldlm_lvbo_fill(lock, buf, buflen);
- req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB,
- buflen, RCL_SERVER);
+ if (buflen >= 0)
+ req_capsule_shrink(&req->rq_pill,
+ &RMF_DLM_LVB,
+ buflen, RCL_SERVER);
+ else
+ rc = buflen;
}
} else {
lock_res_and_lock(lock);
}
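exp_connect_flags() replaces direct reads of the export's connect flags in the hunks above. It is assumed to be a thin accessor over the export's connect data, roughly as sketched here:

static inline __u64 exp_connect_flags_sketch(struct obd_export *exp)
{
	/* the real accessor is assumed to go through a pointer helper;
	 * shown as a direct read of the connect data for brevity */
	return exp->exp_connect_data.ocd_connect_flags;
}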
#ifdef __KERNEL__
-static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi, int mode)
+static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
+ ldlm_cancel_flags_t cancel_flags)
{
struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
ENTRY;
cfs_waitq_signal(&blp->blp_waitq);
- /* can not use blwi->blwi_mode as blwi could be already freed in
- LDLM_ASYNC mode */
- if (mode == LDLM_SYNC)
+ /* cannot check blwi->blwi_flags as blwi could already be freed
+ * in LCF_ASYNC mode */
+ if (!(cancel_flags & LCF_ASYNC))
wait_for_completion(&blwi->blwi_comp);
RETURN(0);
}
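For reference, the cancel-flag set that replaces the old LDLM_SYNC/LDLM_ASYNC mode values looks roughly like the following (recalled from lustre_dlm.h of this era; the exact values are illustrative):

typedef enum {
	LCF_ASYNC  = 0x1,	/* cancel asynchronously: the work item is
				 * heap-allocated and freed by the consumer */
	LCF_LOCAL  = 0x2,	/* cancel locally, without notifying the server */
	LCF_BL_AST = 0x4,	/* cancel locks marked as LDLM_FL_BL_AST */
} ldlm_cancel_flags_t;

This is why __ldlm_bl_to_thread() now tests !(cancel_flags & LCF_ASYNC) rather than comparing against a mode constant.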
static inline void init_blwi(struct ldlm_bl_work_item *blwi,
- struct ldlm_namespace *ns,
- struct ldlm_lock_desc *ld,
- cfs_list_t *cancels, int count,
- struct ldlm_lock *lock,
- int mode)
+ struct ldlm_namespace *ns,
+ struct ldlm_lock_desc *ld,
+ cfs_list_t *cancels, int count,
+ struct ldlm_lock *lock,
+ ldlm_cancel_flags_t cancel_flags)
{
init_completion(&blwi->blwi_comp);
CFS_INIT_LIST_HEAD(&blwi->blwi_head);
blwi->blwi_mem_pressure = 1;
blwi->blwi_ns = ns;
- blwi->blwi_mode = mode;
+ blwi->blwi_flags = cancel_flags;
if (ld != NULL)
blwi->blwi_ld = *ld;
if (count) {
* call ->l_blocking_ast itself.
*/
static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
- struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
- cfs_list_t *cancels, int count, int mode)
+ struct ldlm_lock_desc *ld,
+ struct ldlm_lock *lock,
+ cfs_list_t *cancels, int count,
+ ldlm_cancel_flags_t cancel_flags)
{
- ENTRY;
+ ENTRY;
- if (cancels && count == 0)
- RETURN(0);
+ if (cancels && count == 0)
+ RETURN(0);
- if (mode == LDLM_SYNC) {
- /* if it is synchronous call do minimum mem alloc, as it could
- * be triggered from kernel shrinker
- */
- struct ldlm_bl_work_item blwi;
- memset(&blwi, 0, sizeof(blwi));
- init_blwi(&blwi, ns, ld, cancels, count, lock, LDLM_SYNC);
- RETURN(__ldlm_bl_to_thread(&blwi, LDLM_SYNC));
- } else {
- struct ldlm_bl_work_item *blwi;
- OBD_ALLOC(blwi, sizeof(*blwi));
- if (blwi == NULL)
- RETURN(-ENOMEM);
- init_blwi(blwi, ns, ld, cancels, count, lock, LDLM_ASYNC);
+ if (cancel_flags & LCF_ASYNC) {
+ struct ldlm_bl_work_item *blwi;
- RETURN(__ldlm_bl_to_thread(blwi, LDLM_ASYNC));
- }
+ OBD_ALLOC(blwi, sizeof(*blwi));
+ if (blwi == NULL)
+ RETURN(-ENOMEM);
+ init_blwi(blwi, ns, ld, cancels, count, lock, cancel_flags);
+
+ RETURN(__ldlm_bl_to_thread(blwi, cancel_flags));
+ } else {
+ /* if this is a synchronous call, do minimal memory allocation,
+ * as it could be triggered from the kernel shrinker
+ */
+ struct ldlm_bl_work_item blwi;
+
+ memset(&blwi, 0, sizeof(blwi));
+ init_blwi(&blwi, ns, ld, cancels, count, lock, cancel_flags);
+ RETURN(__ldlm_bl_to_thread(&blwi, cancel_flags));
+ }
}
#endif
int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
- struct ldlm_lock *lock)
+ struct ldlm_lock *lock)
{
#ifdef __KERNEL__
- RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LDLM_ASYNC));
+ return ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LCF_ASYNC);
#else
- RETURN(-ENOSYS);
+ return -ENOSYS;
#endif
}
int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
- cfs_list_t *cancels, int count, int mode)
+ cfs_list_t *cancels, int count,
+ ldlm_cancel_flags_t cancel_flags)
{
#ifdef __KERNEL__
- RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count, mode));
+ return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
#else
- RETURN(-ENOSYS);
+ return -ENOSYS;
#endif
}
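A hedged usage sketch of the reworked entry points: an asynchronous caller hands over a heap-allocated work item and returns immediately, while a synchronous caller (e.g. the shrinker path) blocks on the stack-allocated item's completion. The helper below is hypothetical, only the ldlm_bl_to_thread_list() call is from this patch:

static int bl_thread_usage_sketch(struct ldlm_namespace *ns,
				  cfs_list_t *cancels, int count,
				  int from_shrinker)
{
	/* shrinker context must not allocate: go synchronous (flags
	 * without LCF_ASYNC), which uses a stack work item and waits
	 * on blwi_comp before returning */
	if (from_shrinker)
		return ldlm_bl_to_thread_list(ns, NULL, cancels, count, 0);

	/* otherwise fire-and-forget: the blocking thread frees the
	 * heap-allocated work item once the cancels are processed */
	return ldlm_bl_to_thread_list(ns, NULL, cancels, count, LCF_ASYNC);
}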
* which the server has already started a blocking callback on. */
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
- rc = ldlm_cli_cancel(&dlm_req->lock_handle[0]);
+ rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
if (rc < 0)
CERROR("ldlm_cli_cancel: %d\n", rc);
}
count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
blwi->blwi_count,
LCF_BL_AST);
- ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL, 0);
+ ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
+ blwi->blwi_flags);
} else {
ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
blwi->blwi_lock);
if (blwi->blwi_mem_pressure)
cfs_memory_pressure_clr();
- if (blwi->blwi_mode == LDLM_ASYNC)
- OBD_FREE(blwi, sizeof(*blwi));
- else
+ if (blwi->blwi_flags & LCF_ASYNC)
+ OBD_FREE(blwi, sizeof(*blwi));
+ else
complete(&blwi->blwi_comp);
}
.psc_name = "ldlm_cbd",
.psc_watchdog_factor = 2,
.psc_buf = {
- .bc_nbufs = LDLM_NBUFS,
+ .bc_nbufs = LDLM_CLIENT_NBUFS,
.bc_buf_size = LDLM_BUFSIZE,
.bc_req_max_size = LDLM_MAXREQSIZE,
.bc_rep_max_size = LDLM_MAXREPSIZE,
.psc_name = "ldlm_canceld",
.psc_watchdog_factor = 6,
.psc_buf = {
- .bc_nbufs = LDLM_NBUFS,
+ .bc_nbufs = LDLM_SERVER_NBUFS,
.bc_buf_size = LDLM_BUFSIZE,
.bc_req_max_size = LDLM_MAXREQSIZE,
.bc_rep_max_size = LDLM_MAXREPSIZE,
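The last two hunks split the former LDLM_NBUFS constant in two: the client-side callback service (ldlm_cbd) only receives server-originated callbacks and needs far fewer request buffers than the server-side cancel service (ldlm_canceld), which fields cancel RPCs from many clients at once. A sketch of the assumed defines; the actual values live in the headers and may differ:

#define LDLM_CLIENT_NBUFS	1	/* placeholder: callback traffic only */
#define LDLM_SERVER_NBUFS	64	/* placeholder: cancel RPCs from many clients */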