extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
+inline unsigned long round_timeout(unsigned long timeout)
+{
+ return ((timeout / HZ) + 1) * HZ;
+}
+
+static void waiting_locks_callback(unsigned long unused)
+{
+ CERROR("lock(s) expired! need to start recovery!\n");
+}
+
+static struct list_head waiting_locks_list;
+static spinlock_t waiting_locks_spinlock;
+static struct timer_list waiting_locks_timer;
+/*
+ * Indicate that we're waiting for a client to call us back cancelling a given
+ * lock. We add it to the pending-callback chain, and schedule the lock-timeout
+ * timer to fire appropriately. (We round up to the next second, to avoid floods
+ * of timer firings during periods of high lock contention and traffic.
+ */
+int ldlm_add_waiting_lock(struct ldlm_lock *lock)
+{
+ unsigned long timeout_rounded;
+ ENTRY;
+
+ LASSERT(list_empty(&lock->l_pending_chain));
+
+ CERROR("waiting for lock %p\n", lock);
+ spin_lock_bh(&waiting_locks_spinlock);
+ lock->l_callback_timeout = jiffies + (obd_timeout * HZ);
+
+ timeout_rounded = round_timeout(lock->l_callback_timeout);
+
+ if (timeout_rounded < waiting_locks_timer.expires ||
+ !timer_pending(&waiting_locks_timer)) {
+ CERROR("adjusting timer! (%0lx/%0lx < %0lx)\n",
+ lock->l_callback_timeout, timeout_rounded,
+ waiting_locks_timer.expires);
+ mod_timer(&waiting_locks_timer, timeout_rounded);
+ }
+ list_add(&lock->l_pending_chain, waiting_locks_list.prev); /* FIFO */
+ spin_unlock_bh(&waiting_locks_spinlock);
+ RETURN(1);
+}
+
+/*
+ * Remove a lock from the pending list, likely because it had its cancellation
+ * callback arrive without incident. This adjusts the lock-timeout timer if
+ * needed. Returns 0 if the lock wasn't pending after all, 1 if it was.
+ */
+int ldlm_del_waiting_lock(struct ldlm_lock *lock)
+{
+ struct list_head *list_next;
+
+ ENTRY;
+
+ spin_lock_bh(&waiting_locks_spinlock);
+
+ if (list_empty(&lock->l_pending_chain)) {
+ spin_unlock_bh(&waiting_locks_spinlock);
+ RETURN(0);
+ }
+
+ CERROR("no longer waiting for lock %p\n", lock);
+
+ list_next = lock->l_pending_chain.next;
+ if (lock->l_pending_chain.prev == &waiting_locks_list) {
+ /* Removing the head of the list, adjust timer. */
+ if (list_next == &waiting_locks_list) {
+ /* No more, just cancel. */
+ CERROR("no more locks waiting, stopping timer\n");
+ del_timer(&waiting_locks_timer);
+ } else {
+ struct ldlm_lock *next;
+ next = list_entry(list_next, struct ldlm_lock,
+ l_pending_chain);
+ CERROR("adjusting timer (%0lx %0lx)\n",
+ waiting_locks_timer.expires,
+ round_timeout(next->l_callback_timeout));
+ mod_timer(&waiting_locks_timer,
+ round_timeout(next->l_callback_timeout));
+ }
+ }
+ list_del_init(&lock->l_pending_chain);
+ spin_unlock_bh(&waiting_locks_spinlock);
+ RETURN(1);
+}
+
static int ldlm_server_blocking_ast(struct ldlm_lock *lock,
struct ldlm_lock_desc *desc,
void *data, __u32 data_len)
LDLM_DEBUG(lock, "server preparing blocking AST");
req->rq_replen = lustre_msg_size(0, NULL);
+ ldlm_add_waiting_lock(lock);
rc = ptlrpc_queue_wait(req);
rc = ptlrpc_check_status(req, rc);
ptlrpc_free_req(req);
LDLM_DEBUG(lock, "server-side convert handler START");
ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
&dlm_rep->lock_flags);
+ if (ldlm_del_waiting_lock(lock))
+ CDEBUG(D_DLMTRACE, "converted waiting lock %p\n", lock);
req->rq_status = 0;
}
if (ptlrpc_reply(req->rq_svc, req) != 0)
} else {
LDLM_DEBUG(lock, "server-side cancel handler START");
ldlm_lock_cancel(lock);
+ if (ldlm_del_waiting_lock(lock))
+ CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
req->rq_status = 0;
}
int rc, do_ast;
ENTRY;
+ OBD_FAIL_RETURN(OBD_FAIL_OSC_LOCK_BL_REPLY, 0);
+
rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(-ENOMEM);
if (rc)
RETURN(rc);
+ OBD_FAIL_RETURN(OBD_FAIL_OSC_LOCK_BL_AST, 0);
+
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
lock = ldlm_handle2lock(&dlm_req->lock_handle1);
}
}
+ INIT_LIST_HEAD(&waiting_locks_list);
+ spin_lock_init(&waiting_locks_spinlock);
+ waiting_locks_timer.function = waiting_locks_callback;
+ waiting_locks_timer.data = 0;
+ init_timer(&waiting_locks_timer);
+
RETURN(0);
out_thread: