*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
*
* GPL HEADER END
*/
* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2010, 2014, Intel Corporation.
+ * Copyright (c) 2010, 2016, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#define DEBUG_SUBSYSTEM S_LDLM
+#include <linux/kthread.h>
+#include <linux/list.h>
#include <libcfs/libcfs.h>
+#include <lustre/lustre_errno.h>
#include <lustre_dlm.h>
#include <obd_class.h>
-#include <libcfs/list.h>
#include "ldlm_internal.h"
static int ldlm_num_threads;
-CFS_MODULE_PARM(ldlm_num_threads, "i", int, 0444,
- "number of DLM service threads to start");
+module_param(ldlm_num_threads, int, 0444);
+MODULE_PARM_DESC(ldlm_num_threads, "number of DLM service threads to start");
static char *ldlm_cpts;
-CFS_MODULE_PARM(ldlm_cpts, "s", charp, 0444,
- "CPU partitions ldlm threads should run on");
+module_param(ldlm_cpts, charp, 0444);
+MODULE_PARM_DESC(ldlm_cpts, "CPU partitions ldlm threads should run on");
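+/* Both are read-only (0444) module parameters, set at load time, e.g.
+ * "options ptlrpc ldlm_num_threads=16" assuming the LDLM is built into
+ * the ptlrpc module. */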
static struct mutex ldlm_ref_mutex;
static int ldlm_refcount;
-struct ldlm_cb_async_args {
- struct ldlm_cb_set_arg *ca_set_arg;
- struct ldlm_lock *ca_lock;
-};
+struct kobject *ldlm_kobj;
+struct kset *ldlm_ns_kset;
+struct kset *ldlm_svc_kset;
/* LDLM state */
static struct ldlm_state *ldlm_state;
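+/* Round an absolute jiffies deadline up to the next whole second. */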
-inline cfs_time_t round_timeout(cfs_time_t timeout)
+static inline cfs_time_t round_timeout(cfs_time_t timeout)
{
return cfs_time_seconds((int)cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
}
};
struct ldlm_bl_work_item {
- struct list_head blwi_entry;
- struct ldlm_namespace *blwi_ns;
- struct ldlm_lock_desc blwi_ld;
- struct ldlm_lock *blwi_lock;
- struct list_head blwi_head;
- int blwi_count;
- struct completion blwi_comp;
- ldlm_cancel_flags_t blwi_flags;
- int blwi_mem_pressure;
+ struct list_head blwi_entry;
+ struct ldlm_namespace *blwi_ns;
+ struct ldlm_lock_desc blwi_ld;
+ struct ldlm_lock *blwi_lock;
+ struct list_head blwi_head;
+ int blwi_count;
+ struct completion blwi_comp;
+ enum ldlm_cancel_flags blwi_flags;
+ int blwi_mem_pressure;
};
#ifdef HAVE_SERVER_SUPPORT
spin_lock_bh(&waiting_locks_spinlock);
if (expired_lock_thread.elt_dump) {
- struct libcfs_debug_msg_data msgdata = {
- .msg_file = __FILE__,
- .msg_fn = "waiting_locks_callback",
- .msg_line = expired_lock_thread.elt_dump };
spin_unlock_bh(&waiting_locks_spinlock);
/* from waiting_locks_callback, but not in timer */
libcfs_debug_dumplog();
- libcfs_run_lbug_upcall(&msgdata);
spin_lock_bh(&waiting_locks_spinlock);
expired_lock_thread.elt_dump = 0;
lock = list_entry(expired->next, struct ldlm_lock,
l_pending_chain);
- if ((void *)lock < LP_POISON + PAGE_CACHE_SIZE &&
+ if ((void *)lock < LP_POISON + PAGE_SIZE &&
(void *)lock >= LP_POISON) {
spin_unlock_bh(&waiting_locks_spinlock);
CERROR("free lock on elt list %p\n", lock);
}
list_del_init(&lock->l_pending_chain);
if ((void *)lock->l_export <
- LP_POISON + PAGE_CACHE_SIZE &&
+ LP_POISON + PAGE_SIZE &&
(void *)lock->l_export >= LP_POISON) {
CERROR("lock with free export on elt list %p\n",
lock->l_export);
continue;
}
ldlm_lock_to_ns(lock)->ns_timeouts++;
- LDLM_ERROR(lock, "lock callback timer expired after %lds: "
+ LDLM_ERROR(lock, "lock callback timer expired after %llds: "
"evicting client at %s ",
- cfs_time_current_sec() - lock->l_last_activity,
+ ktime_get_real_seconds() - lock->l_last_activity,
libcfs_nid2str(
lock->l_export->exp_connection->c_peer.nid));
lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
l_pending_chain);
timeout_rounded = (cfs_time_t)round_timeout(lock->l_callback_timeout);
- cfs_timer_arm(&waiting_locks_timer, timeout_rounded);
+ mod_timer(&waiting_locks_timer, timeout_rounded);
}
spin_unlock_bh(&waiting_locks_spinlock);
}
timeout_rounded = round_timeout(lock->l_callback_timeout);
- if (cfs_time_before(timeout_rounded,
- cfs_timer_deadline(&waiting_locks_timer)) ||
- !cfs_timer_is_armed(&waiting_locks_timer)) {
- cfs_timer_arm(&waiting_locks_timer, timeout_rounded);
+ if (cfs_time_before(timeout_rounded, waiting_locks_timer.expires) ||
+ !timer_pending(&waiting_locks_timer)) {
+ mod_timer(&waiting_locks_timer, timeout_rounded);
}
/* if the new lock has a shorter timeout than something earlier on
the list, we'll wait the longer amount of time; no big deal. */
return 1;
}
+static void ldlm_add_blocked_lock(struct ldlm_lock *lock)
+{
+ spin_lock_bh(&lock->l_export->exp_bl_list_lock);
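+ /* Not-yet-granted locks go to the tail of exp_bl_list,
+ * already granted ones to the head. */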
+ if (list_empty(&lock->l_exp_list)) {
+ if (lock->l_granted_mode != lock->l_req_mode)
+ list_add_tail(&lock->l_exp_list,
+ &lock->l_export->exp_bl_list);
+ else
+ list_add(&lock->l_exp_list,
+ &lock->l_export->exp_bl_list);
+ }
+ spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
+
+ /* A blocked lock was added. Adjust the export's position in
+ * the stale list if the export is in that list.
+ * If the export is stale and not in the list, it is being
+ * processed and will be placed at the right position
+ * by obd_stale_export_put(). */
+ if (!list_empty(&lock->l_export->exp_stale_list))
+ obd_stale_export_adjust(lock->l_export);
+}
+
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
int ret;
/* NB: must be called with hold of lock_res_and_lock() */
LASSERT(ldlm_is_res_locked(lock));
- ldlm_set_waited(lock);
-
LASSERT(!ldlm_is_cancel_on_block(lock));
+ /* Do not put cross-MDT locks in the waiting list, since for
+ * now we will not evict them on timeout */
+ if (lock->l_export != NULL &&
+ (exp_connect_flags(lock->l_export) & OBD_CONNECT_MDS_MDS))
+ return 0;
+
spin_lock_bh(&waiting_locks_spinlock);
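+ /* A lock that is already being cancelled needs no callback
+ * timeout; the cancel path will clean it up. */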
+ if (ldlm_is_cancel(lock)) {
+ spin_unlock_bh(&waiting_locks_spinlock);
+ return 0;
+ }
+
if (ldlm_is_destroyed(lock)) {
static cfs_time_t next;
+
spin_unlock_bh(&waiting_locks_spinlock);
LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)");
if (cfs_time_after(cfs_time_current(), next)) {
return 0;
}
- lock->l_last_activity = cfs_time_current_sec();
+ ldlm_set_waited(lock);
+ lock->l_last_activity = ktime_get_real_seconds();
ret = __ldlm_add_waiting_lock(lock, timeout);
if (ret) {
/* grab ref on the lock if it has been added to the
}
spin_unlock_bh(&waiting_locks_spinlock);
- if (ret) {
- spin_lock_bh(&lock->l_export->exp_bl_list_lock);
- if (list_empty(&lock->l_exp_list))
- list_add(&lock->l_exp_list,
- &lock->l_export->exp_bl_list);
- spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
- }
+ if (ret)
+ ldlm_add_blocked_lock(lock);
LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
ret == 0 ? "not re-" : "", timeout,
/* Removing the head of the list, adjust timer. */
if (list_next == &waiting_locks_list) {
/* No more, just cancel. */
- cfs_timer_disarm(&waiting_locks_timer);
+ del_timer(&waiting_locks_timer);
} else {
struct ldlm_lock *next;
next = list_entry(list_next, struct ldlm_lock,
l_pending_chain);
- cfs_timer_arm(&waiting_locks_timer,
- round_timeout(next->l_callback_timeout));
+ mod_timer(&waiting_locks_timer,
+ round_timeout(next->l_callback_timeout));
}
}
list_del_init(&lock->l_pending_chain);
spin_lock_bh(&waiting_locks_spinlock);
ret = __ldlm_del_waiting_lock(lock);
+ ldlm_clear_waited(lock);
spin_unlock_bh(&waiting_locks_spinlock);
/* remove the lock out of export blocking list */
LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed");
return ret;
}
-EXPORT_SYMBOL(ldlm_del_waiting_lock);
/**
* Prolong the contended lock waiting time.
return 0;
}
+ if (exp_connect_flags(lock->l_export) & OBD_CONNECT_MDS_MDS) {
+ /* We don't have a "waiting locks list" on OSP. */
+ LDLM_DEBUG(lock, "MDS-MDS lock: no-op");
+ return 0;
+ }
+
spin_lock_bh(&waiting_locks_spinlock);
if (list_empty(&lock->l_pending_chain)) {
* Perform lock cleanup if AST reply came with error.
*/
static int ldlm_handle_ast_error(struct ldlm_lock *lock,
- struct ptlrpc_request *req, int rc,
- const char *ast_type)
+ struct ptlrpc_request *req, int rc,
+ const char *ast_type)
{
- lnet_process_id_t peer = req->rq_import->imp_connection->c_peer;
+ struct lnet_process_id peer = req->rq_import->imp_connection->c_peer;
if (!req->rq_replied || (rc && rc != -EINVAL)) {
if (lock->l_export && lock->l_export->exp_libclient) {
- LDLM_DEBUG(lock, "%s AST to liblustre client (nid %s)"
- " timeout, just cancelling lock", ast_type,
+ LDLM_DEBUG(lock,
+ "%s AST (req@%p x%llu) to liblustre client (nid %s) timeout, just cancelling lock",
+ ast_type, req, req->rq_xid,
libcfs_nid2str(peer.nid));
ldlm_lock_cancel(lock);
rc = -ERESTART;
} else if (ldlm_is_cancel(lock)) {
- LDLM_DEBUG(lock, "%s AST timeout from nid %s, but "
- "cancel was received (AST reply lost?)",
- ast_type, libcfs_nid2str(peer.nid));
+ LDLM_DEBUG(lock,
+ "%s AST (req@%p x%llu) timeout from nid %s, but cancel was received (AST reply lost?)",
+ ast_type, req, req->rq_xid,
+ libcfs_nid2str(peer.nid));
ldlm_lock_cancel(lock);
rc = -ERESTART;
+ } else if (rc == -ENODEV || rc == -ESHUTDOWN ||
+ (rc == -EIO &&
+ req->rq_import->imp_state == LUSTRE_IMP_CLOSED)) {
+ /* Upon umount the AST fails because it cannot be
+ * sent. This shouldn't lead to client eviction.
+ * -ENODEV is returned by ptl_send_rpc() for a new
+ * request in such an import.
+ * -ESHUTDOWN is returned by ptlrpc_import_delay_req()
+ * if imp_invalid or obd_no_recov is set.
+ * ptlrpc_import_delay_req() also checks for
+ * LUSTRE_IMP_CLOSED and returns -EIO in that case.
+ * In all these cases the error is ignored.
+ */
+ LDLM_DEBUG(lock, "%s AST can't be sent due to a server"
+ " %s failure or umount process: rc = %d\n",
+ ast_type,
+ req->rq_import->imp_obd->obd_name, rc);
} else {
- LDLM_ERROR(lock, "client (nid %s) %s %s AST "
- "(req status %d rc %d), evict it",
+ LDLM_ERROR(lock,
+ "client (nid %s) %s %s AST (req@%p x%llu status %d rc %d), evict it",
libcfs_nid2str(peer.nid),
req->rq_replied ? "returned error from" :
"failed to reply to",
- ast_type,
+ ast_type, req, req->rq_xid,
(req->rq_repmsg != NULL) ?
lustre_msg_get_status(req->rq_repmsg) : 0,
rc);
if (rc == -EINVAL) {
struct ldlm_resource *res = lock->l_resource;
- LDLM_DEBUG(lock, "client (nid %s) returned %d"
- " from %s AST - normal race",
+ LDLM_DEBUG(lock,
+ "client (nid %s) returned %d from %s AST (req@%p x%llu) - normal race",
libcfs_nid2str(peer.nid),
req->rq_repmsg ?
lustre_msg_get_status(req->rq_repmsg) : -1,
- ast_type);
+ ast_type, req, req->rq_xid);
if (res) {
/* update lvbo to return proper attributes.
* see bug 23174 */
* - Glimpse callback of remote lock might return
* -ELDLM_NO_LOCK_DATA when inode is cleared. LU-274
*/
- if (rc == -ELDLM_NO_LOCK_DATA) {
+ if (unlikely(arg->gl_interpret_reply)) {
+ rc = arg->gl_interpret_reply(env, req, data, rc);
+ } else if (rc == -ELDLM_NO_LOCK_DATA) {
LDLM_DEBUG(lock, "lost race - client has a lock but no "
"inode");
ldlm_res_lvbo_update(lock->l_resource, NULL, 1);
/* Don't need to do anything here. */
RETURN(0);
+ if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_BL_AST)) {
+ LDLM_DEBUG(lock, "dropping BL AST");
+ RETURN(0);
+ }
+
LASSERT(lock);
LASSERT(data != NULL);
if (lock->l_export->exp_obd->obd_recovering != 0)
req->rq_interpret_reply = ldlm_cb_interpret;
lock_res_and_lock(lock);
- if (lock->l_granted_mode != lock->l_req_mode) {
- /* this blocking AST will be communicated as part of the
- * completion AST instead */
+ if (ldlm_is_destroyed(lock)) {
+ /* Lock is already destroyed; no AST to send. */
unlock_res_and_lock(lock);
-
ptlrpc_req_finished(req);
- LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
RETURN(0);
}
- if (ldlm_is_destroyed(lock)) {
- /* What's the point? */
+ if (lock->l_granted_mode != lock->l_req_mode) {
+ /* this blocking AST will be communicated as part of the
+ * completion AST instead */
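+ /* Keep the lock on the export's blocked list meanwhile, so
+ * that export cleanup can still find and cancel it. */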
+ ldlm_add_blocked_lock(lock);
+ ldlm_set_waited(lock);
unlock_res_and_lock(lock);
+
ptlrpc_req_finished(req);
+ LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
RETURN(0);
}
LDLM_DEBUG(lock, "server preparing blocking AST");
ptlrpc_request_set_replen(req);
+ ldlm_set_cbpending(lock);
if (instant_cancel) {
unlock_res_and_lock(lock);
ldlm_lock_cancel(lock);
if (AT_OFF)
req->rq_timeout = ldlm_get_rq_timeout();
- lock->l_last_activity = cfs_time_current_sec();
+ lock->l_last_activity = ktime_get_real_seconds();
if (lock->l_export && lock->l_export->exp_nid_stats &&
lock->l_export->exp_nid_stats->nid_ldlm_stats)
RETURN(rc);
}
-EXPORT_SYMBOL(ldlm_server_blocking_ast);
/**
* ->l_completion_ast callback for a remote lock in server namespace.
LASSERT(lock != NULL);
LASSERT(data != NULL);
- if (OBD_FAIL_PRECHECK(OBD_FAIL_OST_LDLM_REPLY_NET)) {
+ if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_CP_AST)) {
LDLM_DEBUG(lock, "dropping CP AST");
RETURN(0);
}
}
}
- lock->l_last_activity = cfs_time_current_sec();
+ lock->l_last_activity = ktime_get_real_seconds();
LDLM_DEBUG(lock, "server preparing completion AST");
RETURN(lvb_len < 0 ? lvb_len : rc);
}
-EXPORT_SYMBOL(ldlm_server_completion_ast);
/**
* Server side ->l_glimpse_ast handler for client locks.
if (AT_OFF)
req->rq_timeout = ldlm_get_rq_timeout();
- lock->l_last_activity = cfs_time_current_sec();
+ lock->l_last_activity = ktime_get_real_seconds();
req->rq_interpret_reply = ldlm_cb_interpret;
RETURN(rc);
}
-EXPORT_SYMBOL(ldlm_server_glimpse_ast);
int ldlm_glimpse_locks(struct ldlm_resource *res,
struct list_head *gl_work_list)
* service threads to carry out client lock enqueueing requests.
*/
int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
- struct ptlrpc_request *req,
- const struct ldlm_request *dlm_req,
- const struct ldlm_callback_suite *cbs)
+ struct ptlrpc_request *req,
+ const struct ldlm_request *dlm_req,
+ const struct ldlm_callback_suite *cbs)
{
- struct ldlm_reply *dlm_rep;
+ struct ldlm_reply *dlm_rep;
__u64 flags;
- ldlm_error_t err = ELDLM_OK;
- struct ldlm_lock *lock = NULL;
- void *cookie = NULL;
- int rc = 0;
+ enum ldlm_error err = ELDLM_OK;
+ struct ldlm_lock *lock = NULL;
+ void *cookie = NULL;
+ int rc = 0;
struct ldlm_resource *res = NULL;
- ENTRY;
+ ENTRY;
- LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
+ LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF, LATF_SKIP);
flags = ldlm_flags_from_wire(dlm_req->lock_flags);
- LASSERT(req->rq_export);
+ LASSERT(req->rq_export);
if (ptlrpc_req2svc(req)->srv_stats != NULL)
ldlm_svc_get_eopc(dlm_req, ptlrpc_req2svc(req)->srv_stats);
GOTO(out, rc = -EPROTO);
}
-#if 0
- /* FIXME this makes it impossible to use LDLM_PLAIN locks -- check
- against server's _CONNECT_SUPPORTED flags? (I don't want to use
- ibits for mgc/mgs) */
-
- /* INODEBITS_INTEROP: Perform conversion from plain lock to
- * inodebits lock if client does not support them. */
- if (!(exp_connect_flags(req->rq_export) & OBD_CONNECT_IBITS) &&
- (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN)) {
- dlm_req->lock_desc.l_resource.lr_type = LDLM_IBITS;
- dlm_req->lock_desc.l_policy_data.l_inodebits.bits =
- MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
- if (dlm_req->lock_desc.l_req_mode == LCK_PR)
- dlm_req->lock_desc.l_req_mode = LCK_CR;
- }
-#endif
-
if (unlikely((flags & LDLM_FL_REPLAY) ||
(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))) {
/* Find an existing lock in the per-export lock hash */
lock = cfs_hash_lookup(req->rq_export->exp_lock_hash,
(void *)&dlm_req->lock_handle[0]);
if (lock != NULL) {
- DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie "
- LPX64, lock->l_handle.h_cookie);
+ DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie %#llx",
+ lock->l_handle.h_cookie);
flags |= LDLM_FL_RESENT;
GOTO(existing_lock, rc = 0);
}
- }
+ } else {
+ if (ldlm_reclaim_full()) {
+ DEBUG_REQ(D_DLMTRACE, req,
+ "Too many granted locks, reject current enqueue request and let the client retry later");
+ GOTO(out, rc = -EINPROGRESS);
+ }
+ }
/* The lock's callback data might be set in the policy function */
lock = ldlm_lock_create(ns, &dlm_req->lock_desc.l_resource.lr_name,
* without them. */
lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
LDLM_FL_INHERIT_MASK);
+
+ ldlm_convert_policy_to_local(req->rq_export,
+ dlm_req->lock_desc.l_resource.lr_type,
+ &dlm_req->lock_desc.l_policy_data,
+ &lock->l_policy_data);
+ if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
+ lock->l_req_extent = lock->l_policy_data.l_extent;
+
existing_lock:
if (flags & LDLM_FL_HAS_INTENT) {
GOTO(out, rc);
}
- if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
- ldlm_convert_policy_to_local(req->rq_export,
- dlm_req->lock_desc.l_resource.lr_type,
- &dlm_req->lock_desc.l_policy_data,
- &lock->l_policy_data);
- if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
- lock->l_req_extent = lock->l_policy_data.l_extent;
-
err = ldlm_lock_enqueue(ns, &lock, cookie, &flags);
if (err) {
if ((int)err < 0)
ldlm_lock2desc(lock, &dlm_rep->lock_desc);
ldlm_lock2handle(lock, &dlm_rep->lock_handle);
+ if (lock && lock->l_resource->lr_type == LDLM_EXTENT)
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 6);
+
/* We never send a blocking AST until the lock is granted, but
* we can tell it right now */
lock_res_and_lock(lock);
if (unlikely(!ldlm_is_cancel_on_block(lock) ||
!(dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK))){
CERROR("Granting sync lock to libclient. "
- "req fl %d, rep fl %d, lock fl "LPX64"\n",
+ "req fl %d, rep fl %d, lock fl %#llx\n",
dlm_req->lock_flags, dlm_rep->lock_flags,
lock->l_flags);
LDLM_ERROR(lock, "sync lock");
- if (dlm_req->lock_flags & LDLM_FL_HAS_INTENT) {
- struct ldlm_intent *it;
-
- it = req_capsule_client_get(&req->rq_pill,
- &RMF_LDLM_INTENT);
- if (it != NULL) {
- CERROR("This is intent %s ("LPU64")\n",
- ldlm_it2str(it->opc), it->opc);
- }
- }
+ if (dlm_req->lock_flags & LDLM_FL_HAS_INTENT) {
+ struct ldlm_intent *it;
+
+ it = req_capsule_client_get(&req->rq_pill,
+ &RMF_LDLM_INTENT);
+ if (it != NULL) {
+ CERROR("This is intent %s (%llu)\n",
+ ldlm_it2str(it->opc), it->opc);
+ }
+ }
}
}
}
}
- if (rc != 0) {
- lock_res_and_lock(lock);
- ldlm_resource_unlink_lock(lock);
- ldlm_lock_destroy_nolock(lock);
- unlock_res_and_lock(lock);
- }
+ if (rc != 0 && !(flags & LDLM_FL_RESENT)) {
+ if (lock->l_export) {
+ ldlm_lock_cancel(lock);
+ } else {
+ lock_res_and_lock(lock);
+ ldlm_resource_unlink_lock(lock);
+ ldlm_lock_destroy_nolock(lock);
+ unlock_res_and_lock(lock);
+
+ }
+ }
if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
ldlm_reprocess_all(lock->l_resource);
return rc;
}
-EXPORT_SYMBOL(ldlm_handle_enqueue0);
/**
* Old-style LDLM main entry point for server code enqueue.
}
return rc;
}
-EXPORT_SYMBOL(ldlm_handle_enqueue);
/**
* Main LDLM entry point for server code to process lock conversion requests.
RETURN(0);
}
-EXPORT_SYMBOL(ldlm_handle_convert0);
/**
* Old-style main LDLM entry point for server code to process lock conversion
}
return rc;
}
-EXPORT_SYMBOL(ldlm_handle_convert);
/**
* Cancel all the locks whose handles are packed into ldlm_request
lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
if (!lock) {
LDLM_DEBUG_NOLOCK("server-side cancel handler stale "
- "lock (cookie "LPU64")",
+ "lock (cookie %llu)",
dlm_req->lock_handle[i].cookie);
continue;
}
}
if ((flags & LATF_STATS) && ldlm_is_ast_sent(lock)) {
- long delay = cfs_time_sub(cfs_time_current_sec(),
- lock->l_last_activity);
- LDLM_DEBUG(lock, "server cancels blocked lock after "
- CFS_DURATION_T"s", delay);
+ time64_t delay = ktime_get_real_seconds() -
+ lock->l_last_activity;
+ LDLM_DEBUG(lock, "server cancels blocked lock after %llds",
+ (s64)delay);
at_measured(&lock->l_export->exp_bl_lock_at, delay);
}
ldlm_lock_cancel(lock);
RETURN(ptlrpc_reply(req));
}
-EXPORT_SYMBOL(ldlm_handle_cancel);
#endif /* HAVE_SERVER_SUPPORT */
/**
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
int to = cfs_time_seconds(1);
while (to > 0) {
- schedule_timeout_and_set_state(
- TASK_INTERRUPTIBLE, to);
+ set_current_state(TASK_INTERRUPTIBLE);
+ schedule_timeout(to);
if (lock->l_granted_mode == lock->l_req_mode ||
ldlm_is_destroyed(lock))
break;
lock->l_lvb_len, lvb_len);
GOTO(out, rc = -EINVAL);
}
- } else if (ldlm_has_layout(lock)) { /* for layout lock, lvb has
- * variable length */
- void *lvb_data;
-
- OBD_ALLOC_LARGE(lvb_data, lvb_len);
- if (lvb_data == NULL) {
- LDLM_ERROR(lock, "No memory: %d.\n", lvb_len);
- GOTO(out, rc = -ENOMEM);
- }
-
- lock_res_and_lock(lock);
- LASSERT(lock->l_lvb_data == NULL);
- lock->l_lvb_type = LVB_T_LAYOUT;
- lock->l_lvb_data = lvb_data;
- lock->l_lvb_len = lvb_len;
- unlock_res_and_lock(lock);
}
}
}
static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
- ldlm_cancel_flags_t cancel_flags)
+ enum ldlm_cancel_flags cancel_flags)
{
struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
ENTRY;
struct ldlm_lock_desc *ld,
struct list_head *cancels, int count,
struct ldlm_lock *lock,
- ldlm_cancel_flags_t cancel_flags)
+ enum ldlm_cancel_flags cancel_flags)
{
init_completion(&blwi->blwi_comp);
INIT_LIST_HEAD(&blwi->blwi_head);
struct ldlm_lock_desc *ld,
struct ldlm_lock *lock,
struct list_head *cancels, int count,
- ldlm_cancel_flags_t cancel_flags)
+ enum ldlm_cancel_flags cancel_flags)
{
ENTRY;
int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
struct list_head *cancels, int count,
- ldlm_cancel_flags_t cancel_flags)
+ enum ldlm_cancel_flags cancel_flags)
{
return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
}
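+/* Wake one blocking thread to re-poll for work, e.g. after a new work
+ * item or a stale export has been queued. */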
+int ldlm_bl_thread_wakeup(void)
+{
+ wake_up(&ldlm_state->ldlm_bl_pool->blp_waitq);
+ return 0;
+}
+
/* Setinfo coming from Server (eg MDT) to Client (eg MDC)! */
static int ldlm_handle_setinfo(struct ptlrpc_request *req)
{
}
static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
- const char *msg, int rc,
- struct lustre_handle *handle)
+ const char *msg, int rc,
+ const struct lustre_handle *handle)
{
DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
- "%s: [nid %s] [rc %d] [lock "LPX64"]",
+ "%s: [nid %s] [rc %d] [lock %#llx]",
msg, libcfs_id2str(req->rq_peer), rc,
handle ? handle->cookie : 0);
if (req->rq_no_reply)
CWARN("Send reply failed, maybe cause bug 21636.\n");
}
-static int ldlm_handle_qc_callback(struct ptlrpc_request *req)
-{
- struct obd_quotactl *oqctl;
- struct client_obd *cli = &req->rq_export->exp_obd->u.cli;
-
- oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
- if (oqctl == NULL) {
- CERROR("Can't unpack obd_quotactl\n");
- RETURN(-EPROTO);
- }
-
- oqctl->qc_stat = ptlrpc_status_ntoh(oqctl->qc_stat);
-
- cli->cl_qchk_stat = oqctl->qc_stat;
- return 0;
-}
-
/* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
rc = llog_origin_handle_close(req);
ldlm_callback_reply(req, rc);
RETURN(0);
- case OBD_QC_CALLBACK:
- req_capsule_set(&req->rq_pill, &RQF_QC_CALLBACK);
- if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET))
- RETURN(0);
- rc = ldlm_handle_qc_callback(req);
- ldlm_callback_reply(req, rc);
- RETURN(0);
default:
CERROR("unknown opcode %u\n",
lustre_msg_get_opc(req->rq_reqmsg));
lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
if (!lock) {
- CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
+ CDEBUG(D_DLMTRACE, "callback on lock %#llx - lock "
"disappeared\n", dlm_req->lock_handle[0].cookie);
rc = ldlm_callback_reply(req, -EINVAL);
ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
lock_res_and_lock(lock);
lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
LDLM_FL_AST_MASK);
- if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
- /* If somebody cancels lock and cache is already dropped,
- * or lock is failed before cp_ast received on client,
- * we can tell the server we have no lock. Otherwise, we
- * should send cancel after dropping the cache. */
+ if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
+ /* If somebody cancels lock and cache is already dropped,
+ * or lock is failed before cp_ast received on client,
+ * we can tell the server we have no lock. Otherwise, we
+ * should send cancel after dropping the cache. */
if ((ldlm_is_canceling(lock) && ldlm_is_bl_done(lock)) ||
- ldlm_is_failed(lock)) {
- LDLM_DEBUG(lock, "callback on lock "
- LPX64" - lock disappeared\n",
- dlm_req->lock_handle[0].cookie);
- unlock_res_and_lock(lock);
- LDLM_LOCK_RELEASE(lock);
- rc = ldlm_callback_reply(req, -EINVAL);
- ldlm_callback_errmsg(req, "Operate on stale lock", rc,
- &dlm_req->lock_handle[0]);
- RETURN(0);
- }
+ ldlm_is_failed(lock)) {
+ LDLM_DEBUG(lock, "callback on lock %llx - lock disappeared",
+ dlm_req->lock_handle[0].cookie);
+ unlock_res_and_lock(lock);
+ LDLM_LOCK_RELEASE(lock);
+ rc = ldlm_callback_reply(req, -EINVAL);
+ ldlm_callback_errmsg(req, "Operate on stale lock", rc,
+ &dlm_req->lock_handle[0]);
+ RETURN(0);
+ }
/* BL_AST locks are not needed in LRU.
* Let ldlm_cancel_lru() be fast. */
- ldlm_lock_remove_from_lru(lock);
+ ldlm_lock_remove_from_lru(lock);
ldlm_set_bl_ast(lock);
- }
+ }
unlock_res_and_lock(lock);
/* We want the ost thread to get this reply so that it can respond
struct ldlm_request *dlm_req;
CERROR("%s from %s arrived at %lu with bad export cookie "
- LPU64"\n",
+ "%llu\n",
ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)),
libcfs_nid2str(req->rq_peer.nid),
req->rq_arrival_time.tv_sec,
req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
CDEBUG(D_INODE, "cancel\n");
if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_NET) ||
- CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND))
+ CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND) ||
+ CFS_FAIL_CHECK(OBD_FAIL_LDLM_BL_EVICT))
RETURN(0);
rc = ldlm_handle_cancel(req);
if (rc)
if (lustre_handle_equal(&dlm_req->lock_handle[i],
&lockh)) {
DEBUG_REQ(D_RPCTRACE, req,
- "Prio raised by lock "LPX64".", lockh.cookie);
+ "Prio raised by lock %#llx.", lockh.cookie);
rc = 1;
break;
RETURN(0);
}
-static int ldlm_revoke_lock_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
+static int ldlm_revoke_lock_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
struct hlist_node *hnode, void *data)
{
void ldlm_revoke_export_locks(struct obd_export *exp)
{
struct list_head rpc_list;
- ENTRY;
+ ENTRY;
INIT_LIST_HEAD(&rpc_list);
- cfs_hash_for_each_empty(exp->exp_lock_hash,
- ldlm_revoke_lock_cb, &rpc_list);
- ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
- LDLM_WORK_REVOKE_AST);
+ cfs_hash_for_each_nolock(exp->exp_lock_hash,
+ ldlm_revoke_lock_cb, &rpc_list, 0);
+ ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
+ LDLM_WORK_REVOKE_AST);
- EXIT;
+ EXIT;
}
EXPORT_SYMBOL(ldlm_revoke_export_locks);
#endif /* HAVE_SERVER_SUPPORT */
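+/* Fetch one unit of work for a blocking thread: either a stale export to
+ * clean up (returned in \a p_exp) or a blocking work item (in \a p_blwi),
+ * never both. Returns 1 if any work was found. */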
-static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
+static int ldlm_bl_get_work(struct ldlm_bl_pool *blp,
+ struct ldlm_bl_work_item **p_blwi,
+ struct obd_export **p_exp)
{
struct ldlm_bl_work_item *blwi = NULL;
static unsigned int num_bl = 0;
+ static unsigned int num_stale;
+ int num_th = atomic_read(&blp->blp_num_threads);
+
+ *p_exp = obd_stale_export_get();
spin_lock(&blp->blp_lock);
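+ /* Serve at most num_th - 1 stale exports in a row; every
+ * num_th-th pass falls through to the regular work lists so
+ * that export cleanup cannot starve blocking callbacks. */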
+ if (*p_exp != NULL) {
+ if (num_th == 1 || ++num_stale < num_th) {
+ spin_unlock(&blp->blp_lock);
+ return 1;
+ } else {
+ num_stale = 0;
+ }
+ }
+
/* process a request from the blp_list at least every blp_num_threads */
if (!list_empty(&blp->blp_list) &&
(list_empty(&blp->blp_prio_list) || num_bl == 0))
blwi_entry);
if (blwi) {
- if (++num_bl >= atomic_read(&blp->blp_num_threads))
+ if (++num_bl >= num_th)
num_bl = 0;
list_del(&blwi->blwi_entry);
}
spin_unlock(&blp->blp_lock);
+ *p_blwi = blwi;
- return blwi;
+ if (*p_exp != NULL && *p_blwi != NULL) {
+ obd_stale_export_put(*p_exp);
+ *p_exp = NULL;
+ }
+
+ return (*p_blwi != NULL || *p_exp != NULL) ? 1 : 0;
}
/* This only contains temporary data until the thread starts */
struct ldlm_bl_thread_data {
- char bltd_name[CFS_CURPROC_COMM_MAX];
struct ldlm_bl_pool *bltd_blp;
struct completion bltd_comp;
int bltd_num;
static int ldlm_bl_thread_main(void *arg);
-static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
+static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp, bool check_busy)
{
struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
struct task_struct *task;
init_completion(&bltd.bltd_comp);
- bltd.bltd_num = atomic_read(&blp->blp_num_threads);
- snprintf(bltd.bltd_name, sizeof(bltd.bltd_name) - 1,
- "ldlm_bl_%02d", bltd.bltd_num);
- task = kthread_run(ldlm_bl_thread_main, &bltd, bltd.bltd_name);
+
+ bltd.bltd_num = atomic_inc_return(&blp->blp_num_threads);
+ if (bltd.bltd_num >= blp->blp_max_threads) {
+ atomic_dec(&blp->blp_num_threads);
+ return 0;
+ }
+
+ LASSERTF(bltd.bltd_num > 0, "thread num:%d\n", bltd.bltd_num);
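+ /* When started on demand (check_busy), keep this thread only
+ * if all pre-existing threads are busy. */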
+ if (check_busy &&
+ atomic_read(&blp->blp_busy_threads) < (bltd.bltd_num - 1)) {
+ atomic_dec(&blp->blp_num_threads);
+ return 0;
+ }
+
+ task = kthread_run(ldlm_bl_thread_main, &bltd, "ldlm_bl_%02d",
+ bltd.bltd_num);
if (IS_ERR(task)) {
CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
- atomic_read(&blp->blp_num_threads), PTR_ERR(task));
+ bltd.bltd_num, PTR_ERR(task));
+ atomic_dec(&blp->blp_num_threads);
return PTR_ERR(task);
}
wait_for_completion(&bltd.bltd_comp);
return 0;
}
+/* Not fatal if racy: at worst we end up with a few extra threads */
+static int ldlm_bl_thread_need_create(struct ldlm_bl_pool *blp,
+ struct ldlm_bl_work_item *blwi)
+{
+ if (atomic_read(&blp->blp_num_threads) >= blp->blp_max_threads)
+ return 0;
+
+ if (atomic_read(&blp->blp_busy_threads) <
+ atomic_read(&blp->blp_num_threads))
+ return 0;
+
+ if (blwi != NULL && (blwi->blwi_ns == NULL ||
+ blwi->blwi_mem_pressure))
+ return 0;
+
+ return 1;
+}
+
+static int ldlm_bl_thread_blwi(struct ldlm_bl_pool *blp,
+ struct ldlm_bl_work_item *blwi)
+{
+ ENTRY;
+
+ if (blwi->blwi_ns == NULL)
+ /* added by ldlm_cleanup() */
+ RETURN(LDLM_ITER_STOP);
+
+ if (blwi->blwi_mem_pressure)
+ memory_pressure_set();
+
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
+
+ if (blwi->blwi_count) {
+ int count;
+ /* In the special case of cancelling LRU locks
+ * asynchronously, the list of locks is passed in here.
+ * These locks are marked LDLM_FL_CANCELING, but NOT
+ * canceled locally yet. */
+ count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
+ blwi->blwi_count,
+ LCF_BL_AST);
+ ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
+ blwi->blwi_flags);
+ } else {
+ ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
+ blwi->blwi_lock);
+ }
+ if (blwi->blwi_mem_pressure)
+ memory_pressure_clr();
+
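+ /* Async work items are freed here; synchronous callers wait
+ * on blwi_comp and own the work item themselves. */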
+ if (blwi->blwi_flags & LCF_ASYNC)
+ OBD_FREE(blwi, sizeof(*blwi));
+ else
+ complete(&blwi->blwi_comp);
+
+ RETURN(0);
+}
+
+/**
+ * Cancel stale locks on export. Cancel blocked locks first.
+ * If the given export has blocked locks, the next one in the list may have
+ * them too, so cancel non-blocked locks only if the current export has
+ * no blocked locks.
+ */
+static int ldlm_bl_thread_exports(struct ldlm_bl_pool *blp,
+ struct obd_export *exp)
+{
+ int num;
+ ENTRY;
+
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 4);
+
+ num = ldlm_export_cancel_blocked_locks(exp);
+ if (num == 0)
+ ldlm_export_cancel_locks(exp);
+
+ obd_stale_export_put(exp);
+
+ RETURN(0);
+}
+
/**
* Main blocking requests processing thread.
*
blp = bltd->bltd_blp;
- atomic_inc(&blp->blp_num_threads);
- atomic_inc(&blp->blp_busy_threads);
-
complete(&bltd->bltd_comp);
/* cannot use bltd after this, it is only on caller's stack */
while (1) {
struct l_wait_info lwi = { 0 };
struct ldlm_bl_work_item *blwi = NULL;
- int busy;
+ struct obd_export *exp = NULL;
+ int rc;
- blwi = ldlm_bl_get_work(blp);
+ rc = ldlm_bl_get_work(blp, &blwi, &exp);
- if (blwi == NULL) {
- atomic_dec(&blp->blp_busy_threads);
+ if (rc == 0)
l_wait_event_exclusive(blp->blp_waitq,
- (blwi = ldlm_bl_get_work(blp)) != NULL,
- &lwi);
- busy = atomic_inc_return(&blp->blp_busy_threads);
- } else {
- busy = atomic_read(&blp->blp_busy_threads);
- }
-
- if (blwi->blwi_ns == NULL)
- /* added by ldlm_cleanup() */
- break;
+ ldlm_bl_get_work(blp, &blwi,
+ &exp),
+ &lwi);
+ atomic_inc(&blp->blp_busy_threads);
- /* Not fatal if racy and have a few too many threads */
- if (unlikely(busy < blp->blp_max_threads &&
- busy >= atomic_read(&blp->blp_num_threads) &&
- !blwi->blwi_mem_pressure))
+ if (ldlm_bl_thread_need_create(blp, blwi))
/* discard the return value, we tried */
- ldlm_bl_thread_start(blp);
-
- if (blwi->blwi_mem_pressure)
- memory_pressure_set();
-
- OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
-
- if (blwi->blwi_count) {
- int count;
- /* The special case when we cancel locks in LRU
- * asynchronously, we pass the list of locks here.
- * Thus locks are marked LDLM_FL_CANCELING, but NOT
- * canceled locally yet. */
- count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
- blwi->blwi_count,
- LCF_BL_AST);
- ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
- blwi->blwi_flags);
- } else {
- ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
- blwi->blwi_lock);
- }
- if (blwi->blwi_mem_pressure)
- memory_pressure_clr();
+ ldlm_bl_thread_start(blp, true);
- if (blwi->blwi_flags & LCF_ASYNC)
- OBD_FREE(blwi, sizeof(*blwi));
- else
- complete(&blwi->blwi_comp);
+ if (exp)
+ rc = ldlm_bl_thread_exports(blp, exp);
+ else if (blwi)
+ rc = ldlm_bl_thread_blwi(blp, blwi);
+
+ atomic_dec(&blp->blp_busy_threads);
+
+ if (rc == LDLM_ITER_STOP)
+ break;
+
+ /* If there are many namespaces, we will not sleep waiting for
+ * work, and must do a cond_resched to avoid holding the CPU
+ * for too long */
+ cond_resched();
}
- atomic_dec(&blp->blp_busy_threads);
atomic_dec(&blp->blp_num_threads);
complete(&blp->blp_comp);
RETURN(0);
RETURN(rc);
}
-EXPORT_SYMBOL(ldlm_get_ref);
void ldlm_put_ref(void)
{
EXIT;
}
-EXPORT_SYMBOL(ldlm_put_ref);
/*
* Export handle<->lock hash operations.
*/
static unsigned
-ldlm_export_lock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
+ldlm_export_lock_hash(struct cfs_hash *hs, const void *key, unsigned mask)
{
return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
}
}
static void
-ldlm_export_lock_get(cfs_hash_t *hs, struct hlist_node *hnode)
+ldlm_export_lock_get(struct cfs_hash *hs, struct hlist_node *hnode)
{
struct ldlm_lock *lock;
}
static void
-ldlm_export_lock_put(cfs_hash_t *hs, struct hlist_node *hnode)
+ldlm_export_lock_put(struct cfs_hash *hs, struct hlist_node *hnode)
{
struct ldlm_lock *lock;
LDLM_LOCK_RELEASE(lock);
}
-static cfs_hash_ops_t ldlm_export_lock_ops = {
+static struct cfs_hash_ops ldlm_export_lock_ops = {
.hs_hash = ldlm_export_lock_hash,
.hs_key = ldlm_export_lock_key,
.hs_keycmp = ldlm_export_lock_keycmp,
}
EXPORT_SYMBOL(ldlm_destroy_export);
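+/* sysfs tunable, typically exposed as
+ * /sys/fs/lustre/ldlm/cancel_unused_locks_before_replay (the exact path
+ * depends on where lustre_kobj is rooted); write 0 to keep unused locks
+ * across recovery replay. */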
+static ssize_t cancel_unused_locks_before_replay_show(struct kobject *kobj,
+ struct attribute *attr,
+ char *buf)
+{
+ return sprintf(buf, "%d\n", ldlm_cancel_unused_locks_before_replay);
+}
+
+static ssize_t cancel_unused_locks_before_replay_store(struct kobject *kobj,
+ struct attribute *attr,
+ const char *buffer,
+ size_t count)
+{
+ int rc;
+ unsigned long val;
+
+ rc = kstrtoul(buffer, 10, &val);
+ if (rc)
+ return rc;
+
+ ldlm_cancel_unused_locks_before_replay = val;
+
+ return count;
+}
+LUSTRE_RW_ATTR(cancel_unused_locks_before_replay);
+
+static struct attribute *ldlm_attrs[] = {
+ &lustre_attr_cancel_unused_locks_before_replay.attr,
+ NULL,
+};
+
+static struct attribute_group ldlm_attr_group = {
+ .attrs = ldlm_attrs,
+};
+
static int ldlm_setup(void)
{
static struct ptlrpc_service_conf conf;
if (ldlm_state == NULL)
RETURN(-ENOMEM);
+ ldlm_kobj = kobject_create_and_add("ldlm", lustre_kobj);
+ if (!ldlm_kobj)
+ GOTO(out, rc = -ENOMEM);
+
+ rc = sysfs_create_group(ldlm_kobj, &ldlm_attr_group);
+ if (rc)
+ GOTO(out, rc);
+
+ ldlm_ns_kset = kset_create_and_add("namespaces", NULL, ldlm_kobj);
+ if (!ldlm_ns_kset)
+ GOTO(out, rc = -ENOMEM);
+
+ ldlm_svc_kset = kset_create_and_add("services", NULL, ldlm_kobj);
+ if (!ldlm_svc_kset)
+ GOTO(out, rc = -ENOMEM);
+
#ifdef CONFIG_PROC_FS
- rc = ldlm_proc_setup();
- if (rc != 0)
+ rc = ldlm_proc_setup();
+ if (rc != 0)
GOTO(out, rc);
#endif /* CONFIG_PROC_FS */
}
for (i = 0; i < blp->blp_min_threads; i++) {
- rc = ldlm_bl_thread_start(blp);
+ rc = ldlm_bl_thread_start(blp, false);
if (rc < 0)
GOTO(out, rc);
}
INIT_LIST_HEAD(&waiting_locks_list);
spin_lock_init(&waiting_locks_spinlock);
- cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, NULL);
+ setup_timer(&waiting_locks_timer, waiting_locks_callback, 0);
task = kthread_run(expired_lock_main, NULL, "ldlm_elt");
if (IS_ERR(task)) {
CERROR("Failed to initialize LDLM pools: %d\n", rc);
GOTO(out, rc);
}
+
+ rc = ldlm_reclaim_setup();
+ if (rc) {
+ CERROR("Failed to setup reclaim thread: rc = %d\n", rc);
+ GOTO(out, rc);
+ }
RETURN(0);
out:
RETURN(-EBUSY);
}
- ldlm_pools_fini();
+ ldlm_reclaim_cleanup();
+ ldlm_pools_fini();
if (ldlm_state->ldlm_bl_pool != NULL) {
struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
#endif
+ if (ldlm_ns_kset)
+ kset_unregister(ldlm_ns_kset);
+ if (ldlm_svc_kset)
+ kset_unregister(ldlm_svc_kset);
+ if (ldlm_kobj)
+ kobject_put(ldlm_kobj);
+
ldlm_proc_cleanup();
#ifdef HAVE_SERVER_SUPPORT
ldlm_lock_slab = kmem_cache_create("ldlm_locks",
sizeof(struct ldlm_lock), 0,
SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU, NULL);
- if (ldlm_lock_slab == NULL) {
- kmem_cache_destroy(ldlm_resource_slab);
- return -ENOMEM;
- }
+ if (ldlm_lock_slab == NULL)
+ goto out_resource;
ldlm_interval_slab = kmem_cache_create("interval_node",
sizeof(struct ldlm_interval),
0, SLAB_HWCACHE_ALIGN, NULL);
- if (ldlm_interval_slab == NULL) {
- kmem_cache_destroy(ldlm_resource_slab);
- kmem_cache_destroy(ldlm_lock_slab);
- return -ENOMEM;
- }
+ if (ldlm_interval_slab == NULL)
+ goto out_lock;
+
+ ldlm_interval_tree_slab = kmem_cache_create("interval_tree",
+ sizeof(struct ldlm_interval_tree) * LCK_MODE_NUM,
+ 0, SLAB_HWCACHE_ALIGN, NULL);
+ if (ldlm_interval_tree_slab == NULL)
+ goto out_interval;
+
#if LUSTRE_TRACKS_LOCK_EXP_REFS
- class_export_dump_hook = ldlm_dump_export_locks;
+ class_export_dump_hook = ldlm_dump_export_locks;
#endif
- return 0;
+ return 0;
+
+out_interval:
+ kmem_cache_destroy(ldlm_interval_slab);
+out_lock:
+ kmem_cache_destroy(ldlm_lock_slab);
+out_resource:
+ kmem_cache_destroy(ldlm_resource_slab);
+
+ return -ENOMEM;
}
void ldlm_exit(void)
synchronize_rcu();
kmem_cache_destroy(ldlm_lock_slab);
kmem_cache_destroy(ldlm_interval_slab);
+ kmem_cache_destroy(ldlm_interval_tree_slab);
}