/* release extra ref grabbed by
* ldlm_add_waiting_lock() or
* ldlm_failed_ast() */
- LDLM_LOCK_PUT(lock);
+ LDLM_LOCK_RELEASE(lock);
continue;
}
export = class_export_get(lock->l_export);
/* release extra ref grabbed by ldlm_add_waiting_lock()
* or ldlm_failed_ast() */
- LDLM_LOCK_PUT(lock);
+ LDLM_LOCK_RELEASE(lock);
do_dump++;
class_fail_export(export);
static int ldlm_add_waiting_lock(struct ldlm_lock *lock);
+/**
+ * Check if there is a request in the export request list
+ * which prevents the lock canceling.
+ */
+static int ldlm_lock_busy(struct ldlm_lock *lock)
+{
+ struct ptlrpc_request *req;
+ int match = 0;
+ ENTRY;
+
+ if (lock->l_export == NULL)
+ return 0;
+
+ spin_lock(&lock->l_export->exp_lock);
+ list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) {
+ if (req->rq_ops->hpreq_lock_match) {
+ match = req->rq_ops->hpreq_lock_match(req, lock);
+ if (match)
+ break;
+ }
+ }
+ spin_unlock(&lock->l_export->exp_lock);
+ RETURN(match);
+}
+
/* This is called from within a timer interrupt and cannot schedule */
static void waiting_locks_callback(unsigned long unused)
{
while (!list_empty(&waiting_locks_list)) {
lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
l_pending_chain);
-
if (cfs_time_after(lock->l_callback_timeout, cfs_time_current()) ||
(lock->l_req_mode == LCK_GROUP))
break;
goto repeat;
}
+ /* Check if we need to prolong timeout */
+ if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
+ ldlm_lock_busy(lock)) {
+ int cont = 1;
+
+ if (lock->l_pending_chain.next == &waiting_locks_list)
+ cont = 0;
+
+ LDLM_LOCK_GET(lock);
+
+ spin_unlock_bh(&waiting_locks_spinlock);
+ LDLM_DEBUG(lock, "prolong the busy lock");
+ ldlm_refresh_waiting_lock(lock,
+ ldlm_get_enq_timeout(lock));
+ spin_lock_bh(&waiting_locks_spinlock);
+
+ if (!cont) {
+ LDLM_LOCK_RELEASE(lock);
+ break;
+ }
+
+ LDLM_LOCK_RELEASE(lock);
+ continue;
+ }
+ lock->l_resource->lr_namespace->ns_timeouts++;
LDLM_ERROR(lock, "lock callback timer expired after %lds: "
"evicting client at %s ",
- cfs_time_current_sec()- lock->l_enqueued_time.tv_sec,
+ cfs_time_current_sec()- lock->l_last_activity,
libcfs_nid2str(
lock->l_export->exp_connection->c_peer.nid));
*
* Called with the namespace lock held.
*/
-static int __ldlm_add_waiting_lock(struct ldlm_lock *lock)
+static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds)
{
- int timeout;
+ cfs_time_t timeout;
cfs_time_t timeout_rounded;
if (!list_empty(&lock->l_pending_chain))
return 0;
- timeout = ldlm_get_enq_timeout(lock);
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) ||
+ OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
+ seconds = 1;
- lock->l_callback_timeout = cfs_time_shift(timeout);
+ timeout = cfs_time_shift(seconds);
+ if (likely(cfs_time_after(timeout, lock->l_callback_timeout)))
+ lock->l_callback_timeout = timeout;
timeout_rounded = round_timeout(lock->l_callback_timeout);
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
int ret;
+ int timeout = ldlm_get_enq_timeout(lock);
LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
return 0;
}
- ret = __ldlm_add_waiting_lock(lock);
+ ret = __ldlm_add_waiting_lock(lock, timeout);
if (ret)
/* grab ref on the lock if it has been added to the
* waiting list */
LDLM_LOCK_GET(lock);
spin_unlock_bh(&waiting_locks_spinlock);
- LDLM_DEBUG(lock, "%sadding to wait list",
- ret == 0 ? "not re-" : "");
+ LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
+ ret == 0 ? "not re-" : "", timeout,
+ AT_OFF ? "off" : "on");
return ret;
}
if (ret)
/* release lock ref if it has indeed been removed
* from a list */
- LDLM_LOCK_PUT(lock);
+ LDLM_LOCK_RELEASE(lock);
LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed");
return ret;
*
* Called with namespace lock held.
*/
-int ldlm_refresh_waiting_lock(struct ldlm_lock *lock)
+int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
{
if (lock->l_export == NULL) {
/* We don't have a "waiting locks list" on clients. */
/* we remove/add the lock to the waiting list, so no needs to
* release/take a lock reference */
__ldlm_del_waiting_lock(lock);
- __ldlm_add_waiting_lock(lock);
+ __ldlm_add_waiting_lock(lock, timeout);
spin_unlock_bh(&waiting_locks_spinlock);
LDLM_DEBUG(lock, "refreshed");
return 1;
}
-
#else /* !__KERNEL__ */
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
RETURN(0);
}
-int ldlm_refresh_waiting_lock(struct ldlm_lock *lock)
+int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
{
RETURN(0);
}
RETURN(rc);
}
+/**
+ * Check if there are requests in the export request list which prevent
+ * the lock canceling and make these requests high priority ones.
+ */
+static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
+{
+ struct ptlrpc_request *req;
+ ENTRY;
+
+ if (lock->l_export == NULL) {
+ LDLM_DEBUG(lock, "client lock: no-op");
+ RETURN_EXIT;
+ }
+
+ spin_lock(&lock->l_export->exp_lock);
+ list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) {
+ if (!req->rq_hp && req->rq_ops->hpreq_lock_match &&
+ req->rq_ops->hpreq_lock_match(req, lock))
+ ptlrpc_hpreq_reorder(req);
+ }
+ spin_unlock(&lock->l_export->exp_lock);
+ EXIT;
+}
+
/*
* ->l_blocking_ast() method for server-side locks. This is invoked when newly
* enqueued server lock conflicts with given one.
ldlm_lock_dump(D_ERROR, lock, 0);
}
+ ldlm_lock_reorder_req(lock);
+
req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse,
&RQF_LDLM_BL_CALLBACK,
LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK);
struct ldlm_cb_set_arg *arg = data;
struct ldlm_request *body;
struct ptlrpc_request *req;
- struct timeval granted_time;
long total_enqueue_wait;
int instant_cancel = 0;
int rc = 0;
LASSERT(lock != NULL);
LASSERT(data != NULL);
- do_gettimeofday(&granted_time);
- total_enqueue_wait = cfs_timeval_sub(&granted_time,
- &lock->l_enqueued_time, NULL);
-
- if (total_enqueue_wait / ONE_MILLION > obd_timeout)
- /* non-fatal with AT - change to LDLM_DEBUG? */
- LDLM_ERROR(lock, "enqueue wait took %luus from "CFS_TIME_T,
- total_enqueue_wait, lock->l_enqueued_time.tv_sec);
+ total_enqueue_wait = cfs_time_sub(cfs_time_current_sec(),
+ lock->l_last_activity);
req = ptlrpc_request_alloc(lock->l_export->exp_imp_reverse,
&RQF_LDLM_CP_CALLBACK);
unlock_res_and_lock(lock);
}
- LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
+ LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
total_enqueue_wait);
/* Server-side enqueue wait time estimate, used in
__ldlm_add_waiting_lock to set future enqueue timers */
- at_add(&lock->l_resource->lr_namespace->ns_at_estimate,
- total_enqueue_wait / ONE_MILLION);
+ if (total_enqueue_wait < ldlm_get_enq_timeout(lock))
+ at_add(&lock->l_resource->lr_namespace->ns_at_estimate,
+ total_enqueue_wait);
+ else
+ /* bz18618. Don't add lock enqueue time we spend waiting for a
+ previous callback to fail. Locks waiting legitimately will
+ get extended by ldlm_refresh_waiting_lock regardless of the
+ estimate, so it's okay to underestimate here. */
+ LDLM_DEBUG(lock, "lock completed after %lus; estimate was %ds. "
+ "It is likely that a previous callback timed out.",
+ total_enqueue_wait,
+ at_get(&lock->l_resource->lr_namespace->ns_at_estimate));
ptlrpc_request_set_replen(req);
lock_res_and_lock(lock);
if (lock->l_flags & LDLM_FL_AST_SENT) {
body->lock_flags |= LDLM_FL_AST_SENT;
+ /* copy ast flags like LDLM_FL_DISCARD_DATA */
+ body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS);
/* We might get here prior to ldlm_handle_enqueue setting
* LDLM_FL_CANCEL_ON_BLOCK flag. Then we will put this lock
if (!lock)
GOTO(out, rc = -ENOMEM);
- do_gettimeofday(&lock->l_enqueued_time);
+ lock->l_last_activity = cfs_time_current_sec();
lock->l_remote_handle = dlm_req->lock_handle[0];
LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
if (unlikely(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) ||
!(dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK))){
CERROR("Granting sync lock to libclient. "
- "req fl %d, rep fl %d, lock fl %d\n",
+ "req fl %d, rep fl %d, lock fl "LPX64"\n",
dlm_req->lock_flags, dlm_rep->lock_flags,
lock->l_flags);
LDLM_ERROR(lock, "sync lock");
LDLM_DEBUG(lock, "server-side convert handler START");
- do_gettimeofday(&lock->l_enqueued_time);
+ lock->l_last_activity = cfs_time_current_sec();
res = ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
&dlm_rep->lock_flags);
if (res) {
req_capsule_init(&req->rq_pill, req, RCL_SERVER);
if (req->rq_export == NULL) {
- struct ldlm_request *dlm_req;
-
- CDEBUG(D_RPCTRACE, "operation %d from %s with bad "
- "export cookie "LPX64"; this is "
- "normal if this node rebooted with a lock held\n",
- lustre_msg_get_opc(req->rq_reqmsg),
- libcfs_id2str(req->rq_peer),
- lustre_msg_get_handle(req->rq_reqmsg)->cookie);
-
- req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
- dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
- if (dlm_req != NULL)
- CDEBUG(D_RPCTRACE, "--> lock cookie: "LPX64"\n",
- dlm_req->lock_handle[0].cookie);
-
ldlm_callback_reply(req, -ENOTCONN);
RETURN(0);
}
RETURN(0);
break;
case OBD_LOG_CANCEL: /* remove this eventually - for 1.4.0 compat */
+ CERROR("shouldn't be handling OBD_LOG_CANCEL on DLM thread\n");
req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
RETURN(0);
RETURN(0);
}
+ if ((lock->l_flags & LDLM_FL_FAIL_LOC) &&
+ lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
+ OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
+
/* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
lock_res_and_lock(lock);
lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS);
if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
- /* If somebody cancels locks and cache is already droped,
+ /* If somebody cancels lock and cache is already droped,
+ * or lock is failed before cp_ast received on client,
* we can tell the server we have no lock. Otherwise, we
* should send cancel after dropping the cache. */
- if ((lock->l_flags & LDLM_FL_CANCELING) &&
- (lock->l_flags & LDLM_FL_BL_DONE)) {
+ if (((lock->l_flags & LDLM_FL_CANCELING) &&
+ (lock->l_flags & LDLM_FL_BL_DONE)) ||
+ (lock->l_flags & LDLM_FL_FAILED)) {
LDLM_DEBUG(lock, "callback on lock "
LPX64" - lock disappeared\n",
dlm_req->lock_handle[0].cookie);
LASSERT(!lock->l_blocking_lock);
lock->l_flags |= LDLM_FL_AST_SENT;
- if (lock->l_export && lock->l_export->exp_lock_hash)
+ if (lock->l_export && lock->l_export->exp_lock_hash &&
+ !hlist_unhashed(&lock->l_exp_hash))
lustre_hash_del(lock->l_export->exp_lock_hash,
&lock->l_remote_handle, &lock->l_exp_hash);
list_add_tail(&lock->l_rk_ast, rpc_list);
exp->exp_lock_hash =
lustre_hash_init(obd_uuid2str(&exp->exp_client_uuid),
- 128, 65536, &ldlm_export_lock_ops, LH_REHASH);
+ 7, 16, &ldlm_export_lock_ops, LH_REHASH);
if (!exp->exp_lock_hash)
RETURN(-ENOMEM);
ldlm_state->ldlm_cb_service =
ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
LDLM_MAXREPSIZE, LDLM_CB_REQUEST_PORTAL,
- LDLM_CB_REPLY_PORTAL, 1800,
+ LDLM_CB_REPLY_PORTAL, 2,
ldlm_callback_handler, "ldlm_cbd",
ldlm_svc_proc_dir, NULL,
ldlm_min_threads, ldlm_max_threads,
"ldlm_cb",
- LCT_MD_THREAD|LCT_DT_THREAD);
+ LCT_MD_THREAD|LCT_DT_THREAD, NULL);
if (!ldlm_state->ldlm_cb_service) {
CERROR("failed to start service\n");
ldlm_state->ldlm_cancel_service =
ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
LDLM_MAXREPSIZE, LDLM_CANCEL_REQUEST_PORTAL,
- LDLM_CANCEL_REPLY_PORTAL, 6000,
+ LDLM_CANCEL_REPLY_PORTAL, 6,
ldlm_cancel_handler, "ldlm_canceld",
ldlm_svc_proc_dir, NULL,
ldlm_min_threads, ldlm_max_threads,
"ldlm_cn",
- LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD);
+ LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
+ NULL);
if (!ldlm_state->ldlm_cancel_service) {
CERROR("failed to start service\n");
return -ENOMEM;
ldlm_lock_slab = cfs_mem_cache_create("ldlm_locks",
- sizeof(struct ldlm_lock), 0,
- SLAB_HWCACHE_ALIGN);
+ sizeof(struct ldlm_lock), 0,
+ SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU);
if (ldlm_lock_slab == NULL) {
cfs_mem_cache_destroy(ldlm_resource_slab);
return -ENOMEM;
CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
rc = cfs_mem_cache_destroy(ldlm_resource_slab);
LASSERTF(rc == 0, "couldn't free ldlm resource slab\n");
+#ifdef __KERNEL__
+ /* ldlm_lock_put() use RCU to call ldlm_lock_free, so need call
+ * synchronize_rcu() to wait a grace period elapsed, so that
+ * ldlm_lock_free() get a chance to be called. */
+ synchronize_rcu();
+#endif
rc = cfs_mem_cache_destroy(ldlm_lock_slab);
LASSERTF(rc == 0, "couldn't free ldlm lock slab\n");
rc = cfs_mem_cache_destroy(ldlm_interval_slab);
EXPORT_SYMBOL(ldlm_lock_dump_handle);
EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
EXPORT_SYMBOL(ldlm_reprocess_all_ns);
+EXPORT_SYMBOL(ldlm_lock_allow_match_locked);
EXPORT_SYMBOL(ldlm_lock_allow_match);
EXPORT_SYMBOL(ldlm_lock_downgrade);
EXPORT_SYMBOL(ldlm_lock_convert);
EXPORT_SYMBOL(client_obd_cleanup);
EXPORT_SYMBOL(client_connect_import);
EXPORT_SYMBOL(client_disconnect_export);
-EXPORT_SYMBOL(target_start_recovery_thread);
EXPORT_SYMBOL(target_stop_recovery_thread);
EXPORT_SYMBOL(target_handle_connect);
EXPORT_SYMBOL(target_cleanup_recovery);