+/* NOTE(review): presumably serializes LDLM setup/cleanup and guards
+ * ldlm_refcount below — confirm at the ldlm_get_ref/ldlm_put_ref call sites,
+ * which are outside this chunk. */
+static DECLARE_MUTEX(ldlm_ref_sem);
+static int ldlm_refcount = 0;
+
+/* LDLM state */
+
+static struct ldlm_state *ldlm_state;
+
+/*
+ * Round a jiffies deadline up to the next whole-second (HZ) boundary,
+ * so that many nearby deadlines collapse onto one timer firing.
+ */
+inline unsigned long round_timeout(unsigned long timeout)
+{
+        unsigned long seconds = timeout / HZ;
+
+        return (seconds + 1) * HZ;
+}
+
+#ifdef __KERNEL__
+/* XXX should this be per-ldlm? */
+/* Locks awaiting a client cancel callback, FIFO by timeout. */
+static struct list_head waiting_locks_list;
+/* Protects waiting_locks_list and the (re-)arming of waiting_locks_timer. */
+static spinlock_t waiting_locks_spinlock;
+/* Fires when the oldest lock on waiting_locks_list times out. */
+static struct timer_list waiting_locks_timer;
+
+/* State shared with the "ldlm_elt" thread that evicts timed-out clients. */
+static struct expired_lock_thread {
+ wait_queue_head_t elt_waitq; /* woken on new expired locks / state change */
+ int elt_state; /* ELT_STOPPED / ELT_READY / ELT_TERMINATE */
+ struct list_head elt_expired_locks; /* locks whose callback timeout fired */
+ spinlock_t elt_lock; /* protects elt_expired_locks */
+} expired_lock_thread;
+#endif
+
+/* Lifecycle states for expired_lock_thread.elt_state. */
+#define ELT_STOPPED 0
+#define ELT_READY 1
+#define ELT_TERMINATE 2
+
+/* Pool of blocking-AST worker threads and their shared work queue.
+ * NOTE(review): the fields are not referenced in this chunk; presumably
+ * blp_list holds ldlm_bl_work_item entries under blp_lock, with workers
+ * sleeping on blp_waitq — confirm against the ldlm_bl thread code. */
+struct ldlm_bl_pool {
+ spinlock_t blp_lock;
+ struct list_head blp_list;
+ wait_queue_head_t blp_waitq;
+ atomic_t blp_num_threads;
+ struct completion blp_comp;
+};
+
+/* One queued blocking-callback request: the lock and its namespace plus the
+ * lock descriptor to hand to the callback.  NOTE(review): presumably consumed
+ * by the ldlm_bl_pool threads — the producer/consumer is outside this chunk. */
+struct ldlm_bl_work_item {
+ struct list_head blwi_entry;
+ struct ldlm_namespace *blwi_ns;
+ struct ldlm_lock_desc blwi_ld;
+ struct ldlm_lock *blwi_lock;
+};
+
+#ifdef __KERNEL__
+
+/*
+ * Check, under elt_lock, whether any timed-out locks are queued for the
+ * expired-lock thread.  Returns non-zero when there is work pending.
+ */
+static inline int have_expired_locks(void)
+{
+        int need_to_run;
+
+        /* Fix: RETURN() below had no matching ENTRY, which unbalances the
+         * portals debug entry/exit nesting. */
+        ENTRY;
+        spin_lock_bh(&expired_lock_thread.elt_lock);
+        need_to_run = !list_empty(&expired_lock_thread.elt_expired_locks);
+        spin_unlock_bh(&expired_lock_thread.elt_lock);
+
+        RETURN(need_to_run);
+}
+
+static int expired_lock_main(void *arg)
+{
+ struct list_head *expired = &expired_lock_thread.elt_expired_locks;
+ struct l_wait_info lwi = { 0 };
+ unsigned long flags;
+
+ ENTRY;
+ lock_kernel();
+ kportal_daemonize("ldlm_elt");
+
+ SIGNAL_MASK_LOCK(current, flags);
+ sigfillset(¤t->blocked);
+ RECALC_SIGPENDING;
+ SIGNAL_MASK_UNLOCK(current, flags);
+
+ unlock_kernel();
+
+ expired_lock_thread.elt_state = ELT_READY;
+ wake_up(&expired_lock_thread.elt_waitq);
+
+ while (1) {
+ struct list_head *tmp, *n, work_list;
+ l_wait_event(expired_lock_thread.elt_waitq,
+ have_expired_locks() ||
+ expired_lock_thread.elt_state == ELT_TERMINATE,
+ &lwi);
+
+ spin_lock_bh(&expired_lock_thread.elt_lock);
+ while (!list_empty(expired)) {
+ struct ldlm_lock *lock;
+
+ list_add(&work_list, expired);
+ list_del_init(expired);
+
+ list_for_each_entry(lock, &work_list, l_pending_chain) {
+ LDLM_DEBUG(lock, "moving to work list");
+ }
+
+ spin_unlock_bh(&expired_lock_thread.elt_lock);
+
+
+ list_for_each_safe(tmp, n, &work_list) {
+ lock = list_entry(tmp, struct ldlm_lock,
+ l_pending_chain);
+ ptlrpc_fail_export(lock->l_export);
+ }
+
+
+ if (!list_empty(&work_list)) {
+ list_for_each_entry(lock, &work_list, l_pending_chain) {
+ LDLM_ERROR(lock, "still on work list!");
+ }
+ }
+ LASSERTF (list_empty(&work_list),
+ "some exports not failed properly\n");
+
+ spin_lock_bh(&expired_lock_thread.elt_lock);
+ }
+ spin_unlock_bh(&expired_lock_thread.elt_lock);
+
+ if (expired_lock_thread.elt_state == ELT_TERMINATE)
+ break;
+ }
+
+ expired_lock_thread.elt_state = ELT_STOPPED;
+ wake_up(&expired_lock_thread.elt_waitq);
+ RETURN(0);
+}
+
+/*
+ * Handler for waiting_locks_timer.  Walks the waiting list (FIFO by
+ * timeout), hands every lock whose callback deadline has passed to the
+ * expired-lock thread for client eviction, and re-arms the timer for
+ * the next pending lock, if any.
+ */
+static void waiting_locks_callback(unsigned long unused)
+{
+        struct ldlm_lock *lock;
+        char str[PTL_NALFMT_SIZE];
+
+        spin_lock_bh(&waiting_locks_spinlock);
+        while (!list_empty(&waiting_locks_list)) {
+                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
+                                  l_pending_chain);
+
+                /* The list is FIFO by timeout, so stop at the first lock
+                 * that has not expired.  Fix: use the wraparound-safe
+                 * time_after() instead of a raw '>' on jiffies values. */
+                if (time_after(lock->l_callback_timeout, jiffies))
+                        break;
+
+                LDLM_ERROR(lock, "lock callback timer expired: evicting client "
+                           "%s@%s nid "LPX64" (%s) ",
+                           lock->l_export->exp_client_uuid.uuid,
+                           lock->l_export->exp_connection->c_remote_uuid.uuid,
+                           lock->l_export->exp_connection->c_peer.peer_nid,
+                           portals_nid2str(lock->l_export->exp_connection->c_peer.peer_ni->pni_number,
+                                           lock->l_export->exp_connection->c_peer.peer_nid,
+                                           str));
+
+                /* Move the lock to the expired-lock thread's queue and wake
+                 * it so the export gets failed outside this timer handler. */
+                spin_lock_bh(&expired_lock_thread.elt_lock);
+                list_del(&lock->l_pending_chain);
+                list_add(&lock->l_pending_chain,
+                         &expired_lock_thread.elt_expired_locks);
+                spin_unlock_bh(&expired_lock_thread.elt_lock);
+                wake_up(&expired_lock_thread.elt_waitq);
+        }
+
+        /*
+         * Make sure the timer will fire again if we have any locks
+         * left.
+         */
+        if (!list_empty(&waiting_locks_list)) {
+                unsigned long timeout_rounded;
+
+                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
+                                  l_pending_chain);
+                timeout_rounded = round_timeout(lock->l_callback_timeout);
+                mod_timer(&waiting_locks_timer, timeout_rounded);
+        }
+        spin_unlock_bh(&waiting_locks_spinlock);
+}
+
+/*
+ * Indicate that we're waiting for a client to call us back cancelling a given
+ * lock. We add it to the pending-callback chain, and schedule the lock-timeout
+ * timer to fire appropriately. (We round up to the next second, to avoid
+ * floods of timer firings during periods of high lock contention and traffic).
+ */
+static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
+{
+        unsigned long timeout_rounded;
+
+        spin_lock_bh(&waiting_locks_spinlock);
+        if (!list_empty(&lock->l_pending_chain)) {
+                /* Already queued; don't put it on the list twice. */
+                LDLM_DEBUG(lock, "not re-adding to wait list");
+                spin_unlock_bh(&waiting_locks_spinlock);
+                return 0;
+        }
+        LDLM_DEBUG(lock, "adding to wait list");
+
+        /* Give the client half of obd_timeout seconds to cancel. */
+        lock->l_callback_timeout = jiffies + (obd_timeout * HZ / 2);
+
+        timeout_rounded = round_timeout(lock->l_callback_timeout);
+
+        /* Pull the timer in if this lock's (rounded) deadline is earlier
+         * than the currently-armed one.  Fix: time_before() is
+         * jiffies-wraparound-safe, unlike the raw '<' used previously. */
+        if (time_before(timeout_rounded, waiting_locks_timer.expires) ||
+            !timer_pending(&waiting_locks_timer)) {
+                mod_timer(&waiting_locks_timer, timeout_rounded);
+        }
+        list_add_tail(&lock->l_pending_chain, &waiting_locks_list); /* FIFO */
+        spin_unlock_bh(&waiting_locks_spinlock);
+        return 1;
+}
+
+/*
+ * Remove a lock from the pending list, likely because it had its cancellation
+ * callback arrive without incident. This adjusts the lock-timeout timer if
+ * needed. Returns 0 if the lock wasn't pending after all, 1 if it was.
+ */
+int ldlm_del_waiting_lock(struct ldlm_lock *lock)
+{
+ struct list_head *list_next;
+
+ if (lock->l_export == NULL) {
+ /* We don't have a "waiting locks list" on clients. */
+ LDLM_DEBUG(lock, "client lock: no-op");
+ return 0;
+ }
+
+ spin_lock_bh(&waiting_locks_spinlock);
+
+ if (list_empty(&lock->l_pending_chain)) {
+ /* Lock was never added, or was already removed. */
+ spin_unlock_bh(&waiting_locks_spinlock);
+ LDLM_DEBUG(lock, "wasn't waiting");
+ return 0;
+ }
+
+ list_next = lock->l_pending_chain.next;
+ if (lock->l_pending_chain.prev == &waiting_locks_list) {
+ /* Removing the head of the list, adjust timer. */
+ if (list_next == &waiting_locks_list) {
+ /* No more, just cancel. */
+ del_timer(&waiting_locks_timer);
+ } else {
+ struct ldlm_lock *next;
+ next = list_entry(list_next, struct ldlm_lock,
+ l_pending_chain);
+ mod_timer(&waiting_locks_timer,
+ round_timeout(next->l_callback_timeout));
+ }
+ }
+
+ /* elt_lock is taken (nested inside waiting_locks_spinlock) because
+ * waiting_locks_callback() may already have moved this lock onto
+ * elt_expired_locks via the same l_pending_chain linkage. */
+ spin_lock_bh(&expired_lock_thread.elt_lock);
+ list_del_init(&lock->l_pending_chain);
+ spin_unlock_bh(&expired_lock_thread.elt_lock);
+
+ spin_unlock_bh(&waiting_locks_spinlock);
+ LDLM_DEBUG(lock, "removed");
+ return 1;
+}
+
+#else /* !__KERNEL__ */
+
+/* Userspace build: there is no waiting-locks list; report "added". */
+static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
+{
+ RETURN(1);
+}
+
+/* Userspace build: nothing is ever on the waiting list. */
+int ldlm_del_waiting_lock(struct ldlm_lock *lock)
+{
+ RETURN(0);
+}
+
+#endif /* __KERNEL__ */
+
+/*
+ * An AST of type 'ast_type' (e.g. "blocking", judging by the caller
+ * below) failed with error 'rc': log the lock, mode and client identity,
+ * then evict the client by failing its export.
+ */
+static void ldlm_failed_ast(struct ldlm_lock *lock, int rc, char *ast_type)
+{
+ const struct ptlrpc_connection *conn = lock->l_export->exp_connection;
+ char str[PTL_NALFMT_SIZE];
+
+ CERROR("%s AST failed (%d) for res "LPU64"/"LPU64
+ ", mode %s: evicting client %s@%s NID "LPX64" (%s)\n",
+ ast_type, rc,
+ lock->l_resource->lr_name.name[0],
+ lock->l_resource->lr_name.name[1],
+ ldlm_lockname[lock->l_granted_mode],
+ lock->l_export->exp_client_uuid.uuid,
+ conn->c_remote_uuid.uuid, conn->c_peer.peer_nid,
+ portals_nid2str(conn->c_peer.peer_ni->pni_number,
+ conn->c_peer.peer_nid, str));
+ ptlrpc_fail_export(lock->l_export);
+}
+
+int ldlm_server_blocking_ast(struct ldlm_lock *lock,
+ struct ldlm_lock_desc *desc,
+ void *data, int flag)