* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2010, 2013, Intel Corporation.
+ * Copyright (c) 2010, 2014, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#define DEBUG_SUBSYSTEM S_LDLM
-#ifdef __KERNEL__
-# include <libcfs/libcfs.h>
-#else
-# include <liblustre.h>
-#endif
-
+#include <linux/kthread.h>
+#include <libcfs/libcfs.h>
#include <lustre_dlm.h>
#include <obd_class.h>
#include <libcfs/list.h>
CFS_MODULE_PARM(ldlm_cpts, "s", charp, 0444,
"CPU partitions ldlm threads should run on");
-extern struct kmem_cache *ldlm_resource_slab;
-extern struct kmem_cache *ldlm_lock_slab;
static struct mutex ldlm_ref_mutex;
static int ldlm_refcount;
struct ldlm_bl_pool {
spinlock_t blp_lock;
- /*
- * blp_prio_list is used for callbacks that should be handled
- * as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
- * see bug 13843
- */
- cfs_list_t blp_prio_list;
-
- /*
- * blp_list is used for all other callbacks which are likely
- * to take longer to process.
- */
- cfs_list_t blp_list;
-
- cfs_waitq_t blp_waitq;
- struct completion blp_comp;
- cfs_atomic_t blp_num_threads;
- cfs_atomic_t blp_busy_threads;
- int blp_min_threads;
- int blp_max_threads;
+ /*
+ * blp_prio_list is used for callbacks that should be handled
+ * as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
+ * see bug 13843
+ */
+ struct list_head blp_prio_list;
+
+ /*
+ * blp_list is used for all other callbacks which are likely
+ * to take longer to process.
+ */
+ struct list_head blp_list;
+
+ wait_queue_head_t blp_waitq;
+ struct completion blp_comp;
+ atomic_t blp_num_threads;
+ atomic_t blp_busy_threads;
+ int blp_min_threads;
+ int blp_max_threads;
};
struct ldlm_bl_work_item {
- cfs_list_t blwi_entry;
- struct ldlm_namespace *blwi_ns;
- struct ldlm_lock_desc blwi_ld;
- struct ldlm_lock *blwi_lock;
- cfs_list_t blwi_head;
- int blwi_count;
- struct completion blwi_comp;
- ldlm_cancel_flags_t blwi_flags;
- int blwi_mem_pressure;
+ struct list_head blwi_entry;
+ struct ldlm_namespace *blwi_ns;
+ struct ldlm_lock_desc blwi_ld;
+ struct ldlm_lock *blwi_lock;
+ struct list_head blwi_head;
+ int blwi_count;
+ struct completion blwi_comp;
+ enum ldlm_cancel_flags blwi_flags;
+ int blwi_mem_pressure;
};
-#if defined(HAVE_SERVER_SUPPORT) && defined(__KERNEL__)
+#ifdef HAVE_SERVER_SUPPORT
/**
* Protects both waiting_locks_list and expired_lock_thread.
*
* All access to it should be under waiting_locks_spinlock.
*/
-static cfs_list_t waiting_locks_list;
-static cfs_timer_t waiting_locks_timer;
+static struct list_head waiting_locks_list;
+static struct timer_list waiting_locks_timer;
static struct expired_lock_thread {
- cfs_waitq_t elt_waitq;
+ wait_queue_head_t elt_waitq;
int elt_state;
int elt_dump;
- cfs_list_t elt_expired_locks;
+ struct list_head elt_expired_locks;
} expired_lock_thread;
static inline int have_expired_locks(void)
ENTRY;
spin_lock_bh(&waiting_locks_spinlock);
- need_to_run = !cfs_list_empty(&expired_lock_thread.elt_expired_locks);
+ need_to_run = !list_empty(&expired_lock_thread.elt_expired_locks);
spin_unlock_bh(&waiting_locks_spinlock);
RETURN(need_to_run);
*/
static int expired_lock_main(void *arg)
{
- cfs_list_t *expired = &expired_lock_thread.elt_expired_locks;
- struct l_wait_info lwi = { 0 };
- int do_dump;
+ struct list_head *expired = &expired_lock_thread.elt_expired_locks;
+ struct l_wait_info lwi = { 0 };
+ int do_dump;
- ENTRY;
+ ENTRY;
- expired_lock_thread.elt_state = ELT_READY;
- cfs_waitq_signal(&expired_lock_thread.elt_waitq);
+ expired_lock_thread.elt_state = ELT_READY;
+ wake_up(&expired_lock_thread.elt_waitq);
- while (1) {
- l_wait_event(expired_lock_thread.elt_waitq,
- have_expired_locks() ||
- expired_lock_thread.elt_state == ELT_TERMINATE,
- &lwi);
+ while (1) {
+ l_wait_event(expired_lock_thread.elt_waitq,
+ have_expired_locks() ||
+ expired_lock_thread.elt_state == ELT_TERMINATE,
+ &lwi);
spin_lock_bh(&waiting_locks_spinlock);
if (expired_lock_thread.elt_dump) {
libcfs_run_lbug_upcall(&msgdata);
spin_lock_bh(&waiting_locks_spinlock);
- expired_lock_thread.elt_dump = 0;
- }
+ expired_lock_thread.elt_dump = 0;
+ }
- do_dump = 0;
+ do_dump = 0;
- while (!cfs_list_empty(expired)) {
- struct obd_export *export;
- struct ldlm_lock *lock;
+ while (!list_empty(expired)) {
+ struct obd_export *export;
+ struct ldlm_lock *lock;
- lock = cfs_list_entry(expired->next, struct ldlm_lock,
- l_pending_chain);
+ lock = list_entry(expired->next, struct ldlm_lock,
+ l_pending_chain);
if ((void *)lock < LP_POISON + PAGE_CACHE_SIZE &&
(void *)lock >= LP_POISON) {
spin_unlock_bh(&waiting_locks_spinlock);
CERROR("free lock on elt list %p\n", lock);
LBUG();
}
- cfs_list_del_init(&lock->l_pending_chain);
+ list_del_init(&lock->l_pending_chain);
if ((void *)lock->l_export <
LP_POISON + PAGE_CACHE_SIZE &&
- (void *)lock->l_export >= LP_POISON) {
- CERROR("lock with free export on elt list %p\n",
- lock->l_export);
- lock->l_export = NULL;
- LDLM_ERROR(lock, "free export");
- /* release extra ref grabbed by
- * ldlm_add_waiting_lock() or
- * ldlm_failed_ast() */
- LDLM_LOCK_RELEASE(lock);
- continue;
- }
+ (void *)lock->l_export >= LP_POISON) {
+ CERROR("lock with free export on elt list %p\n",
+ lock->l_export);
+ lock->l_export = NULL;
+ LDLM_ERROR(lock, "free export");
+ /* release extra ref grabbed by
+ * ldlm_add_waiting_lock() or
+ * ldlm_failed_ast() */
+ LDLM_LOCK_RELEASE(lock);
+ continue;
+ }
- if (lock->l_flags & LDLM_FL_DESTROYED) {
+ if (ldlm_is_destroyed(lock)) {
/* release the lock refcount where
* waiting_locks_callback() found it */
LDLM_LOCK_RELEASE(lock);
export = class_export_lock_get(lock->l_export, lock);
spin_unlock_bh(&waiting_locks_spinlock);
+ spin_lock_bh(&export->exp_bl_list_lock);
+ list_del_init(&lock->l_exp_list);
+ spin_unlock_bh(&export->exp_bl_list_lock);
+
do_dump++;
class_fail_export(export);
class_export_lock_put(export, lock);
}
spin_unlock_bh(&waiting_locks_spinlock);
- if (do_dump && obd_dump_on_eviction) {
- CERROR("dump the log upon eviction\n");
- libcfs_debug_dumplog();
- }
+ if (do_dump && obd_dump_on_eviction) {
+ CERROR("dump the log upon eviction\n");
+ libcfs_debug_dumplog();
+ }
- if (expired_lock_thread.elt_state == ELT_TERMINATE)
- break;
- }
+ if (expired_lock_thread.elt_state == ELT_TERMINATE)
+ break;
+ }
- expired_lock_thread.elt_state = ELT_STOPPED;
- cfs_waitq_signal(&expired_lock_thread.elt_waitq);
- RETURN(0);
+ expired_lock_thread.elt_state = ELT_STOPPED;
+ wake_up(&expired_lock_thread.elt_waitq);
+ RETURN(0);
}
static int ldlm_add_waiting_lock(struct ldlm_lock *lock);
return 0;
spin_lock_bh(&lock->l_export->exp_rpc_lock);
- cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
+ list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
rq_exp_list) {
if (req->rq_ops->hpreq_lock_match) {
match = req->rq_ops->hpreq_lock_match(req, lock);
int need_dump = 0;
spin_lock_bh(&waiting_locks_spinlock);
- while (!cfs_list_empty(&waiting_locks_list)) {
- lock = cfs_list_entry(waiting_locks_list.next, struct ldlm_lock,
+ while (!list_empty(&waiting_locks_list)) {
+ lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
l_pending_chain);
if (cfs_time_after(lock->l_callback_timeout,
cfs_time_current()) ||
(lock->l_req_mode == LCK_GROUP))
break;
- if (ptlrpc_check_suspend()) {
- /* there is a case when we talk to one mds, holding
- * lock from another mds. this way we easily can get
- * here, if second mds is being recovered. so, we
- * suspend timeouts. bug 6019 */
-
- LDLM_ERROR(lock, "recharge timeout: %s@%s nid %s ",
- lock->l_export->exp_client_uuid.uuid,
- lock->l_export->exp_connection->c_remote_uuid.uuid,
- libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid));
-
- cfs_list_del_init(&lock->l_pending_chain);
- if (lock->l_flags & LDLM_FL_DESTROYED) {
- /* relay the lock refcount decrease to
- * expired lock thread */
- cfs_list_add(&lock->l_pending_chain,
- &expired_lock_thread.elt_expired_locks);
- } else {
- __ldlm_add_waiting_lock(lock,
- ldlm_get_enq_timeout(lock));
- }
- continue;
- }
-
- /* if timeout overlaps the activation time of suspended timeouts
- * then extend it to give a chance for client to reconnect */
- if (cfs_time_before(cfs_time_sub(lock->l_callback_timeout,
- cfs_time_seconds(obd_timeout)/2),
- ptlrpc_suspend_wakeup_time())) {
- LDLM_ERROR(lock, "extend timeout due to recovery: %s@%s nid %s ",
- lock->l_export->exp_client_uuid.uuid,
- lock->l_export->exp_connection->c_remote_uuid.uuid,
- libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid));
-
- cfs_list_del_init(&lock->l_pending_chain);
- if (lock->l_flags & LDLM_FL_DESTROYED) {
- /* relay the lock refcount decrease to
- * expired lock thread */
- cfs_list_add(&lock->l_pending_chain,
- &expired_lock_thread.elt_expired_locks);
- } else {
- __ldlm_add_waiting_lock(lock,
- ldlm_get_enq_timeout(lock));
- }
- continue;
- }
-
/* Check if we need to prolong timeout */
if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
ldlm_lock_busy(lock)) {
spin_unlock_bh(&waiting_locks_spinlock);
LDLM_DEBUG(lock, "prolong the busy lock");
ldlm_refresh_waiting_lock(lock,
- ldlm_get_enq_timeout(lock));
+ ldlm_bl_timeout(lock) >> 1);
spin_lock_bh(&waiting_locks_spinlock);
if (!cont) {
ldlm_lock_to_ns(lock)->ns_timeouts++;
LDLM_ERROR(lock, "lock callback timer expired after %lds: "
"evicting client at %s ",
- cfs_time_current_sec()- lock->l_last_activity,
+ cfs_time_current_sec() - lock->l_last_activity,
libcfs_nid2str(
lock->l_export->exp_connection->c_peer.nid));
/* no need to take an extra ref on the lock since it was in
* the waiting_locks_list and ldlm_add_waiting_lock()
* already grabbed a ref */
- cfs_list_del(&lock->l_pending_chain);
- cfs_list_add(&lock->l_pending_chain,
+ list_del(&lock->l_pending_chain);
+ list_add(&lock->l_pending_chain,
&expired_lock_thread.elt_expired_locks);
need_dump = 1;
}
- if (!cfs_list_empty(&expired_lock_thread.elt_expired_locks)) {
+ if (!list_empty(&expired_lock_thread.elt_expired_locks)) {
if (obd_dump_on_timeout && need_dump)
expired_lock_thread.elt_dump = __LINE__;
- cfs_waitq_signal(&expired_lock_thread.elt_waitq);
+ wake_up(&expired_lock_thread.elt_waitq);
}
/*
* Make sure the timer will fire again if we have any locks
* left.
*/
- if (!cfs_list_empty(&waiting_locks_list)) {
+ if (!list_empty(&waiting_locks_list)) {
cfs_time_t timeout_rounded;
- lock = cfs_list_entry(waiting_locks_list.next, struct ldlm_lock,
+ lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
l_pending_chain);
timeout_rounded = (cfs_time_t)round_timeout(lock->l_callback_timeout);
cfs_timer_arm(&waiting_locks_timer, timeout_rounded);
cfs_time_t timeout;
cfs_time_t timeout_rounded;
- if (!cfs_list_empty(&lock->l_pending_chain))
+ if (!list_empty(&lock->l_pending_chain))
return 0;
if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) ||
/* if the new lock has a shorter timeout than something earlier on
the list, we'll wait the longer amount of time; no big deal. */
/* FIFO */
- cfs_list_add_tail(&lock->l_pending_chain, &waiting_locks_list);
+ list_add_tail(&lock->l_pending_chain, &waiting_locks_list);
return 1;
}
+static void ldlm_add_blocked_lock(struct ldlm_lock *lock)
+{
+ spin_lock_bh(&lock->l_export->exp_bl_list_lock);
+ if (list_empty(&lock->l_exp_list)) {
+ if (lock->l_granted_mode != lock->l_req_mode)
+ list_add_tail(&lock->l_exp_list,
+ &lock->l_export->exp_bl_list);
+ else
+ list_add(&lock->l_exp_list,
+ &lock->l_export->exp_bl_list);
+ }
+ spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
+
+ /* A blocked lock is added. Adjust the position in
+ * the stale list if the export is in the list.
+ * If export is stale and not in the list - it is being
+ * processed and will be placed on the right position
+ * on obd_stale_export_put(). */
+ if (!list_empty(&lock->l_export->exp_stale_list))
+ obd_stale_export_adjust(lock->l_export);
+}
+
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
int ret;
- int timeout = ldlm_get_enq_timeout(lock);
+ int timeout = ldlm_bl_timeout(lock);
/* NB: must be called with hold of lock_res_and_lock() */
- LASSERT(lock->l_flags & LDLM_FL_RES_LOCKED);
- lock->l_flags |= LDLM_FL_WAITED;
+ LASSERT(ldlm_is_res_locked(lock));
+ LASSERT(!ldlm_is_cancel_on_block(lock));
- LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
+ /* Do not put cross-MDT lock in the waiting list, since we
+ * will not evict it due to timeout for now */
+ if (lock->l_export != NULL &&
+ (exp_connect_flags(lock->l_export) & OBD_CONNECT_MDS_MDS))
+ return 0;
spin_lock_bh(&waiting_locks_spinlock);
- if (lock->l_flags & LDLM_FL_DESTROYED) {
- static cfs_time_t next;
+ if (ldlm_is_cancel(lock)) {
spin_unlock_bh(&waiting_locks_spinlock);
- LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)");
- if (cfs_time_after(cfs_time_current(), next)) {
- next = cfs_time_shift(14400);
- libcfs_debug_dumpstack(NULL);
- }
- return 0;
- }
+ return 0;
+ }
- ret = __ldlm_add_waiting_lock(lock, timeout);
- if (ret) {
- /* grab ref on the lock if it has been added to the
- * waiting list */
- LDLM_LOCK_GET(lock);
- }
- spin_unlock_bh(&waiting_locks_spinlock);
+ if (ldlm_is_destroyed(lock)) {
+ static cfs_time_t next;
+ spin_unlock_bh(&waiting_locks_spinlock);
+ LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)");
+ if (cfs_time_after(cfs_time_current(), next)) {
+ next = cfs_time_shift(14400);
+ libcfs_debug_dumpstack(NULL);
+ }
+ return 0;
+ }
+
+ ldlm_set_waited(lock);
+ lock->l_last_activity = cfs_time_current_sec();
+ ret = __ldlm_add_waiting_lock(lock, timeout);
if (ret) {
- spin_lock_bh(&lock->l_export->exp_bl_list_lock);
- if (cfs_list_empty(&lock->l_exp_list))
- cfs_list_add(&lock->l_exp_list,
- &lock->l_export->exp_bl_list);
- spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
+ /* grab ref on the lock if it has been added to the
+ * waiting list */
+ LDLM_LOCK_GET(lock);
}
+ spin_unlock_bh(&waiting_locks_spinlock);
+
+ if (ret)
+ ldlm_add_blocked_lock(lock);
LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
ret == 0 ? "not re-" : "", timeout,
*/
static int __ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
- cfs_list_t *list_next;
+ struct list_head *list_next;
- if (cfs_list_empty(&lock->l_pending_chain))
+ if (list_empty(&lock->l_pending_chain))
return 0;
list_next = lock->l_pending_chain.next;
cfs_timer_disarm(&waiting_locks_timer);
} else {
struct ldlm_lock *next;
- next = cfs_list_entry(list_next, struct ldlm_lock,
+ next = list_entry(list_next, struct ldlm_lock,
l_pending_chain);
cfs_timer_arm(&waiting_locks_timer,
round_timeout(next->l_callback_timeout));
}
}
- cfs_list_del_init(&lock->l_pending_chain);
+ list_del_init(&lock->l_pending_chain);
return 1;
}
spin_lock_bh(&waiting_locks_spinlock);
ret = __ldlm_del_waiting_lock(lock);
+ ldlm_clear_waited(lock);
spin_unlock_bh(&waiting_locks_spinlock);
/* remove the lock out of export blocking list */
spin_lock_bh(&lock->l_export->exp_bl_list_lock);
- cfs_list_del_init(&lock->l_exp_list);
+ list_del_init(&lock->l_exp_list);
spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
if (ret) {
LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed");
return ret;
}
-EXPORT_SYMBOL(ldlm_del_waiting_lock);
/**
* Prolong the contended lock waiting time.
return 0;
}
+ if (exp_connect_flags(lock->l_export) & OBD_CONNECT_MDS_MDS) {
+ /* We don't have a "waiting locks list" on OSP. */
+ LDLM_DEBUG(lock, "MDS-MDS lock: no-op");
+ return 0;
+ }
+
spin_lock_bh(&waiting_locks_spinlock);
- if (cfs_list_empty(&lock->l_pending_chain)) {
+ if (list_empty(&lock->l_pending_chain)) {
spin_unlock_bh(&waiting_locks_spinlock);
LDLM_DEBUG(lock, "wasn't waiting");
return 0;
}
EXPORT_SYMBOL(ldlm_refresh_waiting_lock);
-#else /* !HAVE_SERVER_SUPPORT || !__KERNEL__ */
+#else /* HAVE_SERVER_SUPPORT */
int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
RETURN(0);
}
-# ifdef HAVE_SERVER_SUPPORT
-static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
+#endif /* !HAVE_SERVER_SUPPORT */
+
+#ifdef HAVE_SERVER_SUPPORT
+
+/**
+ * Calculate the per-export Blocking timeout (covering BL AST, data flush,
+ * lock cancel, and their replies). Used for lock callback timeout and AST
+ * re-send period.
+ *
+ * \param[in] lock lock which is getting the blocking callback
+ *
+ * \retval timeout in seconds to wait for the client reply
+ */
+unsigned int ldlm_bl_timeout(struct ldlm_lock *lock)
{
- LASSERT((lock->l_flags & (LDLM_FL_RES_LOCKED|LDLM_FL_CANCEL_ON_BLOCK))
- == LDLM_FL_RES_LOCKED);
- RETURN(1);
-}
+ unsigned int timeout;
-# endif
-#endif /* HAVE_SERVER_SUPPORT && __KERNEL__ */
+ if (AT_OFF)
+ return obd_timeout / 2;
-#ifdef HAVE_SERVER_SUPPORT
+ /* Since these are non-updating timeouts, we should be conservative.
+ * Take more than usual: 150%.
+ * It would be nice to have some kind of "early reply" mechanism for
+ * lock callbacks too... */
+ timeout = at_get(&lock->l_export->exp_bl_lock_at);
+ return max(timeout + (timeout >> 1), ldlm_enqueue_min);
+}
+EXPORT_SYMBOL(ldlm_bl_timeout);
/**
* Perform lock cleanup if AST sending failed.
if (obd_dump_on_timeout)
libcfs_debug_dumplog();
-#ifdef __KERNEL__
spin_lock_bh(&waiting_locks_spinlock);
if (__ldlm_del_waiting_lock(lock) == 0)
/* the lock was not in any list, grab an extra ref before adding
* the lock to the expired list */
LDLM_LOCK_GET(lock);
- cfs_list_add(&lock->l_pending_chain,
+ list_add(&lock->l_pending_chain,
&expired_lock_thread.elt_expired_locks);
- cfs_waitq_signal(&expired_lock_thread.elt_waitq);
+ wake_up(&expired_lock_thread.elt_waitq);
spin_unlock_bh(&waiting_locks_spinlock);
-#else
- class_fail_export(lock->l_export);
-#endif
}
/**
struct ptlrpc_request *req, int rc,
const char *ast_type)
{
- lnet_process_id_t peer = req->rq_import->imp_connection->c_peer;
-
- if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
- LASSERT(lock->l_export);
- if (lock->l_export->exp_libclient) {
- LDLM_DEBUG(lock, "%s AST to liblustre client (nid %s)"
- " timeout, just cancelling lock", ast_type,
- libcfs_nid2str(peer.nid));
- ldlm_lock_cancel(lock);
- rc = -ERESTART;
- } else if (lock->l_flags & LDLM_FL_CANCEL) {
- LDLM_DEBUG(lock, "%s AST timeout from nid %s, but "
- "cancel was received (AST reply lost?)",
- ast_type, libcfs_nid2str(peer.nid));
- ldlm_lock_cancel(lock);
- rc = -ERESTART;
- } else {
- ldlm_del_waiting_lock(lock);
- ldlm_failed_ast(lock, rc, ast_type);
- }
- } else if (rc) {
- if (rc == -EINVAL) {
- struct ldlm_resource *res = lock->l_resource;
- LDLM_DEBUG(lock, "client (nid %s) returned %d"
- " from %s AST - normal race",
- libcfs_nid2str(peer.nid),
- req->rq_repmsg ?
- lustre_msg_get_status(req->rq_repmsg) : -1,
- ast_type);
- if (res) {
- /* update lvbo to return proper attributes.
- * see bug 23174 */
- ldlm_resource_getref(res);
- ldlm_res_lvbo_update(res, NULL, 1);
- ldlm_resource_putref(res);
- }
+ lnet_process_id_t peer = req->rq_import->imp_connection->c_peer;
+
+ if (!req->rq_replied || (rc && rc != -EINVAL)) {
+ if (lock->l_export && lock->l_export->exp_libclient) {
+ LDLM_DEBUG(lock, "%s AST to liblustre client (nid %s)"
+ " timeout, just cancelling lock", ast_type,
+ libcfs_nid2str(peer.nid));
+ ldlm_lock_cancel(lock);
+ rc = -ERESTART;
+ } else if (ldlm_is_cancel(lock)) {
+ LDLM_DEBUG(lock, "%s AST timeout from nid %s, but "
+ "cancel was received (AST reply lost?)",
+ ast_type, libcfs_nid2str(peer.nid));
+ ldlm_lock_cancel(lock);
+ rc = -ERESTART;
+ } else {
+ LDLM_ERROR(lock, "client (nid %s) %s %s AST "
+ "(req status %d rc %d), evict it",
+ libcfs_nid2str(peer.nid),
+ req->rq_replied ? "returned error from" :
+ "failed to reply to",
+ ast_type,
+ (req->rq_repmsg != NULL) ?
+ lustre_msg_get_status(req->rq_repmsg) : 0,
+ rc);
+ ldlm_failed_ast(lock, rc, ast_type);
+ }
+ return rc;
+ }
- } else {
- LDLM_ERROR(lock, "client (nid %s) returned %d "
- "from %s AST", libcfs_nid2str(peer.nid),
- (req->rq_repmsg != NULL) ?
- lustre_msg_get_status(req->rq_repmsg) : 0,
- ast_type);
- }
- ldlm_lock_cancel(lock);
- /* Server-side AST functions are called from ldlm_reprocess_all,
- * which needs to be told to please restart its reprocessing. */
- rc = -ERESTART;
- }
+ if (rc == -EINVAL) {
+ struct ldlm_resource *res = lock->l_resource;
+
+ LDLM_DEBUG(lock, "client (nid %s) returned %d"
+ " from %s AST - normal race",
+ libcfs_nid2str(peer.nid),
+ req->rq_repmsg ?
+ lustre_msg_get_status(req->rq_repmsg) : -1,
+ ast_type);
+ if (res) {
+ /* update lvbo to return proper attributes.
+ * see bug 23174 */
+ ldlm_resource_getref(res);
+ ldlm_res_lvbo_update(res, NULL, 1);
+ ldlm_resource_putref(res);
+ }
+ ldlm_lock_cancel(lock);
+ rc = -ERESTART;
+ }
- return rc;
+ return rc;
}
static int ldlm_cb_interpret(const struct lu_env *env,
LDLM_LOCK_RELEASE(lock);
if (rc == -ERESTART)
- cfs_atomic_inc(&arg->restart);
+ atomic_inc(&arg->restart);
- RETURN(0);
+ RETURN(0);
+}
+
+static void ldlm_update_resend(struct ptlrpc_request *req, void *data)
+{
+ struct ldlm_cb_async_args *ca = data;
+ struct ldlm_lock *lock = ca->ca_lock;
+
+ ldlm_refresh_waiting_lock(lock, ldlm_bl_timeout(lock));
}
static inline int ldlm_ast_fini(struct ptlrpc_request *req,
rc = ptl_send_rpc(req, 1);
ptlrpc_req_finished(req);
if (rc == 0)
- cfs_atomic_inc(&arg->restart);
+ atomic_inc(&arg->restart);
} else {
LDLM_LOCK_GET(lock);
ptlrpc_set_add_req(arg->set, req);
}
spin_lock_bh(&lock->l_export->exp_rpc_lock);
- cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
- rq_exp_list) {
+ list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
+ rq_exp_list) {
/* Do not process requests that were not yet added to their
* incoming queue or were already removed from there for
* processing. We evaluate ptlrpc_nrs_req_can_move() without
ca->ca_lock = lock;
req->rq_interpret_reply = ldlm_cb_interpret;
- req->rq_no_resend = 1;
lock_res_and_lock(lock);
- if (lock->l_granted_mode != lock->l_req_mode) {
- /* this blocking AST will be communicated as part of the
- * completion AST instead */
+ if (ldlm_is_destroyed(lock)) {
+ /* What's the point? */
unlock_res_and_lock(lock);
-
ptlrpc_req_finished(req);
- LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
RETURN(0);
}
- if (lock->l_flags & LDLM_FL_DESTROYED) {
- /* What's the point? */
+ if (lock->l_granted_mode != lock->l_req_mode) {
+ /* this blocking AST will be communicated as part of the
+ * completion AST instead */
+ ldlm_add_blocked_lock(lock);
+ ldlm_set_waited(lock);
unlock_res_and_lock(lock);
+
ptlrpc_req_finished(req);
+ LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
RETURN(0);
}
- if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
+ if (ldlm_is_cancel_on_block(lock))
instant_cancel = 1;
body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
body->lock_handle[0] = lock->l_remote_handle;
body->lock_desc = *desc;
- body->lock_flags |= ldlm_flags_to_wire(lock->l_flags & LDLM_AST_FLAGS);
+ body->lock_flags |= ldlm_flags_to_wire(lock->l_flags & LDLM_FL_AST_MASK);
LDLM_DEBUG(lock, "server preparing blocking AST");
if (instant_cancel) {
unlock_res_and_lock(lock);
ldlm_lock_cancel(lock);
+
+ req->rq_no_resend = 1;
} else {
LASSERT(lock->l_granted_mode == lock->l_req_mode);
ldlm_add_waiting_lock(lock);
unlock_res_and_lock(lock);
+
+ /* Do not resend after lock callback timeout */
+ req->rq_delay_limit = ldlm_bl_timeout(lock);
+ req->rq_resend_cb = ldlm_update_resend;
}
req->rq_send_state = LUSTRE_IMP_FULL;
if (AT_OFF)
req->rq_timeout = ldlm_get_rq_timeout();
+ lock->l_last_activity = cfs_time_current_sec();
+
if (lock->l_export && lock->l_export->exp_nid_stats &&
lock->l_export->exp_nid_stats->nid_ldlm_stats)
lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
RETURN(rc);
}
-EXPORT_SYMBOL(ldlm_server_blocking_ast);
/**
* ->l_completion_ast callback for a remote lock in server namespace.
struct ldlm_request *body;
struct ptlrpc_request *req;
struct ldlm_cb_async_args *ca;
- long total_enqueue_wait;
int instant_cancel = 0;
int rc = 0;
int lvb_len;
LASSERT(lock != NULL);
LASSERT(data != NULL);
- total_enqueue_wait = cfs_time_sub(cfs_time_current_sec(),
- lock->l_last_activity);
+ if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_CP_AST)) {
+ LDLM_DEBUG(lock, "dropping CP AST");
+ RETURN(0);
+ }
req = ptlrpc_request_alloc(lock->l_export->exp_imp_reverse,
&RQF_LDLM_CP_CALLBACK);
ca->ca_lock = lock;
req->rq_interpret_reply = ldlm_cb_interpret;
- req->rq_no_resend = 1;
body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
body->lock_handle[0] = lock->l_remote_handle;
}
}
- LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
- total_enqueue_wait);
+ lock->l_last_activity = cfs_time_current_sec();
- /* Server-side enqueue wait time estimate, used in
- __ldlm_add_waiting_lock to set future enqueue timers */
- if (total_enqueue_wait < ldlm_get_enq_timeout(lock))
- at_measured(ldlm_lock_to_ns_at(lock),
- total_enqueue_wait);
- else
- /* bz18618. Don't add lock enqueue time we spend waiting for a
- previous callback to fail. Locks waiting legitimately will
- get extended by ldlm_refresh_waiting_lock regardless of the
- estimate, so it's okay to underestimate here. */
- LDLM_DEBUG(lock, "lock completed after %lus; estimate was %ds. "
- "It is likely that a previous callback timed out.",
- total_enqueue_wait,
- at_get(ldlm_lock_to_ns_at(lock)));
+ LDLM_DEBUG(lock, "server preparing completion AST");
ptlrpc_request_set_replen(req);
/* We only send real blocking ASTs after the lock is granted */
lock_res_and_lock(lock);
- if (lock->l_flags & LDLM_FL_AST_SENT) {
+ if (ldlm_is_ast_sent(lock)) {
body->lock_flags |= ldlm_flags_to_wire(LDLM_FL_AST_SENT);
/* Copy AST flags like LDLM_FL_DISCARD_DATA. */
body->lock_flags |= ldlm_flags_to_wire(lock->l_flags &
- LDLM_AST_FLAGS);
+ LDLM_FL_AST_MASK);
/* We might get here prior to ldlm_handle_enqueue setting
* LDLM_FL_CANCEL_ON_BLOCK flag. Then we will put this lock
* ldlm_handle_enqueue will call ldlm_lock_cancel() still,
* that would not only cancel the lock, but will also remove
* it from waiting list */
- if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) {
- unlock_res_and_lock(lock);
- ldlm_lock_cancel(lock);
- instant_cancel = 1;
- lock_res_and_lock(lock);
- } else {
- /* start the lock-timeout clock */
- ldlm_add_waiting_lock(lock);
- }
+ if (ldlm_is_cancel_on_block(lock)) {
+ unlock_res_and_lock(lock);
+ ldlm_lock_cancel(lock);
+
+ instant_cancel = 1;
+ req->rq_no_resend = 1;
+
+ lock_res_and_lock(lock);
+ } else {
+ /* start the lock-timeout clock */
+ ldlm_add_waiting_lock(lock);
+ /* Do not resend after lock callback timeout */
+ req->rq_delay_limit = ldlm_bl_timeout(lock);
+ req->rq_resend_cb = ldlm_update_resend;
+ }
}
unlock_res_and_lock(lock);
RETURN(lvb_len < 0 ? lvb_len : rc);
}
-EXPORT_SYMBOL(ldlm_server_completion_ast);
/**
* Server side ->l_glimpse_ast handler for client locks.
if (AT_OFF)
req->rq_timeout = ldlm_get_rq_timeout();
+ lock->l_last_activity = cfs_time_current_sec();
+
req->rq_interpret_reply = ldlm_cb_interpret;
if (lock->l_export && lock->l_export->exp_nid_stats &&
RETURN(rc);
}
-EXPORT_SYMBOL(ldlm_server_glimpse_ast);
-int ldlm_glimpse_locks(struct ldlm_resource *res, cfs_list_t *gl_work_list)
+int ldlm_glimpse_locks(struct ldlm_resource *res,
+ struct list_head *gl_work_list)
{
int rc;
ENTRY;
* service threads to carry out client lock enqueueing requests.
*/
int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
- struct ptlrpc_request *req,
- const struct ldlm_request *dlm_req,
- const struct ldlm_callback_suite *cbs)
+ struct ptlrpc_request *req,
+ const struct ldlm_request *dlm_req,
+ const struct ldlm_callback_suite *cbs)
{
- struct ldlm_reply *dlm_rep;
+ struct ldlm_reply *dlm_rep;
__u64 flags;
- ldlm_error_t err = ELDLM_OK;
- struct ldlm_lock *lock = NULL;
- void *cookie = NULL;
- int rc = 0;
- ENTRY;
+ enum ldlm_error err = ELDLM_OK;
+ struct ldlm_lock *lock = NULL;
+ void *cookie = NULL;
+ int rc = 0;
+ struct ldlm_resource *res = NULL;
+ ENTRY;
- LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
+ LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
- ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF);
+ ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF, LATF_SKIP);
flags = ldlm_flags_from_wire(dlm_req->lock_flags);
- LASSERT(req->rq_export);
+ LASSERT(req->rq_export);
if (ptlrpc_req2svc(req)->srv_stats != NULL)
ldlm_svc_get_eopc(dlm_req, ptlrpc_req2svc(req)->srv_stats);
}
#endif
- if (unlikely(flags & LDLM_FL_REPLAY)) {
+ if (unlikely((flags & LDLM_FL_REPLAY) ||
+ (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))) {
/* Find an existing lock in the per-export lock hash */
/* In the function below, .hs_keycmp resolves to
* ldlm_export_lock_keycmp() */
if (lock != NULL) {
DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie "
LPX64, lock->l_handle.h_cookie);
+ flags |= LDLM_FL_RESENT;
GOTO(existing_lock, rc = 0);
- }
- }
+ }
+ } else {
+ if (ldlm_reclaim_full()) {
+ DEBUG_REQ(D_DLMTRACE, req, "Too many granted locks, "
+ "reject current enqueue request and let the "
+ "client retry later.\n");
+ GOTO(out, rc = -EINPROGRESS);
+ }
+ }
- /* The lock's callback data might be set in the policy function */
- lock = ldlm_lock_create(ns, &dlm_req->lock_desc.l_resource.lr_name,
- dlm_req->lock_desc.l_resource.lr_type,
- dlm_req->lock_desc.l_req_mode,
+ /* The lock's callback data might be set in the policy function */
+ lock = ldlm_lock_create(ns, &dlm_req->lock_desc.l_resource.lr_name,
+ dlm_req->lock_desc.l_resource.lr_type,
+ dlm_req->lock_desc.l_req_mode,
cbs, NULL, 0, LVB_T_NONE);
- if (!lock)
- GOTO(out, rc = -ENOMEM);
+ if (IS_ERR(lock)) {
+ rc = PTR_ERR(lock);
+ lock = NULL;
+ GOTO(out, rc);
+ }
- lock->l_last_activity = cfs_time_current_sec();
lock->l_remote_handle = dlm_req->lock_handle[0];
LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
+ /* Initialize resource lvb but not for a lock being replayed since
+ * Client already got lvb sent in this case.
+ * This must occur early since some policy methods assume resource
+ * lvb is available (lr_lvb_data != NULL).
+ */
+ res = lock->l_resource;
+ if (!(flags & LDLM_FL_REPLAY)) {
+ /* non-replayed lock, delayed lvb init may need to be done */
+ rc = ldlm_lvbo_init(res);
+ if (rc < 0) {
+ LDLM_DEBUG(lock, "delayed lvb init failed (rc %d)", rc);
+ GOTO(out, rc);
+ }
+ }
+
OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
/* Don't enqueue a lock onto the export if it is been disonnected
* due to eviction (bug 3822) or server umount (bug 24324).
&lock->l_remote_handle,
&lock->l_exp_hash);
+ /* Inherit the enqueue flags before the operation, because we do not
+ * keep the res lock on return and next operations (BL AST) may proceed
+ * without them. */
+ lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
+ LDLM_FL_INHERIT_MASK);
existing_lock:
if (flags & LDLM_FL_HAS_INTENT) {
}
dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
- dlm_rep->lock_flags = ldlm_flags_to_wire(flags);
ldlm_lock2desc(lock, &dlm_rep->lock_desc);
ldlm_lock2handle(lock, &dlm_rep->lock_handle);
+ if (lock && lock->l_resource->lr_type == LDLM_EXTENT)
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 6);
+
/* We never send a blocking AST until the lock is granted, but
* we can tell it right now */
lock_res_and_lock(lock);
/* Now take into account flags to be inherited from original lock
request both in reply to client and in our own lock flags. */
- dlm_rep->lock_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;
- lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
- LDLM_INHERIT_FLAGS);
+ dlm_rep->lock_flags = ldlm_flags_to_wire(flags);
+ lock->l_flags |= flags & LDLM_FL_INHERIT_MASK;
/* Don't move a pending lock onto the export if it has already been
* disconnected due to eviction (bug 5683) or server umount (bug 24324).
OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT))) {
LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
rc = -ENOTCONN;
- } else if (lock->l_flags & LDLM_FL_AST_SENT) {
+ } else if (ldlm_is_ast_sent(lock)) {
dlm_rep->lock_flags |= ldlm_flags_to_wire(LDLM_FL_AST_SENT);
if (lock->l_granted_mode == lock->l_req_mode) {
/*
if ((dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN ||
dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS) &&
req->rq_export->exp_libclient) {
- if (unlikely(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) ||
+ if (unlikely(!ldlm_is_cancel_on_block(lock) ||
!(dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK))){
CERROR("Granting sync lock to libclient. "
"req fl %d, rep fl %d, lock fl "LPX64"\n",
/* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
* ldlm_reprocess_all. If this moves, revisit that code. -phil */
- if (lock) {
- LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
- "(err=%d, rc=%d)", err, rc);
+ if (lock != NULL) {
+ LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
+ "(err=%d, rc=%d)", err, rc);
- if (rc == 0) {
+ if (rc == 0) {
if (req_capsule_has_field(&req->rq_pill, &RMF_DLM_LVB,
RCL_SERVER) &&
ldlm_lvbo_size(lock) > 0) {
req, lock);
buflen = req_capsule_get_size(&req->rq_pill,
&RMF_DLM_LVB, RCL_SERVER);
- buflen = ldlm_lvbo_fill(lock, buf, buflen);
- if (buflen >= 0)
- req_capsule_shrink(&req->rq_pill,
- &RMF_DLM_LVB,
- buflen, RCL_SERVER);
- else
+ /* non-replayed lock, delayed lvb init may
+ * need to occur now */
+ if ((buflen > 0) && !(flags & LDLM_FL_REPLAY)) {
+ buflen = ldlm_lvbo_fill(lock, buf,
+ buflen);
+ if (buflen >= 0)
+ req_capsule_shrink(
+ &req->rq_pill,
+ &RMF_DLM_LVB,
+ buflen, RCL_SERVER);
+ else
+ rc = buflen;
+ } else if (flags & LDLM_FL_REPLAY) {
+ /* no LVB resend upon replay */
+ if (buflen > 0)
+ req_capsule_shrink(
+ &req->rq_pill,
+ &RMF_DLM_LVB,
+ 0, RCL_SERVER);
+ else
+ rc = buflen;
+ } else {
rc = buflen;
+ }
}
- } else {
- lock_res_and_lock(lock);
- ldlm_resource_unlink_lock(lock);
- ldlm_lock_destroy_nolock(lock);
- unlock_res_and_lock(lock);
- }
+ }
+
+ if (rc != 0) {
+ if (lock->l_export) {
+ ldlm_lock_cancel(lock);
+ } else {
+ lock_res_and_lock(lock);
+ ldlm_resource_unlink_lock(lock);
+ ldlm_lock_destroy_nolock(lock);
+ unlock_res_and_lock(lock);
+
+ }
+ }
if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
ldlm_reprocess_all(lock->l_resource);
return rc;
}
-EXPORT_SYMBOL(ldlm_handle_enqueue0);
/**
* Old-style LDLM main entry point for server code enqueue.
}
return rc;
}
-EXPORT_SYMBOL(ldlm_handle_enqueue);
/**
* Main LDLM entry point for server code to process lock conversion requests.
LDLM_DEBUG(lock, "server-side convert handler START");
- lock->l_last_activity = cfs_time_current_sec();
res = ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
&dlm_rep->lock_flags);
if (res) {
RETURN(0);
}
-EXPORT_SYMBOL(ldlm_handle_convert0);
/**
* Old-style main LDLM entry point for server code to process lock conversion
}
return rc;
}
-EXPORT_SYMBOL(ldlm_handle_convert);
/**
* Cancel all the locks whose handles are packed into ldlm_request
* requests.
*/
int ldlm_request_cancel(struct ptlrpc_request *req,
- const struct ldlm_request *dlm_req, int first)
+ const struct ldlm_request *dlm_req,
+ int first, enum lustre_at_flags flags)
{
struct ldlm_resource *res, *pres = NULL;
struct ldlm_lock *lock;
}
pres = res;
}
+
+ if ((flags & LATF_STATS) && ldlm_is_ast_sent(lock)) {
+ long delay = cfs_time_sub(cfs_time_current_sec(),
+ lock->l_last_activity);
+ LDLM_DEBUG(lock, "server cancels blocked lock after "
+ CFS_DURATION_T"s", delay);
+ at_measured(&lock->l_export->exp_bl_lock_at, delay);
+ }
ldlm_lock_cancel(lock);
LDLM_LOCK_PUT(lock);
}
if (rc)
RETURN(rc);
- if (!ldlm_request_cancel(req, dlm_req, 0))
+ if (!ldlm_request_cancel(req, dlm_req, 0, LATF_STATS))
req->rq_status = LUSTRE_ESTALE;
RETURN(ptlrpc_reply(req));
}
-EXPORT_SYMBOL(ldlm_handle_cancel);
#endif /* HAVE_SERVER_SUPPORT */
/**
LDLM_DEBUG(lock, "client blocking AST callback handler");
lock_res_and_lock(lock);
- lock->l_flags |= LDLM_FL_CBPENDING;
+ ldlm_set_cbpending(lock);
- if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
- lock->l_flags |= LDLM_FL_CANCEL;
+ if (ldlm_is_cancel_on_block(lock))
+ ldlm_set_cancel(lock);
do_ast = (!lock->l_readers && !lock->l_writers);
unlock_res_and_lock(lock);
struct ldlm_request *dlm_req,
struct ldlm_lock *lock)
{
+ struct list_head ast_list;
int lvb_len;
- CFS_LIST_HEAD(ast_list);
int rc = 0;
ENTRY;
LDLM_DEBUG(lock, "client completion callback handler START");
+ INIT_LIST_HEAD(&ast_list);
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
int to = cfs_time_seconds(1);
while (to > 0) {
- cfs_schedule_timeout_and_set_state(
- CFS_TASK_INTERRUPTIBLE, to);
+ set_current_state(TASK_INTERRUPTIBLE);
+ schedule_timeout(to);
if (lock->l_granted_mode == lock->l_req_mode ||
- lock->l_flags & LDLM_FL_DESTROYED)
+ ldlm_is_destroyed(lock))
break;
}
}
lock->l_lvb_len, lvb_len);
GOTO(out, rc = -EINVAL);
}
- } else if (ldlm_has_layout(lock)) { /* for layout lock, lvb has
- * variable length */
- void *lvb_data;
-
- OBD_ALLOC(lvb_data, lvb_len);
- if (lvb_data == NULL) {
- LDLM_ERROR(lock, "No memory: %d.\n", lvb_len);
- GOTO(out, rc = -ENOMEM);
- }
-
- lock_res_and_lock(lock);
- LASSERT(lock->l_lvb_data == NULL);
- lock->l_lvb_data = lvb_data;
- lock->l_lvb_len = lvb_len;
- unlock_res_and_lock(lock);
}
}
lock_res_and_lock(lock);
- if ((lock->l_flags & LDLM_FL_DESTROYED) ||
+ if (ldlm_is_destroyed(lock) ||
lock->l_granted_mode == lock->l_req_mode) {
/* bug 11300: the lock has already been granted */
unlock_res_and_lock(lock);
/* BL_AST locks are not needed in LRU.
* Let ldlm_cancel_lru() be fast. */
ldlm_lock_remove_from_lru(lock);
- lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
+ lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
LDLM_DEBUG(lock, "completion AST includes blocking AST");
}
out:
if (rc < 0) {
lock_res_and_lock(lock);
- lock->l_flags |= LDLM_FL_FAILED;
+ ldlm_set_failed(lock);
unlock_res_and_lock(lock);
- cfs_waitq_signal(&lock->l_waitq);
+ wake_up(&lock->l_waitq);
}
LDLM_LOCK_RELEASE(lock);
}
return ptlrpc_reply(req);
}
-#ifdef __KERNEL__
static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
- ldlm_cancel_flags_t cancel_flags)
+ enum ldlm_cancel_flags cancel_flags)
{
struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
ENTRY;
spin_lock(&blp->blp_lock);
if (blwi->blwi_lock &&
- blwi->blwi_lock->l_flags & LDLM_FL_DISCARD_DATA) {
+ ldlm_is_discard_data(blwi->blwi_lock)) {
/* add LDLM_FL_DISCARD_DATA requests to the priority list */
- cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
+ list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
} else {
/* other blocking callbacks are added to the regular list */
- cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_list);
+ list_add_tail(&blwi->blwi_entry, &blp->blp_list);
}
spin_unlock(&blp->blp_lock);
- cfs_waitq_signal(&blp->blp_waitq);
+ wake_up(&blp->blp_waitq);
/* can not check blwi->blwi_flags as blwi could be already freed in
LCF_ASYNC mode */
static inline void init_blwi(struct ldlm_bl_work_item *blwi,
struct ldlm_namespace *ns,
struct ldlm_lock_desc *ld,
- cfs_list_t *cancels, int count,
+ struct list_head *cancels, int count,
struct ldlm_lock *lock,
- ldlm_cancel_flags_t cancel_flags)
+ enum ldlm_cancel_flags cancel_flags)
{
init_completion(&blwi->blwi_comp);
- CFS_INIT_LIST_HEAD(&blwi->blwi_head);
+ INIT_LIST_HEAD(&blwi->blwi_head);
if (memory_pressure_get())
blwi->blwi_mem_pressure = 1;
if (ld != NULL)
blwi->blwi_ld = *ld;
if (count) {
- cfs_list_add(&blwi->blwi_head, cancels);
- cfs_list_del_init(cancels);
+ list_add(&blwi->blwi_head, cancels);
+ list_del_init(cancels);
blwi->blwi_count = count;
} else {
blwi->blwi_lock = lock;
static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
struct ldlm_lock_desc *ld,
struct ldlm_lock *lock,
- cfs_list_t *cancels, int count,
- ldlm_cancel_flags_t cancel_flags)
+ struct list_head *cancels, int count,
+ enum ldlm_cancel_flags cancel_flags)
{
ENTRY;
}
}
-#endif
int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
struct ldlm_lock *lock)
{
-#ifdef __KERNEL__
return ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LCF_ASYNC);
-#else
- return -ENOSYS;
-#endif
}
int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
- cfs_list_t *cancels, int count,
- ldlm_cancel_flags_t cancel_flags)
+ struct list_head *cancels, int count,
+ enum ldlm_cancel_flags cancel_flags)
{
-#ifdef __KERNEL__
return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
-#else
- return -ENOSYS;
-#endif
+}
+/**
+ * Wake up the wait queue of the blocking-callback thread pool
+ * (ldlm_state->ldlm_bl_pool->blp_waitq), i.e. the queue the pool threads
+ * sleep on while they have no work.
+ *
+ * Always returns 0.
+ */
+int ldlm_bl_thread_wakeup(void)
+{
+ wake_up(&ldlm_state->ldlm_bl_pool->blp_waitq);
+ return 0;
}
/* Setinfo coming from Server (eg MDT) to Client (eg MDC)! */
CWARN("Send reply failed, maybe cause bug 21636.\n");
}
-static int ldlm_handle_qc_callback(struct ptlrpc_request *req)
-{
- struct obd_quotactl *oqctl;
- struct client_obd *cli = &req->rq_export->exp_obd->u.cli;
-
- oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
- if (oqctl == NULL) {
- CERROR("Can't unpack obd_quotactl\n");
- RETURN(-EPROTO);
- }
-
- oqctl->qc_stat = ptlrpc_status_ntoh(oqctl->qc_stat);
-
- cli->cl_qchk_stat = oqctl->qc_stat;
- return 0;
-}
-
/* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
switch (lustre_msg_get_opc(req->rq_reqmsg)) {
case LDLM_BL_CALLBACK:
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET)) {
+ if (cfs_fail_err)
+ ldlm_callback_reply(req, -(int)cfs_fail_err);
RETURN(0);
+ }
break;
case LDLM_CP_CALLBACK:
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
rc = llog_origin_handle_close(req);
ldlm_callback_reply(req, rc);
RETURN(0);
- case OBD_QC_CALLBACK:
- req_capsule_set(&req->rq_pill, &RQF_QC_CALLBACK);
- if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET))
- RETURN(0);
- rc = ldlm_handle_qc_callback(req);
- ldlm_callback_reply(req, rc);
- RETURN(0);
default:
CERROR("unknown opcode %u\n",
lustre_msg_get_opc(req->rq_reqmsg));
RETURN(0);
}
- if ((lock->l_flags & LDLM_FL_FAIL_LOC) &&
+ if (ldlm_is_fail_loc(lock) &&
lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
/* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
lock_res_and_lock(lock);
lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
- LDLM_AST_FLAGS);
+ LDLM_FL_AST_MASK);
if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
/* If somebody cancels lock and cache is already dropped,
* or lock is failed before cp_ast received on client,
* we can tell the server we have no lock. Otherwise, we
* should send cancel after dropping the cache. */
- if (((lock->l_flags & LDLM_FL_CANCELING) &&
- (lock->l_flags & LDLM_FL_BL_DONE)) ||
- (lock->l_flags & LDLM_FL_FAILED)) {
+ if ((ldlm_is_canceling(lock) && ldlm_is_bl_done(lock)) ||
+ ldlm_is_failed(lock)) {
LDLM_DEBUG(lock, "callback on lock "
LPX64" - lock disappeared\n",
dlm_req->lock_handle[0].cookie);
/* BL_AST locks are not needed in LRU.
* Let ldlm_cancel_lru() be fast. */
ldlm_lock_remove_from_lru(lock);
- lock->l_flags |= LDLM_FL_BL_AST;
+ ldlm_set_bl_ast(lock);
}
unlock_res_and_lock(lock);
case LDLM_BL_CALLBACK:
CDEBUG(D_INODE, "blocking ast\n");
req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
- if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) {
+ if (!ldlm_is_cancel_on_block(lock)) {
rc = ldlm_callback_reply(req, 0);
if (req->rq_no_reply || rc)
ldlm_callback_errmsg(req, "Normal process", rc,
req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
CDEBUG(D_INODE, "cancel\n");
if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_NET) ||
- CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND))
+ CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND) ||
+ CFS_FAIL_CHECK(OBD_FAIL_LDLM_BL_EVICT))
RETURN(0);
rc = ldlm_handle_cancel(req);
if (rc)
if (lock == NULL)
continue;
- rc = !!(lock->l_flags & LDLM_FL_AST_SENT);
+ rc = ldlm_is_ast_sent(lock) ? 1 : 0;
if (rc)
LDLM_DEBUG(lock, "hpreq cancel lock");
LDLM_LOCK_PUT(lock);
RETURN(0);
}
-int ldlm_revoke_lock_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
- cfs_hlist_node_t *hnode, void *data)
+static int ldlm_revoke_lock_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
+ struct hlist_node *hnode, void *data)
{
- cfs_list_t *rpc_list = data;
+ struct list_head *rpc_list = data;
struct ldlm_lock *lock = cfs_hash_object(hs, hnode);
lock_res_and_lock(lock);
return 0;
}
- if (lock->l_flags & LDLM_FL_AST_SENT) {
+ if (ldlm_is_ast_sent(lock)) {
unlock_res_and_lock(lock);
return 0;
}
LASSERT(lock->l_blocking_ast);
LASSERT(!lock->l_blocking_lock);
- lock->l_flags |= LDLM_FL_AST_SENT;
+ ldlm_set_ast_sent(lock);
if (lock->l_export && lock->l_export->exp_lock_hash) {
/* NB: it's safe to call cfs_hash_del() even lock isn't
* in exp_lock_hash. */
&lock->l_remote_handle, &lock->l_exp_hash);
}
- cfs_list_add_tail(&lock->l_rk_ast, rpc_list);
+ list_add_tail(&lock->l_rk_ast, rpc_list);
LDLM_LOCK_GET(lock);
unlock_res_and_lock(lock);
void ldlm_revoke_export_locks(struct obd_export *exp)
{
- cfs_list_t rpc_list;
+ struct list_head rpc_list;
ENTRY;
- CFS_INIT_LIST_HEAD(&rpc_list);
+ INIT_LIST_HEAD(&rpc_list);
cfs_hash_for_each_empty(exp->exp_lock_hash,
ldlm_revoke_lock_cb, &rpc_list);
ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
EXPORT_SYMBOL(ldlm_revoke_export_locks);
#endif /* HAVE_SERVER_SUPPORT */
-#ifdef __KERNEL__
-static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
+static int ldlm_bl_get_work(struct ldlm_bl_pool *blp,
+ struct ldlm_bl_work_item **p_blwi,
+ struct obd_export **p_exp)
{
struct ldlm_bl_work_item *blwi = NULL;
static unsigned int num_bl = 0;
+ static unsigned int num_stale;
+ int num_th = atomic_read(&blp->blp_num_threads);
+ /*
+ * Fetch the next unit of work for a pool thread: either a stale export
+ * (stored in *p_exp) or a queued work item (stored in *p_blwi).
+ * Returns 1 if one of them was found and 0 if there is nothing to do.
+ *
+ * When a stale export is available it is returned straight away if
+ * there is a single thread, or while the num_stale counter stays below
+ * the thread count; on that path *p_blwi is not written, so the caller
+ * must have initialised it. Otherwise num_stale is reset and the lists
+ * are consulted; if a work item is found the export is given back with
+ * obd_stale_export_put() and *p_exp is cleared, so at most one of the
+ * two outputs is non-NULL on the list path.
+ */
+ *p_exp = obd_stale_export_get();
spin_lock(&blp->blp_lock);
- /* process a request from the blp_list at least every blp_num_threads */
- if (!cfs_list_empty(&blp->blp_list) &&
- (cfs_list_empty(&blp->blp_prio_list) || num_bl == 0))
- blwi = cfs_list_entry(blp->blp_list.next,
- struct ldlm_bl_work_item, blwi_entry);
- else
- if (!cfs_list_empty(&blp->blp_prio_list))
- blwi = cfs_list_entry(blp->blp_prio_list.next,
- struct ldlm_bl_work_item,
- blwi_entry);
-
- if (blwi) {
- if (++num_bl >= cfs_atomic_read(&blp->blp_num_threads))
- num_bl = 0;
- cfs_list_del(&blwi->blwi_entry);
- }
+ if (*p_exp != NULL) {
+ if (num_th == 1 || ++num_stale < num_th) {
+ spin_unlock(&blp->blp_lock);
+ return 1;
+ } else {
+ num_stale = 0;
+ }
+ }
+
+ /* process a request from the blp_list at least every blp_num_threads:
+ * the head of blp_list is taken when blp_prio_list is empty or when
+ * num_bl is 0, otherwise the head of blp_prio_list is taken. num_bl is
+ * bumped for every dequeued item and wraps to 0 once it reaches the
+ * thread count sampled above. */
+ if (!list_empty(&blp->blp_list) &&
+ (list_empty(&blp->blp_prio_list) || num_bl == 0))
+ blwi = list_entry(blp->blp_list.next,
+ struct ldlm_bl_work_item, blwi_entry);
+ else
+ if (!list_empty(&blp->blp_prio_list))
+ blwi = list_entry(blp->blp_prio_list.next,
+ struct ldlm_bl_work_item,
+ blwi_entry);
+
+ if (blwi) {
+ if (++num_bl >= num_th)
+ num_bl = 0;
+ list_del(&blwi->blwi_entry);
+ }
spin_unlock(&blp->blp_lock);
+ *p_blwi = blwi;
- return blwi;
+ if (*p_exp != NULL && *p_blwi != NULL) {
+ obd_stale_export_put(*p_exp);
+ *p_exp = NULL;
+ }
+
+ return (*p_blwi != NULL || *p_exp != NULL) ? 1 : 0;
}
/* This only contains temporary data until the thread starts */
static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
{
struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
- cfs_task_t *task;
+ struct task_struct *task;
init_completion(&bltd.bltd_comp);
- bltd.bltd_num = cfs_atomic_read(&blp->blp_num_threads);
+ bltd.bltd_num = atomic_read(&blp->blp_num_threads);
snprintf(bltd.bltd_name, sizeof(bltd.bltd_name) - 1,
"ldlm_bl_%02d", bltd.bltd_num);
task = kthread_run(ldlm_bl_thread_main, &bltd, bltd.bltd_name);
if (IS_ERR(task)) {
CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
- cfs_atomic_read(&blp->blp_num_threads), PTR_ERR(task));
+ atomic_read(&blp->blp_num_threads), PTR_ERR(task));
return PTR_ERR(task);
}
wait_for_completion(&bltd.bltd_comp);
return 0;
}
+/*
+ * Decide whether the blocking pool should try to start one more thread.
+ *
+ * Returns 0 when the number of busy threads has reached blp_max_threads,
+ * when fewer threads are busy than have been started (blp_num_threads),
+ * or when the work item in hand has blwi_ns == NULL (the marker queued by
+ * ldlm_cleanup()) or has blwi_mem_pressure set. Returns 1 otherwise.
+ * blwi may be NULL, in which case only the thread counters are checked.
+ *
+ * Both counters are sampled without holding blp_lock.
+ * Not fatal if racy and have a few too many threads
+ */
+static int ldlm_bl_thread_need_create(struct ldlm_bl_pool *blp,
+ struct ldlm_bl_work_item *blwi)
+{
+ int busy = atomic_read(&blp->blp_busy_threads);
+
+ if (busy >= blp->blp_max_threads)
+ return 0;
+
+ if (busy < atomic_read(&blp->blp_num_threads))
+ return 0;
+
+ if (blwi != NULL && (blwi->blwi_ns == NULL ||
+ blwi->blwi_mem_pressure))
+ return 0;
+
+ return 1;
+}
+/*
+ * Process one work item taken from the blocking pool lists.
+ *
+ * An item with blwi_ns == NULL is not real work: LDLM_ITER_STOP is
+ * returned for it and nothing else is touched. For any other item either
+ * the list of locks in blwi_head is cancelled (blwi_count != 0) or the
+ * blocking callback is run for blwi_lock, and 0 is returned.
+ *
+ * If the item has blwi_mem_pressure set, the processing is bracketed by
+ * memory_pressure_set() / memory_pressure_clr().
+ *
+ * Items flagged LCF_ASYNC are freed here; for all other items blwi_comp
+ * is completed instead and the item is left to its submitter.
+ * blp is not used.
+ */
+static int ldlm_bl_thread_blwi(struct ldlm_bl_pool *blp,
+ struct ldlm_bl_work_item *blwi)
+{
+ ENTRY;
+
+ if (blwi->blwi_ns == NULL)
+ /* stop marker added by ldlm_cleanup(); tell the caller to quit */
+ RETURN(LDLM_ITER_STOP);
+
+ if (blwi->blwi_mem_pressure)
+ memory_pressure_set();
+
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
+
+ if (blwi->blwi_count) {
+ int count;
+ /* The special case when we cancel locks in lru
+ * asynchronously, we pass the list of locks here.
+ * Thus locks are marked LDLM_FL_CANCELING, but NOT
+ * canceled locally yet. */
+ count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
+ blwi->blwi_count,
+ LCF_BL_AST);
+ ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
+ blwi->blwi_flags);
+ } else {
+ ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
+ blwi->blwi_lock);
+ }
+ if (blwi->blwi_mem_pressure)
+ memory_pressure_clr();
+
+ if (blwi->blwi_flags & LCF_ASYNC)
+ OBD_FREE(blwi, sizeof(*blwi));
+ else
+ complete(&blwi->blwi_comp);
+
+ RETURN(0);
+}
+
+/**
+ * Cancel stale locks on export. Cancel blocked locks first.
+ * If the given export has blocked locks, the next in the list may have
+ * them too, thus cancel not blocked locks only if the current export has
+ * no blocked locks.
+ *
+ * Concretely: ldlm_export_cancel_locks() is called only when
+ * ldlm_export_cancel_blocked_locks() reported 0. In every case the export
+ * is handed back with obd_stale_export_put() before returning, so the
+ * caller must not use exp afterwards (presumably this drops the reference
+ * taken by obd_stale_export_get() -- confirm against that helper).
+ *
+ * blp is not used. Always returns 0.
+ **/
+static int ldlm_bl_thread_exports(struct ldlm_bl_pool *blp,
+ struct obd_export *exp)
+{
+ int num;
+ ENTRY;
+
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 4);
+
+ num = ldlm_export_cancel_blocked_locks(exp);
+ if (num == 0)
+ ldlm_export_cancel_locks(exp);
+
+ obd_stale_export_put(exp);
+
+ RETURN(0);
+}
+
+
/**
* Main blocking requests processing thread.
*
static int ldlm_bl_thread_main(void *arg)
{
struct ldlm_bl_pool *blp;
+ struct ldlm_bl_thread_data *bltd = arg;
ENTRY;
- {
- struct ldlm_bl_thread_data *bltd = arg;
+ blp = bltd->bltd_blp;
- blp = bltd->bltd_blp;
+ atomic_inc(&blp->blp_num_threads);
+ atomic_inc(&blp->blp_busy_threads);
- cfs_atomic_inc(&blp->blp_num_threads);
- cfs_atomic_inc(&blp->blp_busy_threads);
+ complete(&bltd->bltd_comp);
+ /* cannot use bltd after this, it is only on caller's stack */
- complete(&bltd->bltd_comp);
- /* cannot use bltd after this, it is only on caller's stack */
- }
+ while (1) {
+ struct l_wait_info lwi = { 0 };
+ struct ldlm_bl_work_item *blwi = NULL;
+ struct obd_export *exp = NULL;
+ int rc;
- while (1) {
- struct l_wait_info lwi = { 0 };
- struct ldlm_bl_work_item *blwi = NULL;
- int busy;
+ rc = ldlm_bl_get_work(blp, &blwi, &exp);
- blwi = ldlm_bl_get_work(blp);
+ if (rc == 0) {
+ atomic_dec(&blp->blp_busy_threads);
+ l_wait_event_exclusive(blp->blp_waitq,
+ ldlm_bl_get_work(blp, &blwi,
+ &exp),
+ &lwi);
+ atomic_inc(&blp->blp_busy_threads);
+ }
- if (blwi == NULL) {
- cfs_atomic_dec(&blp->blp_busy_threads);
- l_wait_event_exclusive(blp->blp_waitq,
- (blwi = ldlm_bl_get_work(blp)) != NULL,
- &lwi);
- busy = cfs_atomic_inc_return(&blp->blp_busy_threads);
- } else {
- busy = cfs_atomic_read(&blp->blp_busy_threads);
- }
+ if (ldlm_bl_thread_need_create(blp, blwi))
+ /* discard the return value, we tried */
+ ldlm_bl_thread_start(blp);
- if (blwi->blwi_ns == NULL)
- /* added by ldlm_cleanup() */
- break;
+ if (exp)
+ rc = ldlm_bl_thread_exports(blp, exp);
+ else if (blwi)
+ rc = ldlm_bl_thread_blwi(blp, blwi);
- /* Not fatal if racy and have a few too many threads */
- if (unlikely(busy < blp->blp_max_threads &&
- busy >= cfs_atomic_read(&blp->blp_num_threads) &&
- !blwi->blwi_mem_pressure))
- /* discard the return value, we tried */
- ldlm_bl_thread_start(blp);
-
- if (blwi->blwi_mem_pressure)
- memory_pressure_set();
-
- if (blwi->blwi_count) {
- int count;
- /* The special case when we cancel locks in LRU
- * asynchronously, we pass the list of locks here.
- * Thus locks are marked LDLM_FL_CANCELING, but NOT
- * canceled locally yet. */
- count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
- blwi->blwi_count,
- LCF_BL_AST);
- ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
- blwi->blwi_flags);
- } else {
- ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
- blwi->blwi_lock);
- }
- if (blwi->blwi_mem_pressure)
- memory_pressure_clr();
-
- if (blwi->blwi_flags & LCF_ASYNC)
- OBD_FREE(blwi, sizeof(*blwi));
- else
- complete(&blwi->blwi_comp);
- }
+ if (rc == LDLM_ITER_STOP)
+ break;
+ }
- cfs_atomic_dec(&blp->blp_busy_threads);
- cfs_atomic_dec(&blp->blp_num_threads);
+ atomic_dec(&blp->blp_busy_threads);
+ atomic_dec(&blp->blp_num_threads);
complete(&blp->blp_comp);
- RETURN(0);
+ RETURN(0);
}
-#endif
static int ldlm_setup(void);
static int ldlm_cleanup(void);
RETURN(rc);
}
-EXPORT_SYMBOL(ldlm_get_ref);
void ldlm_put_ref(void)
{
EXIT;
}
-EXPORT_SYMBOL(ldlm_put_ref);
/*
* Export handle<->lock hash operations.
*/
static unsigned
-ldlm_export_lock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
+ldlm_export_lock_hash(struct cfs_hash *hs, const void *key, unsigned mask)
{
return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
}
static void *
-ldlm_export_lock_key(cfs_hlist_node_t *hnode)
+ldlm_export_lock_key(struct hlist_node *hnode)
{
struct ldlm_lock *lock;
- lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
+ lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
return &lock->l_remote_handle;
}
static void
-ldlm_export_lock_keycpy(cfs_hlist_node_t *hnode, void *key)
+ldlm_export_lock_keycpy(struct hlist_node *hnode, void *key)
{
struct ldlm_lock *lock;
- lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
+ lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
lock->l_remote_handle = *(struct lustre_handle *)key;
}
static int
-ldlm_export_lock_keycmp(const void *key, cfs_hlist_node_t *hnode)
+ldlm_export_lock_keycmp(const void *key, struct hlist_node *hnode)
{
return lustre_handle_equal(ldlm_export_lock_key(hnode), key);
}
static void *
-ldlm_export_lock_object(cfs_hlist_node_t *hnode)
+ldlm_export_lock_object(struct hlist_node *hnode)
{
- return cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
+ return hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
}
static void
-ldlm_export_lock_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+ldlm_export_lock_get(struct cfs_hash *hs, struct hlist_node *hnode)
{
struct ldlm_lock *lock;
- lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
+ lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
LDLM_LOCK_GET(lock);
}
static void
-ldlm_export_lock_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+ldlm_export_lock_put(struct cfs_hash *hs, struct hlist_node *hnode)
{
struct ldlm_lock *lock;
- lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
+ lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
LDLM_LOCK_RELEASE(lock);
}
-static cfs_hash_ops_t ldlm_export_lock_ops = {
+static struct cfs_hash_ops ldlm_export_lock_ops = {
.hs_hash = ldlm_export_lock_hash,
.hs_key = ldlm_export_lock_key,
.hs_keycmp = ldlm_export_lock_keycmp,
int ldlm_init_export(struct obd_export *exp)
{
+ int rc;
ENTRY;
exp->exp_lock_hash =
if (!exp->exp_lock_hash)
RETURN(-ENOMEM);
+ rc = ldlm_init_flock_export(exp);
+ if (rc)
+ GOTO(err, rc);
+
RETURN(0);
+err:
+ ldlm_destroy_export(exp);
+ RETURN(rc);
}
EXPORT_SYMBOL(ldlm_init_export);
static int ldlm_setup(void)
{
static struct ptlrpc_service_conf conf;
- struct ldlm_bl_pool *blp = NULL;
- int rc = 0;
-#ifdef __KERNEL__
- int i;
-#endif
+ struct ldlm_bl_pool *blp = NULL;
+#ifdef HAVE_SERVER_SUPPORT
+ struct task_struct *task;
+#endif /* HAVE_SERVER_SUPPORT */
+ int i;
+ int rc = 0;
+
ENTRY;
if (ldlm_state != NULL)
if (ldlm_state == NULL)
RETURN(-ENOMEM);
-#ifdef LPROCFS
+#ifdef CONFIG_PROC_FS
rc = ldlm_proc_setup();
if (rc != 0)
GOTO(out, rc);
-#endif
+#endif /* CONFIG_PROC_FS */
memset(&conf, 0, sizeof(conf));
conf = (typeof(conf)) {
ldlm_state->ldlm_cancel_service = NULL;
GOTO(out, rc);
}
-#endif
+#endif /* HAVE_SERVER_SUPPORT */
OBD_ALLOC(blp, sizeof(*blp));
if (blp == NULL)
ldlm_state->ldlm_bl_pool = blp;
spin_lock_init(&blp->blp_lock);
- CFS_INIT_LIST_HEAD(&blp->blp_list);
- CFS_INIT_LIST_HEAD(&blp->blp_prio_list);
- cfs_waitq_init(&blp->blp_waitq);
- cfs_atomic_set(&blp->blp_num_threads, 0);
- cfs_atomic_set(&blp->blp_busy_threads, 0);
+ INIT_LIST_HEAD(&blp->blp_list);
+ INIT_LIST_HEAD(&blp->blp_prio_list);
+ init_waitqueue_head(&blp->blp_waitq);
+ atomic_set(&blp->blp_num_threads, 0);
+ atomic_set(&blp->blp_busy_threads, 0);
-#ifdef __KERNEL__
if (ldlm_num_threads == 0) {
blp->blp_min_threads = LDLM_NTHRS_INIT;
blp->blp_max_threads = LDLM_NTHRS_MAX;
GOTO(out, rc);
}
-# ifdef HAVE_SERVER_SUPPORT
- CFS_INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
+#ifdef HAVE_SERVER_SUPPORT
+ INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
expired_lock_thread.elt_state = ELT_STOPPED;
- cfs_waitq_init(&expired_lock_thread.elt_waitq);
+ init_waitqueue_head(&expired_lock_thread.elt_waitq);
- CFS_INIT_LIST_HEAD(&waiting_locks_list);
+ INIT_LIST_HEAD(&waiting_locks_list);
spin_lock_init(&waiting_locks_spinlock);
- cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);
+ cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, NULL);
- rc = PTR_ERR(kthread_run(expired_lock_main, NULL, "ldlm_elt"));
- if (IS_ERR_VALUE(rc)) {
+ task = kthread_run(expired_lock_main, NULL, "ldlm_elt");
+ if (IS_ERR(task)) {
+ rc = PTR_ERR(task);
CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
GOTO(out, rc);
}
- cfs_wait_event(expired_lock_thread.elt_waitq,
+ wait_event(expired_lock_thread.elt_waitq,
expired_lock_thread.elt_state == ELT_READY);
-# endif /* HAVE_SERVER_SUPPORT */
+#endif /* HAVE_SERVER_SUPPORT */
rc = ldlm_pools_init();
if (rc) {
CERROR("Failed to initialize LDLM pools: %d\n", rc);
GOTO(out, rc);
}
-#endif
+
+ rc = ldlm_reclaim_setup();
+ if (rc) {
+ CERROR("Failed to setup reclaim thread: rc = %d\n", rc);
+ GOTO(out, rc);
+ }
RETURN(0);
out:
{
ENTRY;
- if (!cfs_list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
- !cfs_list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
+ if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
+ !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
CERROR("ldlm still has namespaces; clean these up first.\n");
ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
RETURN(-EBUSY);
}
-#ifdef __KERNEL__
- ldlm_pools_fini();
+ ldlm_reclaim_cleanup();
+ ldlm_pools_fini();
if (ldlm_state->ldlm_bl_pool != NULL) {
struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
- while (cfs_atomic_read(&blp->blp_num_threads) > 0) {
+ while (atomic_read(&blp->blp_num_threads) > 0) {
struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
init_completion(&blp->blp_comp);
spin_lock(&blp->blp_lock);
- cfs_list_add_tail(&blwi.blwi_entry, &blp->blp_list);
- cfs_waitq_signal(&blp->blp_waitq);
+ list_add_tail(&blwi.blwi_entry, &blp->blp_list);
+ wake_up(&blp->blp_waitq);
spin_unlock(&blp->blp_lock);
wait_for_completion(&blp->blp_comp);
OBD_FREE(blp, sizeof(*blp));
}
-#endif /* __KERNEL__ */
if (ldlm_state->ldlm_cb_service != NULL)
ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
-# ifdef HAVE_SERVER_SUPPORT
+#ifdef HAVE_SERVER_SUPPORT
if (ldlm_state->ldlm_cancel_service != NULL)
ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
-# endif
+#endif
-#ifdef __KERNEL__
ldlm_proc_cleanup();
-# ifdef HAVE_SERVER_SUPPORT
+#ifdef HAVE_SERVER_SUPPORT
if (expired_lock_thread.elt_state != ELT_STOPPED) {
expired_lock_thread.elt_state = ELT_TERMINATE;
- cfs_waitq_signal(&expired_lock_thread.elt_waitq);
- cfs_wait_event(expired_lock_thread.elt_waitq,
+ wake_up(&expired_lock_thread.elt_waitq);
+ wait_event(expired_lock_thread.elt_waitq,
expired_lock_thread.elt_state == ELT_STOPPED);
}
-# endif
-#endif /* __KERNEL__ */
+#endif
OBD_FREE(ldlm_state, sizeof(*ldlm_state));
ldlm_state = NULL;
mutex_init(&ldlm_ref_mutex);
mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
+
+ INIT_LIST_HEAD(&ldlm_srv_namespace_list);
+ INIT_LIST_HEAD(&ldlm_cli_active_namespace_list);
+ INIT_LIST_HEAD(&ldlm_cli_inactive_namespace_list);
+
ldlm_resource_slab = kmem_cache_create("ldlm_resources",
sizeof(struct ldlm_resource), 0,
SLAB_HWCACHE_ALIGN, NULL);
ldlm_lock_slab = kmem_cache_create("ldlm_locks",
sizeof(struct ldlm_lock), 0,
SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU, NULL);
- if (ldlm_lock_slab == NULL) {
- kmem_cache_destroy(ldlm_resource_slab);
- return -ENOMEM;
- }
+ if (ldlm_lock_slab == NULL)
+ goto out_resource;
ldlm_interval_slab = kmem_cache_create("interval_node",
sizeof(struct ldlm_interval),
0, SLAB_HWCACHE_ALIGN, NULL);
- if (ldlm_interval_slab == NULL) {
- kmem_cache_destroy(ldlm_resource_slab);
- kmem_cache_destroy(ldlm_lock_slab);
- return -ENOMEM;
- }
+ if (ldlm_interval_slab == NULL)
+ goto out_lock;
+
+ ldlm_interval_tree_slab = kmem_cache_create("interval_tree",
+ sizeof(struct ldlm_interval_tree) * LCK_MODE_NUM,
+ 0, SLAB_HWCACHE_ALIGN, NULL);
+ if (ldlm_interval_tree_slab == NULL)
+ goto out_interval;
+
#if LUSTRE_TRACKS_LOCK_EXP_REFS
- class_export_dump_hook = ldlm_dump_export_locks;
+ class_export_dump_hook = ldlm_dump_export_locks;
#endif
- return 0;
+ return 0;
+
+out_interval:
+ kmem_cache_destroy(ldlm_interval_slab);
+out_lock:
+ kmem_cache_destroy(ldlm_lock_slab);
+out_resource:
+ kmem_cache_destroy(ldlm_resource_slab);
+
+ return -ENOMEM;
}
void ldlm_exit(void)
if (ldlm_refcount)
CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
kmem_cache_destroy(ldlm_resource_slab);
-#ifdef __KERNEL__
/* ldlm_lock_put() use RCU to call ldlm_lock_free, so need call
* synchronize_rcu() to wait a grace period elapsed, so that
* ldlm_lock_free() get a chance to be called. */
synchronize_rcu();
-#endif
kmem_cache_destroy(ldlm_lock_slab);
kmem_cache_destroy(ldlm_interval_slab);
+ kmem_cache_destroy(ldlm_interval_tree_slab);
}