/* release extra ref grabbed by
* ldlm_add_waiting_lock() or
* ldlm_failed_ast() */
- LDLM_LOCK_PUT(lock);
+ LDLM_LOCK_RELEASE(lock);
continue;
}
export = class_export_get(lock->l_export);
/* release extra ref grabbed by ldlm_add_waiting_lock()
* or ldlm_failed_ast() */
- LDLM_LOCK_PUT(lock);
+ LDLM_LOCK_RELEASE(lock);
do_dump++;
class_fail_export(export);
cont = 0;
LDLM_LOCK_GET(lock);
+
spin_unlock_bh(&waiting_locks_spinlock);
LDLM_DEBUG(lock, "prolong the busy lock");
- ldlm_refresh_waiting_lock(lock);
+ ldlm_refresh_waiting_lock(lock,
+ ldlm_get_enq_timeout(lock));
spin_lock_bh(&waiting_locks_spinlock);
if (!cont) {
- LDLM_LOCK_PUT(lock);
+ LDLM_LOCK_RELEASE(lock);
break;
}
- LDLM_LOCK_PUT(lock);
+ LDLM_LOCK_RELEASE(lock);
continue;
}
lock->l_resource->lr_namespace->ns_timeouts++;
LDLM_ERROR(lock, "lock callback timer expired after %lds: "
"evicting client at %s ",
- cfs_time_current_sec()- lock->l_enqueued_time.tv_sec,
+ cfs_time_current_sec()- lock->l_last_activity,
libcfs_nid2str(
lock->l_export->exp_connection->c_peer.nid));
*
* Called with the namespace lock held.
*/
-static int __ldlm_add_waiting_lock(struct ldlm_lock *lock)
+static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds)
{
cfs_time_t timeout;
cfs_time_t timeout_rounded;
if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) ||
OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
- timeout = 2;
- else
- timeout = ldlm_get_enq_timeout(lock);
+ seconds = 1;
- timeout = cfs_time_shift(timeout);
+ timeout = cfs_time_shift(seconds);
if (likely(cfs_time_after(timeout, lock->l_callback_timeout)))
lock->l_callback_timeout = timeout;
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
int ret;
+ int timeout = ldlm_get_enq_timeout(lock);
LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
return 0;
}
- ret = __ldlm_add_waiting_lock(lock);
+ ret = __ldlm_add_waiting_lock(lock, timeout);
if (ret)
/* grab ref on the lock if it has been added to the
* waiting list */
LDLM_LOCK_GET(lock);
spin_unlock_bh(&waiting_locks_spinlock);
- LDLM_DEBUG(lock, "%sadding to wait list",
- ret == 0 ? "not re-" : "");
+ LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
+ ret == 0 ? "not re-" : "", timeout,
+ AT_OFF ? "off" : "on");
return ret;
}
if (ret)
/* release lock ref if it has indeed been removed
* from a list */
- LDLM_LOCK_PUT(lock);
+ LDLM_LOCK_RELEASE(lock);
LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed");
return ret;
*
* Called with namespace lock held.
*/
-int ldlm_refresh_waiting_lock(struct ldlm_lock *lock)
+int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
{
if (lock->l_export == NULL) {
/* We don't have a "waiting locks list" on clients. */
/* we remove/add the lock to the waiting list, so no needs to
* release/take a lock reference */
__ldlm_del_waiting_lock(lock);
- __ldlm_add_waiting_lock(lock);
+ __ldlm_add_waiting_lock(lock, timeout);
spin_unlock_bh(&waiting_locks_spinlock);
LDLM_DEBUG(lock, "refreshed");
RETURN(0);
}
-int ldlm_refresh_waiting_lock(struct ldlm_lock *lock)
+int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
{
RETURN(0);
}
struct ldlm_cb_set_arg *arg = data;
struct ldlm_request *body;
struct ptlrpc_request *req;
- struct timeval granted_time;
long total_enqueue_wait;
int instant_cancel = 0;
int rc = 0;
LASSERT(lock != NULL);
LASSERT(data != NULL);
- do_gettimeofday(&granted_time);
- total_enqueue_wait = cfs_timeval_sub(&granted_time,
- &lock->l_enqueued_time, NULL);
-
- if (total_enqueue_wait / ONE_MILLION > obd_timeout)
- /* non-fatal with AT - change to LDLM_DEBUG? */
- LDLM_ERROR(lock, "enqueue wait took %luus from "CFS_TIME_T,
- total_enqueue_wait, lock->l_enqueued_time.tv_sec);
+ total_enqueue_wait = cfs_time_sub(cfs_time_current_sec(),
+ lock->l_last_activity);
req = ptlrpc_request_alloc(lock->l_export->exp_imp_reverse,
&RQF_LDLM_CP_CALLBACK);
unlock_res_and_lock(lock);
}
- LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
+ LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
total_enqueue_wait);
/* Server-side enqueue wait time estimate, used in
__ldlm_add_waiting_lock to set future enqueue timers */
- at_add(&lock->l_resource->lr_namespace->ns_at_estimate,
- total_enqueue_wait / ONE_MILLION);
+ if (total_enqueue_wait < ldlm_get_enq_timeout(lock))
+ at_add(&lock->l_resource->lr_namespace->ns_at_estimate,
+ total_enqueue_wait);
+ else
+ /* bz18618. Don't add lock enqueue time we spend waiting for a
+ previous callback to fail. Locks waiting legitimately will
+ get extended by ldlm_refresh_waiting_lock regardless of the
+ estimate, so it's okay to underestimate here. */
+ LDLM_DEBUG(lock, "lock completed after %lus; estimate was %ds. "
+ "It is likely that a previous callback timed out.",
+ total_enqueue_wait,
+ at_get(&lock->l_resource->lr_namespace->ns_at_estimate));
ptlrpc_request_set_replen(req);
lock_res_and_lock(lock);
if (lock->l_flags & LDLM_FL_AST_SENT) {
body->lock_flags |= LDLM_FL_AST_SENT;
+ /* copy ast flags like LDLM_FL_DISCARD_DATA */
+ body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS);
/* We might get here prior to ldlm_handle_enqueue setting
* LDLM_FL_CANCEL_ON_BLOCK flag. Then we will put this lock
if (!lock)
GOTO(out, rc = -ENOMEM);
- do_gettimeofday(&lock->l_enqueued_time);
+ lock->l_last_activity = cfs_time_current_sec();
lock->l_remote_handle = dlm_req->lock_handle[0];
LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
if (unlikely(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) ||
!(dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK))){
CERROR("Granting sync lock to libclient. "
- "req fl %d, rep fl %d, lock fl %d\n",
+ "req fl %d, rep fl %d, lock fl "LPX64"\n",
dlm_req->lock_flags, dlm_rep->lock_flags,
lock->l_flags);
LDLM_ERROR(lock, "sync lock");
LDLM_DEBUG(lock, "server-side convert handler START");
- do_gettimeofday(&lock->l_enqueued_time);
+ lock->l_last_activity = cfs_time_current_sec();
res = ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
&dlm_rep->lock_flags);
if (res) {
req_capsule_init(&req->rq_pill, req, RCL_SERVER);
if (req->rq_export == NULL) {
- struct ldlm_request *dlm_req;
-
- CDEBUG(D_RPCTRACE, "operation %d from %s with bad "
- "export cookie "LPX64"; this is "
- "normal if this node rebooted with a lock held\n",
- lustre_msg_get_opc(req->rq_reqmsg),
- libcfs_id2str(req->rq_peer),
- lustre_msg_get_handle(req->rq_reqmsg)->cookie);
-
- req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
- dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
- if (dlm_req != NULL)
- CDEBUG(D_RPCTRACE, "--> lock cookie: "LPX64"\n",
- dlm_req->lock_handle[0].cookie);
-
ldlm_callback_reply(req, -ENOTCONN);
RETURN(0);
}
RETURN(0);
}
+ if ((lock->l_flags & LDLM_FL_FAIL_LOC) &&
+ lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
+ OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
+
/* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
lock_res_and_lock(lock);
lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS);
if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
- /* If somebody cancels locks and cache is already droped,
+ /* If somebody cancels lock and cache is already droped,
+ * or lock is failed before cp_ast received on client,
* we can tell the server we have no lock. Otherwise, we
* should send cancel after dropping the cache. */
- if ((lock->l_flags & LDLM_FL_CANCELING) &&
- (lock->l_flags & LDLM_FL_BL_DONE)) {
+ if (((lock->l_flags & LDLM_FL_CANCELING) &&
+ (lock->l_flags & LDLM_FL_BL_DONE)) ||
+ (lock->l_flags & LDLM_FL_FAILED)) {
LDLM_DEBUG(lock, "callback on lock "
LPX64" - lock disappeared\n",
dlm_req->lock_handle[0].cookie);
ldlm_state->ldlm_cb_service =
ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
LDLM_MAXREPSIZE, LDLM_CB_REQUEST_PORTAL,
- LDLM_CB_REPLY_PORTAL, 1800,
+ LDLM_CB_REPLY_PORTAL, 2,
ldlm_callback_handler, "ldlm_cbd",
ldlm_svc_proc_dir, NULL,
ldlm_min_threads, ldlm_max_threads,
ldlm_state->ldlm_cancel_service =
ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
LDLM_MAXREPSIZE, LDLM_CANCEL_REQUEST_PORTAL,
- LDLM_CANCEL_REPLY_PORTAL, 6000,
+ LDLM_CANCEL_REPLY_PORTAL, 6,
ldlm_cancel_handler, "ldlm_canceld",
ldlm_svc_proc_dir, NULL,
ldlm_min_threads, ldlm_max_threads,
return -ENOMEM;
ldlm_lock_slab = cfs_mem_cache_create("ldlm_locks",
- sizeof(struct ldlm_lock), 0,
- SLAB_HWCACHE_ALIGN);
+ sizeof(struct ldlm_lock), 0,
+ SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU);
if (ldlm_lock_slab == NULL) {
cfs_mem_cache_destroy(ldlm_resource_slab);
return -ENOMEM;
CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
rc = cfs_mem_cache_destroy(ldlm_resource_slab);
LASSERTF(rc == 0, "couldn't free ldlm resource slab\n");
+#ifdef __KERNEL__
+ /* ldlm_lock_put() use RCU to call ldlm_lock_free, so need call
+ * synchronize_rcu() to wait a grace period elapsed, so that
+ * ldlm_lock_free() get a chance to be called. */
+ synchronize_rcu();
+#endif
rc = cfs_mem_cache_destroy(ldlm_lock_slab);
LASSERTF(rc == 0, "couldn't free ldlm lock slab\n");
rc = cfs_mem_cache_destroy(ldlm_interval_slab);
EXPORT_SYMBOL(ldlm_lock_dump_handle);
EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
EXPORT_SYMBOL(ldlm_reprocess_all_ns);
+EXPORT_SYMBOL(ldlm_lock_allow_match_locked);
EXPORT_SYMBOL(ldlm_lock_allow_match);
EXPORT_SYMBOL(ldlm_lock_downgrade);
EXPORT_SYMBOL(ldlm_lock_convert);
EXPORT_SYMBOL(client_obd_cleanup);
EXPORT_SYMBOL(client_connect_import);
EXPORT_SYMBOL(client_disconnect_export);
-EXPORT_SYMBOL(target_start_recovery_thread);
EXPORT_SYMBOL(target_stop_recovery_thread);
EXPORT_SYMBOL(target_handle_connect);
EXPORT_SYMBOL(target_cleanup_recovery);