From: shaver Date: Wed, 21 Aug 2002 20:49:11 +0000 (+0000) Subject: * Add timeouts for blocking-AST callbacks. X-Git-Tag: 0.5.5~99 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=dd50b8c635c35eca49972974c02eafe0d03a3eb6;p=fs%2Flustre-release.git * Add timeouts for blocking-AST callbacks. * Add fail_loc support for dropping a blocking AST reply or callback on the OSC. --- diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index 5c3413c..cd18fa2 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -119,6 +119,9 @@ struct ldlm_lock { struct list_head l_childof; struct list_head l_res_link; /*position in one of three res lists*/ struct list_head l_inode_link; /* position in inode info list */ + struct list_head l_export_chain; /* per-export chain of locks */ + struct list_head l_pending_chain; /* locks with callbacks pending */ + unsigned long l_callback_timeout; ldlm_mode_t l_req_mode; ldlm_mode_t l_granted_mode; diff --git a/lustre/include/linux/obd_support.h b/lustre/include/linux/obd_support.h index e9cd118..c7d1328 100644 --- a/lustre/include/linux/obd_support.h +++ b/lustre/include/linux/obd_support.h @@ -93,6 +93,8 @@ extern char obd_recovery_upcall[128]; #define OBD_FAIL_OSC 0x400 #define OBD_FAIL_OSC_BRW_READ_BULK 0x401 #define OBD_FAIL_OSC_BRW_WRITE_BULK 0x402 +#define OBD_FAIL_OSC_LOCK_BL_AST 0x403 +#define OBD_FAIL_OSC_LOCK_BL_REPLY 0x404 /* preparation for a more advanced failure testbed (not functional yet) */ #define OBD_FAIL_MASK_SYS 0x0000FF00 diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index fea73d7..c2f9f25 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -226,6 +226,8 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent, INIT_LIST_HEAD(&lock->l_children); INIT_LIST_HEAD(&lock->l_res_link); INIT_LIST_HEAD(&lock->l_inode_link); + INIT_LIST_HEAD(&lock->l_export_chain); + INIT_LIST_HEAD(&lock->l_pending_chain); init_waitqueue_head(&lock->l_waitq); if (parent != NULL) { diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index f62bb91..4d687a3 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -35,6 +35,93 @@ extern struct list_head ldlm_namespace_list; extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req); extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req); +inline unsigned long round_timeout(unsigned long timeout) +{ + return ((timeout / HZ) + 1) * HZ; +} + +static void waiting_locks_callback(unsigned long unused) +{ + CERROR("lock(s) expired! need to start recovery!\n"); +} + +static struct list_head waiting_locks_list; +static spinlock_t waiting_locks_spinlock; +static struct timer_list waiting_locks_timer; +/* + * Indicate that we're waiting for a client to call us back cancelling a given + * lock. We add it to the pending-callback chain, and schedule the lock-timeout + * timer to fire appropriately. (We round up to the next second, to avoid floods + * of timer firings during periods of high lock contention and traffic. + */ +int ldlm_add_waiting_lock(struct ldlm_lock *lock) +{ + unsigned long timeout_rounded; + ENTRY; + + LASSERT(list_empty(&lock->l_pending_chain)); + + CERROR("waiting for lock %p\n", lock); + spin_lock_bh(&waiting_locks_spinlock); + lock->l_callback_timeout = jiffies + (obd_timeout * HZ); + + timeout_rounded = round_timeout(lock->l_callback_timeout); + + if (timeout_rounded < waiting_locks_timer.expires || + !timer_pending(&waiting_locks_timer)) { + CERROR("adjusting timer! (%0lx/%0lx < %0lx)\n", + lock->l_callback_timeout, timeout_rounded, + waiting_locks_timer.expires); + mod_timer(&waiting_locks_timer, timeout_rounded); + } + list_add(&lock->l_pending_chain, waiting_locks_list.prev); /* FIFO */ + spin_unlock_bh(&waiting_locks_spinlock); + RETURN(1); +} + +/* + * Remove a lock from the pending list, likely because it had its cancellation + * callback arrive without incident. This adjusts the lock-timeout timer if + * needed. Returns 0 if the lock wasn't pending after all, 1 if it was. + */ +int ldlm_del_waiting_lock(struct ldlm_lock *lock) +{ + struct list_head *list_next; + + ENTRY; + + spin_lock_bh(&waiting_locks_spinlock); + + if (list_empty(&lock->l_pending_chain)) { + spin_unlock_bh(&waiting_locks_spinlock); + RETURN(0); + } + + CERROR("no longer waiting for lock %p\n", lock); + + list_next = lock->l_pending_chain.next; + if (lock->l_pending_chain.prev == &waiting_locks_list) { + /* Removing the head of the list, adjust timer. */ + if (list_next == &waiting_locks_list) { + /* No more, just cancel. */ + CERROR("no more locks waiting, stopping timer\n"); + del_timer(&waiting_locks_timer); + } else { + struct ldlm_lock *next; + next = list_entry(list_next, struct ldlm_lock, + l_pending_chain); + CERROR("adjusting timer (%0lx %0lx)\n", + waiting_locks_timer.expires, + round_timeout(next->l_callback_timeout)); + mod_timer(&waiting_locks_timer, + round_timeout(next->l_callback_timeout)); + } + } + list_del_init(&lock->l_pending_chain); + spin_unlock_bh(&waiting_locks_spinlock); + RETURN(1); +} + static int ldlm_server_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, void *data, __u32 data_len) @@ -59,6 +146,7 @@ static int ldlm_server_blocking_ast(struct ldlm_lock *lock, LDLM_DEBUG(lock, "server preparing blocking AST"); req->rq_replen = lustre_msg_size(0, NULL); + ldlm_add_waiting_lock(lock); rc = ptlrpc_queue_wait(req); rc = ptlrpc_check_status(req, rc); ptlrpc_free_req(req); @@ -211,6 +299,8 @@ int ldlm_handle_convert(struct ptlrpc_request *req) LDLM_DEBUG(lock, "server-side convert handler START"); ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode, &dlm_rep->lock_flags); + if (ldlm_del_waiting_lock(lock)) + CDEBUG(D_DLMTRACE, "converted waiting lock %p\n", lock); req->rq_status = 0; } if (ptlrpc_reply(req->rq_svc, req) != 0) @@ -249,6 +339,8 @@ int ldlm_handle_cancel(struct ptlrpc_request *req) } else { LDLM_DEBUG(lock, "server-side cancel handler START"); ldlm_lock_cancel(lock); + if (ldlm_del_waiting_lock(lock)) + CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock); req->rq_status = 0; } @@ -271,6 +363,8 @@ static int ldlm_handle_bl_callback(struct ptlrpc_request *req) int rc, do_ast; ENTRY; + OBD_FAIL_RETURN(OBD_FAIL_OSC_LOCK_BL_REPLY, 0); + rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) RETURN(-ENOMEM); @@ -278,6 +372,8 @@ static int ldlm_handle_bl_callback(struct ptlrpc_request *req) if (rc) RETURN(rc); + OBD_FAIL_RETURN(OBD_FAIL_OSC_LOCK_BL_AST, 0); + dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); lock = ldlm_handle2lock(&dlm_req->lock_handle1); @@ -480,6 +576,12 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf) } } + INIT_LIST_HEAD(&waiting_locks_list); + spin_lock_init(&waiting_locks_spinlock); + waiting_locks_timer.function = waiting_locks_callback; + waiting_locks_timer.data = 0; + init_timer(&waiting_locks_timer); + RETURN(0); out_thread: