/* release extra ref grabbed by
* ldlm_add_waiting_lock() or
* ldlm_failed_ast() */
- LDLM_LOCK_PUT(lock);
+ LDLM_LOCK_RELEASE(lock);
continue;
}
export = class_export_get(lock->l_export);
/* release extra ref grabbed by ldlm_add_waiting_lock()
* or ldlm_failed_ast() */
- LDLM_LOCK_PUT(lock);
+ LDLM_LOCK_RELEASE(lock);
do_dump++;
class_fail_export(export);
static int ldlm_add_waiting_lock(struct ldlm_lock *lock);
+/**
+ * Check if there is a request in the export request list
+ * which prevents the lock canceling.
+ */
+static int ldlm_lock_busy(struct ldlm_lock *lock)
+{
+ struct ptlrpc_request *req;
+ int match = 0;
+ ENTRY;
+
+ if (lock->l_export == NULL)
+ return 0;
+
+ spin_lock(&lock->l_export->exp_lock);
+ list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) {
+ if (req->rq_ops->hpreq_lock_match) {
+ match = req->rq_ops->hpreq_lock_match(req, lock);
+ if (match)
+ break;
+ }
+ }
+ spin_unlock(&lock->l_export->exp_lock);
+ RETURN(match);
+}
+
/* This is called from within a timer interrupt and cannot schedule */
static void waiting_locks_callback(unsigned long unused)
{
while (!list_empty(&waiting_locks_list)) {
lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
l_pending_chain);
-
if (cfs_time_after(lock->l_callback_timeout, cfs_time_current()) ||
(lock->l_req_mode == LCK_GROUP))
break;
goto repeat;
}
+ /* Check if we need to prolong timeout */
+ if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
+ ldlm_lock_busy(lock)) {
+ int cont = 1;
+
+ if (lock->l_pending_chain.next == &waiting_locks_list)
+ cont = 0;
+
+ LDLM_LOCK_GET(lock);
+ spin_unlock_bh(&waiting_locks_spinlock);
+ LDLM_DEBUG(lock, "prolong the busy lock");
+ ldlm_refresh_waiting_lock(lock,
+ ldlm_get_enq_timeout(lock));
+ spin_lock_bh(&waiting_locks_spinlock);
+
+ if (!cont) {
+ LDLM_LOCK_PUT(lock);
+ break;
+ }
+
+ LDLM_LOCK_PUT(lock);
+ continue;
+ }
+ lock->l_resource->lr_namespace->ns_timeouts++;
LDLM_ERROR(lock, "lock callback timer expired after %lds: "
"evicting client at %s ",
- cfs_time_current_sec()- lock->l_enqueued_time.tv_sec,
+ cfs_time_current_sec()- lock->l_last_activity,
libcfs_nid2str(
lock->l_export->exp_connection->c_peer.nid));
*
* Called with the namespace lock held.
*/
-static int __ldlm_add_waiting_lock(struct ldlm_lock *lock)
+static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds)
{
- int timeout;
+ cfs_time_t timeout;
cfs_time_t timeout_rounded;
if (!list_empty(&lock->l_pending_chain))
return 0;
- timeout = ldlm_get_enq_timeout(lock);
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) ||
+ OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
+ seconds = 1;
- lock->l_callback_timeout = cfs_time_shift(timeout);
+ timeout = cfs_time_shift(seconds);
+ if (likely(cfs_time_after(timeout, lock->l_callback_timeout)))
+ lock->l_callback_timeout = timeout;
timeout_rounded = round_timeout(lock->l_callback_timeout);
return 0;
}
- ret = __ldlm_add_waiting_lock(lock);
+ ret = __ldlm_add_waiting_lock(lock, ldlm_get_enq_timeout(lock));
if (ret)
/* grab ref on the lock if it has been added to the
* waiting list */
if (ret)
/* release lock ref if it has indeed been removed
* from a list */
- LDLM_LOCK_PUT(lock);
+ LDLM_LOCK_RELEASE(lock);
LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed");
return ret;
*
* Called with namespace lock held.
*/
-int ldlm_refresh_waiting_lock(struct ldlm_lock *lock)
+int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
{
if (lock->l_export == NULL) {
/* We don't have a "waiting locks list" on clients. */
/* we remove/add the lock to the waiting list, so no needs to
* release/take a lock reference */
__ldlm_del_waiting_lock(lock);
- __ldlm_add_waiting_lock(lock);
+ __ldlm_add_waiting_lock(lock, timeout);
spin_unlock_bh(&waiting_locks_spinlock);
LDLM_DEBUG(lock, "refreshed");
return 1;
}
-
#else /* !__KERNEL__ */
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
RETURN(0);
}
-int ldlm_refresh_waiting_lock(struct ldlm_lock *lock)
+int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
{
RETURN(0);
}
RETURN(rc);
}
+/**
+ * Check if there are requests in the export request list which prevent
+ * the lock canceling and make these requests high priority ones.
+ */
+static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
+{
+ struct ptlrpc_request *req;
+ ENTRY;
+
+ if (lock->l_export == NULL) {
+ LDLM_DEBUG(lock, "client lock: no-op");
+ RETURN_EXIT;
+ }
+
+ spin_lock(&lock->l_export->exp_lock);
+ list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) {
+ if (!req->rq_hp && req->rq_ops->hpreq_lock_match &&
+ req->rq_ops->hpreq_lock_match(req, lock))
+ ptlrpc_hpreq_reorder(req);
+ }
+ spin_unlock(&lock->l_export->exp_lock);
+ EXIT;
+}
+
/*
* ->l_blocking_ast() method for server-side locks. This is invoked when newly
* enqueued server lock conflicts with given one.
ldlm_lock_dump(D_ERROR, lock, 0);
}
+ ldlm_lock_reorder_req(lock);
+
req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse,
&RQF_LDLM_BL_CALLBACK,
LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK);
struct ldlm_cb_set_arg *arg = data;
struct ldlm_request *body;
struct ptlrpc_request *req;
- struct timeval granted_time;
long total_enqueue_wait;
int instant_cancel = 0;
int rc = 0;
LASSERT(lock != NULL);
LASSERT(data != NULL);
- do_gettimeofday(&granted_time);
- total_enqueue_wait = cfs_timeval_sub(&granted_time,
- &lock->l_enqueued_time, NULL);
+ total_enqueue_wait = cfs_time_sub(cfs_time_current_sec(),
+ lock->l_last_activity);
- if (total_enqueue_wait / ONE_MILLION > obd_timeout)
+ if (total_enqueue_wait > obd_timeout)
/* non-fatal with AT - change to LDLM_DEBUG? */
- LDLM_ERROR(lock, "enqueue wait took %luus from "CFS_TIME_T,
- total_enqueue_wait, lock->l_enqueued_time.tv_sec);
+ LDLM_WARN(lock, "enqueue wait took %lus from "CFS_TIME_T,
+ total_enqueue_wait, lock->l_last_activity);
req = ptlrpc_request_alloc(lock->l_export->exp_imp_reverse,
&RQF_LDLM_CP_CALLBACK);
unlock_res_and_lock(lock);
}
- LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
+ LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
total_enqueue_wait);
/* Server-side enqueue wait time estimate, used in
__ldlm_add_waiting_lock to set future enqueue timers */
at_add(&lock->l_resource->lr_namespace->ns_at_estimate,
- total_enqueue_wait / ONE_MILLION);
+ total_enqueue_wait);
ptlrpc_request_set_replen(req);
lock_res_and_lock(lock);
if (lock->l_flags & LDLM_FL_AST_SENT) {
body->lock_flags |= LDLM_FL_AST_SENT;
+ /* copy ast flags like LDLM_FL_DISCARD_DATA */
+ body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS);
/* We might get here prior to ldlm_handle_enqueue setting
* LDLM_FL_CANCEL_ON_BLOCK flag. Then we will put this lock
if (!lock)
GOTO(out, rc = -ENOMEM);
- do_gettimeofday(&lock->l_enqueued_time);
+ lock->l_last_activity = cfs_time_current_sec();
lock->l_remote_handle = dlm_req->lock_handle[0];
LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
if (unlikely(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) ||
!(dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK))){
CERROR("Granting sync lock to libclient. "
- "req fl %d, rep fl %d, lock fl %d\n",
+ "req fl %d, rep fl %d, lock fl "LPX64"\n",
dlm_req->lock_flags, dlm_rep->lock_flags,
lock->l_flags);
LDLM_ERROR(lock, "sync lock");
LDLM_DEBUG(lock, "server-side convert handler START");
- do_gettimeofday(&lock->l_enqueued_time);
+ lock->l_last_activity = cfs_time_current_sec();
res = ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
&dlm_rep->lock_flags);
if (res) {
RETURN(0);
break;
case OBD_LOG_CANCEL: /* remove this eventually - for 1.4.0 compat */
+ CERROR("shouldn't be handling OBD_LOG_CANCEL on DLM thread\n");
req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
RETURN(0);
RETURN(0);
}
+ if ((lock->l_flags & LDLM_FL_FAIL_LOC) &&
+ lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
+ OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
+
/* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
lock_res_and_lock(lock);
lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS);
if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
- /* If somebody cancels locks and cache is already droped,
+ /* If somebody cancels lock and cache is already droped,
+ * or lock is failed before cp_ast received on client,
* we can tell the server we have no lock. Otherwise, we
* should send cancel after dropping the cache. */
- if ((lock->l_flags & LDLM_FL_CANCELING) &&
- (lock->l_flags & LDLM_FL_BL_DONE)) {
+ if (((lock->l_flags & LDLM_FL_CANCELING) &&
+ (lock->l_flags & LDLM_FL_BL_DONE)) ||
+ (lock->l_flags & LDLM_FL_FAILED)) {
LDLM_DEBUG(lock, "callback on lock "
LPX64" - lock disappeared\n",
dlm_req->lock_handle[0].cookie);
LASSERT(!lock->l_blocking_lock);
lock->l_flags |= LDLM_FL_AST_SENT;
- if (lock->l_export && lock->l_export->exp_lock_hash)
+ if (lock->l_export && lock->l_export->exp_lock_hash &&
+ !hlist_unhashed(&lock->l_exp_hash))
lustre_hash_del(lock->l_export->exp_lock_hash,
&lock->l_remote_handle, &lock->l_exp_hash);
list_add_tail(&lock->l_rk_ast, rpc_list);
exp->exp_lock_hash =
lustre_hash_init(obd_uuid2str(&exp->exp_client_uuid),
- 128, 65536, &ldlm_export_lock_ops, LH_REHASH);
+ 7, 16, &ldlm_export_lock_ops, LH_REHASH);
if (!exp->exp_lock_hash)
RETURN(-ENOMEM);
ldlm_svc_proc_dir, NULL,
ldlm_min_threads, ldlm_max_threads,
"ldlm_cb",
- LCT_MD_THREAD|LCT_DT_THREAD);
+ LCT_MD_THREAD|LCT_DT_THREAD, NULL);
if (!ldlm_state->ldlm_cb_service) {
CERROR("failed to start service\n");
ldlm_svc_proc_dir, NULL,
ldlm_min_threads, ldlm_max_threads,
"ldlm_cn",
- LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD);
+ LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
+ NULL);
if (!ldlm_state->ldlm_cancel_service) {
CERROR("failed to start service\n");
EXPORT_SYMBOL(__ldlm_handle2lock);
EXPORT_SYMBOL(ldlm_lock_get);
EXPORT_SYMBOL(ldlm_lock_put);
-EXPORT_SYMBOL(ldlm_lock_fast_match);
-EXPORT_SYMBOL(ldlm_lock_fast_release);
EXPORT_SYMBOL(ldlm_lock_match);
EXPORT_SYMBOL(ldlm_lock_cancel);
EXPORT_SYMBOL(ldlm_lock_addref);
EXPORT_SYMBOL(ldlm_lock_decref);
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
EXPORT_SYMBOL(ldlm_lock_change_resource);
-EXPORT_SYMBOL(ldlm_lock_set_data);
EXPORT_SYMBOL(ldlm_it2str);
EXPORT_SYMBOL(ldlm_lock_dump);
EXPORT_SYMBOL(ldlm_lock_dump_handle);
EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
EXPORT_SYMBOL(ldlm_reprocess_all_ns);
+EXPORT_SYMBOL(ldlm_lock_allow_match_locked);
EXPORT_SYMBOL(ldlm_lock_allow_match);
EXPORT_SYMBOL(ldlm_lock_downgrade);
EXPORT_SYMBOL(ldlm_lock_convert);