cont = 0;
LDLM_LOCK_GET(lock);
+
spin_unlock_bh(&waiting_locks_spinlock);
LDLM_DEBUG(lock, "prolong the busy lock");
ldlm_refresh_waiting_lock(lock,
spin_lock_bh(&waiting_locks_spinlock);
if (!cont) {
- LDLM_LOCK_PUT(lock);
+ LDLM_LOCK_RELEASE(lock);
break;
}
- LDLM_LOCK_PUT(lock);
+ LDLM_LOCK_RELEASE(lock);
continue;
}
lock->l_resource->lr_namespace->ns_timeouts++;
total_enqueue_wait = cfs_time_sub(cfs_time_current_sec(),
lock->l_last_activity);
- if (total_enqueue_wait > obd_timeout)
- /* non-fatal with AT - change to LDLM_DEBUG? */
- LDLM_WARN(lock, "enqueue wait took %lus from "CFS_TIME_T,
- total_enqueue_wait, lock->l_last_activity);
-
req = ptlrpc_request_alloc(lock->l_export->exp_imp_reverse,
&RQF_LDLM_CP_CALLBACK);
if (req == NULL)
/* Server-side enqueue wait time estimate, used in
__ldlm_add_waiting_lock to set future enqueue timers */
- at_add(&lock->l_resource->lr_namespace->ns_at_estimate,
- total_enqueue_wait);
+ if (total_enqueue_wait < ldlm_get_enq_timeout(lock))
+ at_add(&lock->l_resource->lr_namespace->ns_at_estimate,
+ total_enqueue_wait);
+ else
+ /* bz18618. Don't add lock enqueue time we spend waiting for a
+ previous callback to fail. Locks waiting legitimately will
+ get extended by ldlm_refresh_waiting_lock regardless of the
+ estimate, so it's okay to underestimate here. */
+ LDLM_DEBUG(lock, "lock completed after %lus; estimate was %ds. "
+ "It is likely that a previous callback timed out.",
+ total_enqueue_wait,
+ at_get(&lock->l_resource->lr_namespace->ns_at_estimate));
ptlrpc_request_set_replen(req);
req_capsule_init(&req->rq_pill, req, RCL_SERVER);
if (req->rq_export == NULL) {
- struct ldlm_request *dlm_req;
-
- CDEBUG(D_RPCTRACE, "operation %d from %s with bad "
- "export cookie "LPX64"; this is "
- "normal if this node rebooted with a lock held\n",
- lustre_msg_get_opc(req->rq_reqmsg),
- libcfs_id2str(req->rq_peer),
- lustre_msg_get_handle(req->rq_reqmsg)->cookie);
-
- req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
- dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
- if (dlm_req != NULL)
- CDEBUG(D_RPCTRACE, "--> lock cookie: "LPX64"\n",
- dlm_req->lock_handle[0].cookie);
-
ldlm_callback_reply(req, -ENOTCONN);
RETURN(0);
}
ldlm_state->ldlm_cb_service =
ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
LDLM_MAXREPSIZE, LDLM_CB_REQUEST_PORTAL,
- LDLM_CB_REPLY_PORTAL, 1800,
+ LDLM_CB_REPLY_PORTAL, 2,
ldlm_callback_handler, "ldlm_cbd",
ldlm_svc_proc_dir, NULL,
ldlm_min_threads, ldlm_max_threads,
ldlm_state->ldlm_cancel_service =
ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
LDLM_MAXREPSIZE, LDLM_CANCEL_REQUEST_PORTAL,
- LDLM_CANCEL_REPLY_PORTAL, 6000,
+ LDLM_CANCEL_REPLY_PORTAL, 6,
ldlm_cancel_handler, "ldlm_canceld",
ldlm_svc_proc_dir, NULL,
ldlm_min_threads, ldlm_max_threads,
CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
rc = cfs_mem_cache_destroy(ldlm_resource_slab);
LASSERTF(rc == 0, "couldn't free ldlm resource slab\n");
+#ifdef __KERNEL__
+ /* ldlm_lock_put() uses RCU to call ldlm_lock_free(), so we need to call
+ * synchronize_rcu() to wait for a grace period to elapse, so that
+ * ldlm_lock_free() gets a chance to be called. */
+ synchronize_rcu();
+#endif
rc = cfs_mem_cache_destroy(ldlm_lock_slab);
LASSERTF(rc == 0, "couldn't free ldlm lock slab\n");
rc = cfs_mem_cache_destroy(ldlm_interval_slab);