-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* GPL HEADER END
*/
/*
- * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
* Author: Phil Schwan <phil@clusterfs.com>
*/
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
#define DEBUG_SUBSYSTEM S_LDLM
#ifdef __KERNEL__
#include <libcfs/list.h>
#include "ldlm_internal.h"
-#ifdef __KERNEL__
static int ldlm_num_threads;
CFS_MODULE_PARM(ldlm_num_threads, "i", int, 0444,
"number of DLM service threads to start");
-#endif
+
+static char *ldlm_cpts;
+CFS_MODULE_PARM(ldlm_cpts, "s", charp, 0444,
+ "CPU partitions ldlm threads should run on");
extern cfs_mem_cache_t *ldlm_resource_slab;
extern cfs_mem_cache_t *ldlm_lock_slab;
-static struct semaphore ldlm_ref_sem;
+static cfs_mutex_t ldlm_ref_mutex;
static int ldlm_refcount;
+struct ldlm_cb_async_args {
+ struct ldlm_cb_set_arg *ca_set_arg;
+ struct ldlm_lock *ca_lock;
+};
+
/* LDLM state */
static struct ldlm_state *ldlm_state;
return timeout < 1 ? 1 : timeout;
}
-#ifdef __KERNEL__
-/* w_l_spinlock protects both waiting_locks_list and expired_lock_thread */
-static spinlock_t waiting_locks_spinlock; /* BH lock (timer) */
-static struct list_head waiting_locks_list;
-static cfs_timer_t waiting_locks_timer;
-
-static struct expired_lock_thread {
- cfs_waitq_t elt_waitq;
- int elt_state;
- int elt_dump;
- struct list_head elt_expired_locks;
-} expired_lock_thread;
-#endif
-
#define ELT_STOPPED 0
#define ELT_READY 1
#define ELT_TERMINATE 2
struct ldlm_bl_pool {
- spinlock_t blp_lock;
+ cfs_spinlock_t blp_lock;
/*
* blp_prio_list is used for callbacks that should be handled
* as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
* see bug 13843
*/
- struct list_head blp_prio_list;
+ cfs_list_t blp_prio_list;
/*
* blp_list is used for all other callbacks which are likely
* to take longer to process.
*/
- struct list_head blp_list;
+ cfs_list_t blp_list;
cfs_waitq_t blp_waitq;
- struct completion blp_comp;
- atomic_t blp_num_threads;
- atomic_t blp_busy_threads;
+ cfs_completion_t blp_comp;
+ cfs_atomic_t blp_num_threads;
+ cfs_atomic_t blp_busy_threads;
int blp_min_threads;
int blp_max_threads;
};
struct ldlm_bl_work_item {
- struct list_head blwi_entry;
- struct ldlm_namespace *blwi_ns;
+ cfs_list_t blwi_entry;
+ struct ldlm_namespace *blwi_ns;
struct ldlm_lock_desc blwi_ld;
- struct ldlm_lock *blwi_lock;
- struct list_head blwi_head;
+ struct ldlm_lock *blwi_lock;
+ cfs_list_t blwi_head;
int blwi_count;
+ cfs_completion_t blwi_comp;
+ int blwi_mode;
+ int blwi_mem_pressure;
};
-#ifdef __KERNEL__
+#if defined(HAVE_SERVER_SUPPORT) && defined(__KERNEL__)
+
+/* w_l_spinlock protects both waiting_locks_list and expired_lock_thread */
+static cfs_spinlock_t waiting_locks_spinlock; /* BH lock (timer) */
+static cfs_list_t waiting_locks_list;
+static cfs_timer_t waiting_locks_timer;
+
+static struct expired_lock_thread {
+ cfs_waitq_t elt_waitq;
+ int elt_state;
+ int elt_dump;
+ cfs_list_t elt_expired_locks;
+} expired_lock_thread;
static inline int have_expired_locks(void)
{
int need_to_run;
ENTRY;
- spin_lock_bh(&waiting_locks_spinlock);
- need_to_run = !list_empty(&expired_lock_thread.elt_expired_locks);
- spin_unlock_bh(&waiting_locks_spinlock);
+ cfs_spin_lock_bh(&waiting_locks_spinlock);
+ need_to_run = !cfs_list_empty(&expired_lock_thread.elt_expired_locks);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
RETURN(need_to_run);
}
static int expired_lock_main(void *arg)
{
- struct list_head *expired = &expired_lock_thread.elt_expired_locks;
+ cfs_list_t *expired = &expired_lock_thread.elt_expired_locks;
struct l_wait_info lwi = { 0 };
int do_dump;
expired_lock_thread.elt_state == ELT_TERMINATE,
&lwi);
- spin_lock_bh(&waiting_locks_spinlock);
+ cfs_spin_lock_bh(&waiting_locks_spinlock);
if (expired_lock_thread.elt_dump) {
- spin_unlock_bh(&waiting_locks_spinlock);
+ struct libcfs_debug_msg_data msgdata = {
+ .msg_file = __FILE__,
+ .msg_fn = "waiting_locks_callback",
+ .msg_line = expired_lock_thread.elt_dump };
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
/* from waiting_locks_callback, but not in timer */
libcfs_debug_dumplog();
- libcfs_run_lbug_upcall(__FILE__,
- "waiting_locks_callback",
- expired_lock_thread.elt_dump);
+ libcfs_run_lbug_upcall(&msgdata);
- spin_lock_bh(&waiting_locks_spinlock);
+ cfs_spin_lock_bh(&waiting_locks_spinlock);
expired_lock_thread.elt_dump = 0;
}
do_dump = 0;
- while (!list_empty(expired)) {
+ while (!cfs_list_empty(expired)) {
struct obd_export *export;
struct ldlm_lock *lock;
- lock = list_entry(expired->next, struct ldlm_lock,
+ lock = cfs_list_entry(expired->next, struct ldlm_lock,
l_pending_chain);
if ((void *)lock < LP_POISON + CFS_PAGE_SIZE &&
(void *)lock >= LP_POISON) {
- spin_unlock_bh(&waiting_locks_spinlock);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
CERROR("free lock on elt list %p\n", lock);
LBUG();
}
- list_del_init(&lock->l_pending_chain);
+ cfs_list_del_init(&lock->l_pending_chain);
if ((void *)lock->l_export < LP_POISON + CFS_PAGE_SIZE &&
(void *)lock->l_export >= LP_POISON) {
CERROR("lock with free export on elt list %p\n",
LDLM_LOCK_RELEASE(lock);
continue;
}
- export = class_export_get(lock->l_export);
- spin_unlock_bh(&waiting_locks_spinlock);
+
+ if (lock->l_destroyed) {
+ /* release the lock refcount where
+ * waiting_locks_callback() founds */
+ LDLM_LOCK_RELEASE(lock);
+ continue;
+ }
+ export = class_export_lock_get(lock->l_export, lock);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
+
+ do_dump++;
+ class_fail_export(export);
+ class_export_lock_put(export, lock);
/* release extra ref grabbed by ldlm_add_waiting_lock()
* or ldlm_failed_ast() */
LDLM_LOCK_RELEASE(lock);
- do_dump++;
- class_fail_export(export);
- class_export_put(export);
- spin_lock_bh(&waiting_locks_spinlock);
+ cfs_spin_lock_bh(&waiting_locks_spinlock);
}
- spin_unlock_bh(&waiting_locks_spinlock);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
if (do_dump && obd_dump_on_eviction) {
CERROR("dump the log upon eviction\n");
}
static int ldlm_add_waiting_lock(struct ldlm_lock *lock);
+static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds);
/**
* Check if there is a request in the export request list
if (lock->l_export == NULL)
return 0;
- spin_lock(&lock->l_export->exp_lock);
- list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) {
+ cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock);
+ cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
+ rq_exp_list) {
if (req->rq_ops->hpreq_lock_match) {
match = req->rq_ops->hpreq_lock_match(req, lock);
if (match)
break;
}
}
- spin_unlock(&lock->l_export->exp_lock);
+ cfs_spin_unlock_bh(&lock->l_export->exp_rpc_lock);
RETURN(match);
}
/* This is called from within a timer interrupt and cannot schedule */
static void waiting_locks_callback(unsigned long unused)
{
- struct ldlm_lock *lock, *last = NULL;
-
-repeat:
- spin_lock_bh(&waiting_locks_spinlock);
- while (!list_empty(&waiting_locks_list)) {
- lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
- l_pending_chain);
- if (cfs_time_after(lock->l_callback_timeout, cfs_time_current()) ||
+ struct ldlm_lock *lock;
+ int need_dump = 0;
+
+ cfs_spin_lock_bh(&waiting_locks_spinlock);
+ while (!cfs_list_empty(&waiting_locks_list)) {
+ lock = cfs_list_entry(waiting_locks_list.next, struct ldlm_lock,
+ l_pending_chain);
+ if (cfs_time_after(lock->l_callback_timeout,
+ cfs_time_current()) ||
(lock->l_req_mode == LCK_GROUP))
break;
lock->l_export->exp_connection->c_remote_uuid.uuid,
libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid));
- list_del_init(&lock->l_pending_chain);
- spin_unlock_bh(&waiting_locks_spinlock);
- ldlm_add_waiting_lock(lock);
- goto repeat;
+ cfs_list_del_init(&lock->l_pending_chain);
+ if (lock->l_destroyed) {
+ /* relay the lock refcount decrease to
+ * expired lock thread */
+ cfs_list_add(&lock->l_pending_chain,
+ &expired_lock_thread.elt_expired_locks);
+ } else {
+ __ldlm_add_waiting_lock(lock,
+ ldlm_get_enq_timeout(lock));
+ }
+ continue;
}
/* if timeout overlaps the activation time of suspended timeouts
lock->l_export->exp_connection->c_remote_uuid.uuid,
libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid));
- list_del_init(&lock->l_pending_chain);
- spin_unlock_bh(&waiting_locks_spinlock);
- ldlm_add_waiting_lock(lock);
- goto repeat;
+ cfs_list_del_init(&lock->l_pending_chain);
+ if (lock->l_destroyed) {
+ /* relay the lock refcount decrease to
+ * expired lock thread */
+ cfs_list_add(&lock->l_pending_chain,
+ &expired_lock_thread.elt_expired_locks);
+ } else {
+ __ldlm_add_waiting_lock(lock,
+ ldlm_get_enq_timeout(lock));
+ }
+ continue;
}
/* Check if we need to prolong timeout */
LDLM_LOCK_GET(lock);
- spin_unlock_bh(&waiting_locks_spinlock);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
LDLM_DEBUG(lock, "prolong the busy lock");
ldlm_refresh_waiting_lock(lock,
ldlm_get_enq_timeout(lock));
- spin_lock_bh(&waiting_locks_spinlock);
+ cfs_spin_lock_bh(&waiting_locks_spinlock);
if (!cont) {
LDLM_LOCK_RELEASE(lock);
LDLM_LOCK_RELEASE(lock);
continue;
}
- lock->l_resource->lr_namespace->ns_timeouts++;
+ ldlm_lock_to_ns(lock)->ns_timeouts++;
LDLM_ERROR(lock, "lock callback timer expired after %lds: "
"evicting client at %s ",
cfs_time_current_sec()- lock->l_last_activity,
libcfs_nid2str(
lock->l_export->exp_connection->c_peer.nid));
- last = lock;
-
/* no needs to take an extra ref on the lock since it was in
* the waiting_locks_list and ldlm_add_waiting_lock()
* already grabbed a ref */
- list_del(&lock->l_pending_chain);
- list_add(&lock->l_pending_chain,
- &expired_lock_thread.elt_expired_locks);
- }
+ cfs_list_del(&lock->l_pending_chain);
+ cfs_list_add(&lock->l_pending_chain,
+ &expired_lock_thread.elt_expired_locks);
+ need_dump = 1;
+ }
- if (!list_empty(&expired_lock_thread.elt_expired_locks)) {
- if (obd_dump_on_timeout)
- expired_lock_thread.elt_dump = __LINE__;
+ if (!cfs_list_empty(&expired_lock_thread.elt_expired_locks)) {
+ if (obd_dump_on_timeout && need_dump)
+ expired_lock_thread.elt_dump = __LINE__;
- cfs_waitq_signal(&expired_lock_thread.elt_waitq);
- }
+ cfs_waitq_signal(&expired_lock_thread.elt_waitq);
+ }
/*
* Make sure the timer will fire again if we have any locks
* left.
*/
- if (!list_empty(&waiting_locks_list)) {
+ if (!cfs_list_empty(&waiting_locks_list)) {
cfs_time_t timeout_rounded;
- lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
- l_pending_chain);
+ lock = cfs_list_entry(waiting_locks_list.next, struct ldlm_lock,
+ l_pending_chain);
timeout_rounded = (cfs_time_t)round_timeout(lock->l_callback_timeout);
cfs_timer_arm(&waiting_locks_timer, timeout_rounded);
}
- spin_unlock_bh(&waiting_locks_spinlock);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
}
/*
cfs_time_t timeout;
cfs_time_t timeout_rounded;
- if (!list_empty(&lock->l_pending_chain))
+ if (!cfs_list_empty(&lock->l_pending_chain))
return 0;
if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) ||
}
/* if the new lock has a shorter timeout than something earlier on
the list, we'll wait the longer amount of time; no big deal. */
- list_add_tail(&lock->l_pending_chain, &waiting_locks_list); /* FIFO */
+ /* FIFO */
+ cfs_list_add_tail(&lock->l_pending_chain, &waiting_locks_list);
return 1;
}
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
- int ret;
- int timeout = ldlm_get_enq_timeout(lock);
+ int ret;
+ int timeout = ldlm_get_enq_timeout(lock);
- LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
+ /* NB: must be called with hold of lock_res_and_lock() */
+ LASSERT(lock->l_res_locked);
+ lock->l_waited = 1;
- spin_lock_bh(&waiting_locks_spinlock);
+ LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
+
+ cfs_spin_lock_bh(&waiting_locks_spinlock);
if (lock->l_destroyed) {
static cfs_time_t next;
- spin_unlock_bh(&waiting_locks_spinlock);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)");
if (cfs_time_after(cfs_time_current(), next)) {
next = cfs_time_shift(14400);
}
ret = __ldlm_add_waiting_lock(lock, timeout);
- if (ret)
+ if (ret) {
/* grab ref on the lock if it has been added to the
* waiting list */
LDLM_LOCK_GET(lock);
- spin_unlock_bh(&waiting_locks_spinlock);
+ }
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
+
+ if (ret) {
+ cfs_spin_lock_bh(&lock->l_export->exp_bl_list_lock);
+ if (cfs_list_empty(&lock->l_exp_list))
+ cfs_list_add(&lock->l_exp_list,
+ &lock->l_export->exp_bl_list);
+ cfs_spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
+ }
LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
ret == 0 ? "not re-" : "", timeout,
*/
static int __ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
- struct list_head *list_next;
+ cfs_list_t *list_next;
- if (list_empty(&lock->l_pending_chain))
+ if (cfs_list_empty(&lock->l_pending_chain))
return 0;
list_next = lock->l_pending_chain.next;
cfs_timer_disarm(&waiting_locks_timer);
} else {
struct ldlm_lock *next;
- next = list_entry(list_next, struct ldlm_lock,
- l_pending_chain);
+ next = cfs_list_entry(list_next, struct ldlm_lock,
+ l_pending_chain);
cfs_timer_arm(&waiting_locks_timer,
round_timeout(next->l_callback_timeout));
}
}
- list_del_init(&lock->l_pending_chain);
+ cfs_list_del_init(&lock->l_pending_chain);
return 1;
}
if (lock->l_export == NULL) {
/* We don't have a "waiting locks list" on clients. */
- LDLM_DEBUG(lock, "client lock: no-op");
+ CDEBUG(D_DLMTRACE, "Client lock %p : no-op\n", lock);
return 0;
}
- spin_lock_bh(&waiting_locks_spinlock);
+ cfs_spin_lock_bh(&waiting_locks_spinlock);
ret = __ldlm_del_waiting_lock(lock);
- spin_unlock_bh(&waiting_locks_spinlock);
- if (ret)
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
+
+ /* remove the lock out of export blocking list */
+ cfs_spin_lock_bh(&lock->l_export->exp_bl_list_lock);
+ cfs_list_del_init(&lock->l_exp_list);
+ cfs_spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
+
+ if (ret) {
/* release lock ref if it has indeed been removed
* from a list */
LDLM_LOCK_RELEASE(lock);
+ }
LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed");
return ret;
return 0;
}
- spin_lock_bh(&waiting_locks_spinlock);
+ cfs_spin_lock_bh(&waiting_locks_spinlock);
- if (list_empty(&lock->l_pending_chain)) {
- spin_unlock_bh(&waiting_locks_spinlock);
+ if (cfs_list_empty(&lock->l_pending_chain)) {
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
LDLM_DEBUG(lock, "wasn't waiting");
return 0;
}
* release/take a lock reference */
__ldlm_del_waiting_lock(lock);
__ldlm_add_waiting_lock(lock, timeout);
- spin_unlock_bh(&waiting_locks_spinlock);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
LDLM_DEBUG(lock, "refreshed");
return 1;
}
-#else /* !__KERNEL__ */
-static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
-{
- LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
- RETURN(1);
-}
+#else /* !HAVE_SERVER_SUPPORT || !__KERNEL__ */
int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
{
RETURN(0);
}
-#endif /* __KERNEL__ */
+
+# ifdef HAVE_SERVER_SUPPORT
+static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
+{
+ LASSERT(lock->l_res_locked);
+ LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
+ RETURN(1);
+}
+
+# endif
+#endif /* HAVE_SERVER_SUPPORT && __KERNEL__ */
+
+#ifdef HAVE_SERVER_SUPPORT
static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
const char *ast_type)
if (obd_dump_on_timeout)
libcfs_debug_dumplog();
#ifdef __KERNEL__
- spin_lock_bh(&waiting_locks_spinlock);
+ cfs_spin_lock_bh(&waiting_locks_spinlock);
if (__ldlm_del_waiting_lock(lock) == 0)
/* the lock was not in any list, grab an extra ref before adding
* the lock to the expired list */
LDLM_LOCK_GET(lock);
- list_add(&lock->l_pending_chain, &expired_lock_thread.elt_expired_locks);
+ cfs_list_add(&lock->l_pending_chain,
+ &expired_lock_thread.elt_expired_locks);
cfs_waitq_signal(&expired_lock_thread.elt_waitq);
- spin_unlock_bh(&waiting_locks_spinlock);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
#else
class_fail_export(lock->l_export);
#endif
ldlm_failed_ast(lock, rc, ast_type);
}
} else if (rc) {
- if (rc == -EINVAL)
+ if (rc == -EINVAL) {
+ struct ldlm_resource *res = lock->l_resource;
LDLM_DEBUG(lock, "client (nid %s) returned %d"
" from %s AST - normal race",
libcfs_nid2str(peer.nid),
req->rq_repmsg ?
lustre_msg_get_status(req->rq_repmsg) : -1,
ast_type);
- else
+ if (res) {
+ /* update lvbo to return proper attributes.
+ * see bug 23174 */
+ ldlm_resource_getref(res);
+ ldlm_res_lvbo_update(res, NULL, 1);
+ ldlm_resource_putref(res);
+ }
+
+ } else {
LDLM_ERROR(lock, "client (nid %s) returned %d "
"from %s AST", libcfs_nid2str(peer.nid),
(req->rq_repmsg != NULL) ?
lustre_msg_get_status(req->rq_repmsg) : 0,
ast_type);
+ }
ldlm_lock_cancel(lock);
/* Server-side AST functions are called from ldlm_reprocess_all,
* which needs to be told to please restart its reprocessing. */
static int ldlm_cb_interpret(const struct lu_env *env,
struct ptlrpc_request *req, void *data, int rc)
{
- struct ldlm_cb_set_arg *arg;
- struct ldlm_lock *lock;
+ struct ldlm_cb_async_args *ca = data;
+ struct ldlm_lock *lock = ca->ca_lock;
+ struct ldlm_cb_set_arg *arg = ca->ca_set_arg;
ENTRY;
- LASSERT(data != NULL);
-
- arg = req->rq_async_args.pointer_arg[0];
- lock = req->rq_async_args.pointer_arg[1];
LASSERT(lock != NULL);
- if (rc != 0) {
- /* If client canceled the lock but the cancel has not
- * been recieved yet, we need to update lvbo to have the
- * proper attributes cached. */
- if (rc == -EINVAL && arg->type == LDLM_BL_CALLBACK)
- ldlm_res_lvbo_update(lock->l_resource, NULL,
- 0, 1);
- rc = ldlm_handle_ast_error(lock, req, rc,
- arg->type == LDLM_BL_CALLBACK
- ? "blocking" : "completion");
- }
+ switch (arg->type) {
+ case LDLM_GL_CALLBACK:
+ /* Update the LVB from disk if the AST failed
+ * (this is a legal race)
+ *
+ * - Glimpse callback of local lock just returns
+ * -ELDLM_NO_LOCK_DATA.
+ * - Glimpse callback of remote lock might return
+ * -ELDLM_NO_LOCK_DATA when inode is cleared. LU-274
+ */
+ if (rc == -ELDLM_NO_LOCK_DATA) {
+ LDLM_DEBUG(lock, "lost race - client has a lock but no "
+ "inode");
+ ldlm_res_lvbo_update(lock->l_resource, NULL, 1);
+ } else if (rc != 0) {
+ rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
+ } else {
+ rc = ldlm_res_lvbo_update(lock->l_resource, req, 1);
+ }
+ break;
+ case LDLM_BL_CALLBACK:
+ if (rc != 0)
+ rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
+ break;
+ case LDLM_CP_CALLBACK:
+ if (rc != 0)
+ rc = ldlm_handle_ast_error(lock, req, rc, "completion");
+ break;
+ default:
+ LDLM_ERROR(lock, "invalid opcode for lock callback %d",
+ arg->type);
+ LBUG();
+ }
+
+ /* release extra reference taken in ldlm_ast_fini() */
LDLM_LOCK_RELEASE(lock);
- if (rc == -ERESTART)
- atomic_set(&arg->restart, 1);
+ if (rc == -ERESTART)
+ cfs_atomic_inc(&arg->restart);
RETURN(0);
}
-static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req,
- struct ldlm_cb_set_arg *arg,
- struct ldlm_lock *lock,
- int instant_cancel)
+static inline int ldlm_ast_fini(struct ptlrpc_request *req,
+ struct ldlm_cb_set_arg *arg,
+ struct ldlm_lock *lock,
+ int instant_cancel)
{
- int rc = 0;
- ENTRY;
-
- if (unlikely(instant_cancel)) {
- rc = ptl_send_rpc(req, 1);
- ptlrpc_req_finished(req);
- if (rc == 0)
- /* If we cancelled the lock, we need to restart
- * ldlm_reprocess_queue */
- atomic_set(&arg->restart, 1);
- } else {
- LDLM_LOCK_GET(lock);
- ptlrpc_set_add_req(arg->set, req);
- }
-
- RETURN(rc);
+ int rc = 0;
+ ENTRY;
+
+ if (unlikely(instant_cancel)) {
+ rc = ptl_send_rpc(req, 1);
+ ptlrpc_req_finished(req);
+ if (rc == 0)
+ cfs_atomic_inc(&arg->restart);
+ } else {
+ LDLM_LOCK_GET(lock);
+ ptlrpc_set_add_req(arg->set, req);
+ }
+
+ RETURN(rc);
}
/**
RETURN_EXIT;
}
- spin_lock(&lock->l_export->exp_lock);
- list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) {
- if (!req->rq_hp && req->rq_ops->hpreq_lock_match &&
+ cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock);
+ cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
+ rq_exp_list) {
+ /* Do not process requests that were not yet added to there
+ * incoming queue or were already removed from there for
+ * processing */
+ if (!req->rq_hp && !cfs_list_empty(&req->rq_list) &&
+ req->rq_ops->hpreq_lock_match &&
req->rq_ops->hpreq_lock_match(req, lock))
ptlrpc_hpreq_reorder(req);
}
- spin_unlock(&lock->l_export->exp_lock);
+ cfs_spin_unlock_bh(&lock->l_export->exp_rpc_lock);
EXIT;
}
struct ldlm_lock_desc *desc,
void *data, int flag)
{
+ struct ldlm_cb_async_args *ca;
struct ldlm_cb_set_arg *arg = data;
struct ldlm_request *body;
struct ptlrpc_request *req;
LASSERT(lock);
LASSERT(data != NULL);
- if (lock->l_export->exp_obd->obd_recovering != 0) {
+ if (lock->l_export->exp_obd->obd_recovering != 0)
LDLM_ERROR(lock, "BUG 6063: lock collide during recovery");
- ldlm_lock_dump(D_ERROR, lock, 0);
- }
ldlm_lock_reorder_req(lock);
if (req == NULL)
RETURN(-ENOMEM);
- req->rq_async_args.pointer_arg[0] = arg;
- req->rq_async_args.pointer_arg[1] = lock;
+ CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
+ ca = ptlrpc_req_async_args(req);
+ ca->ca_set_arg = arg;
+ ca->ca_lock = lock;
+
req->rq_interpret_reply = ldlm_cb_interpret;
req->rq_no_resend = 1;
- lock_res(lock->l_resource);
- if (lock->l_granted_mode != lock->l_req_mode) {
- /* this blocking AST will be communicated as part of the
- * completion AST instead */
- unlock_res(lock->l_resource);
- ptlrpc_req_finished(req);
- LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
- RETURN(0);
- }
+ lock_res_and_lock(lock);
+ if (lock->l_granted_mode != lock->l_req_mode) {
+ /* this blocking AST will be communicated as part of the
+ * completion AST instead */
+ unlock_res_and_lock(lock);
- if (lock->l_destroyed) {
- /* What's the point? */
- unlock_res(lock->l_resource);
- ptlrpc_req_finished(req);
- RETURN(0);
- }
+ ptlrpc_req_finished(req);
+ LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
+ RETURN(0);
+ }
+
+ if (lock->l_destroyed) {
+ /* What's the point? */
+ unlock_res_and_lock(lock);
+ ptlrpc_req_finished(req);
+ RETURN(0);
+ }
if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
instant_cancel = 1;
LDLM_DEBUG(lock, "server preparing blocking AST");
ptlrpc_request_set_replen(req);
- if (instant_cancel) {
- unlock_res(lock->l_resource);
- ldlm_lock_cancel(lock);
- } else {
- LASSERT(lock->l_granted_mode == lock->l_req_mode);
- ldlm_add_waiting_lock(lock);
- unlock_res(lock->l_resource);
- }
+ if (instant_cancel) {
+ unlock_res_and_lock(lock);
+ ldlm_lock_cancel(lock);
+ } else {
+ LASSERT(lock->l_granted_mode == lock->l_req_mode);
+ ldlm_add_waiting_lock(lock);
+ unlock_res_and_lock(lock);
+ }
req->rq_send_state = LUSTRE_IMP_FULL;
- /* ptlrpc_prep_req already set timeout */
+ /* ptlrpc_request_alloc_pack already set timeout */
if (AT_OFF)
req->rq_timeout = ldlm_get_rq_timeout();
lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
- rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
+ rc = ldlm_ast_fini(req, arg, lock, instant_cancel);
RETURN(rc);
}
struct ldlm_cb_set_arg *arg = data;
struct ldlm_request *body;
struct ptlrpc_request *req;
+ struct ldlm_cb_async_args *ca;
long total_enqueue_wait;
int instant_cancel = 0;
int rc = 0;
if (req == NULL)
RETURN(-ENOMEM);
- lock_res_and_lock(lock);
- if (lock->l_resource->lr_lvb_len)
+ /* server namespace, doesn't need lock */
+ if (lock->l_resource->lr_lvb_len) {
req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT,
lock->l_resource->lr_lvb_len);
- unlock_res_and_lock(lock);
+ }
rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK);
if (rc) {
RETURN(rc);
}
- req->rq_async_args.pointer_arg[0] = arg;
- req->rq_async_args.pointer_arg[1] = lock;
+ CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
+ ca = ptlrpc_req_async_args(req);
+ ca->ca_set_arg = arg;
+ ca->ca_lock = lock;
+
req->rq_interpret_reply = ldlm_cb_interpret;
req->rq_no_resend = 1;
body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
if (lock->l_resource->lr_lvb_len) {
void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
- lock_res_and_lock(lock);
+ lock_res(lock->l_resource);
memcpy(lvb, lock->l_resource->lr_lvb_data,
lock->l_resource->lr_lvb_len);
- unlock_res_and_lock(lock);
+ unlock_res(lock->l_resource);
}
LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
/* Server-side enqueue wait time estimate, used in
__ldlm_add_waiting_lock to set future enqueue timers */
if (total_enqueue_wait < ldlm_get_enq_timeout(lock))
- at_add(&lock->l_resource->lr_namespace->ns_at_estimate,
- total_enqueue_wait);
+ at_measured(ldlm_lock_to_ns_at(lock),
+ total_enqueue_wait);
else
/* bz18618. Don't add lock enqueue time we spend waiting for a
previous callback to fail. Locks waiting legitimately will
LDLM_DEBUG(lock, "lock completed after %lus; estimate was %ds. "
"It is likely that a previous callback timed out.",
total_enqueue_wait,
- at_get(&lock->l_resource->lr_namespace->ns_at_estimate));
+ at_get(ldlm_lock_to_ns_at(lock)));
ptlrpc_request_set_replen(req);
req->rq_send_state = LUSTRE_IMP_FULL;
- /* ptlrpc_prep_req already set timeout */
+ /* ptlrpc_request_pack already set timeout */
if (AT_OFF)
req->rq_timeout = ldlm_get_rq_timeout();
lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
- rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
+ rc = ldlm_ast_fini(req, arg, lock, instant_cancel);
RETURN(rc);
}
int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
{
- struct ldlm_resource *res = lock->l_resource;
- struct ldlm_request *body;
- struct ptlrpc_request *req;
- int rc;
+ struct ldlm_cb_set_arg *arg = data;
+ struct ldlm_request *body;
+ struct ptlrpc_request *req;
+ struct ldlm_cb_async_args *ca;
+ int rc;
ENTRY;
LASSERT(lock != NULL);
body->lock_handle[0] = lock->l_remote_handle;
ldlm_lock2desc(lock, &body->lock_desc);
- lock_res_and_lock(lock);
+ CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
+ ca = ptlrpc_req_async_args(req);
+ ca->ca_set_arg = arg;
+ ca->ca_lock = lock;
+
+ /* server namespace, doesn't need lock */
req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
lock->l_resource->lr_lvb_len);
- unlock_res_and_lock(lock);
- res = lock->l_resource;
ptlrpc_request_set_replen(req);
-
req->rq_send_state = LUSTRE_IMP_FULL;
- /* ptlrpc_prep_req already set timeout */
+ /* ptlrpc_request_alloc_pack already set timeout */
if (AT_OFF)
req->rq_timeout = ldlm_get_rq_timeout();
+ req->rq_interpret_reply = ldlm_cb_interpret;
+
if (lock->l_export && lock->l_export->exp_nid_stats &&
lock->l_export->exp_nid_stats->nid_ldlm_stats)
lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
LDLM_GL_CALLBACK - LDLM_FIRST_OPC);
- rc = ptlrpc_queue_wait(req);
- if (rc == -ELDLM_NO_LOCK_DATA)
- LDLM_DEBUG(lock, "lost race - client has a lock but no inode");
- else if (rc != 0)
- rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
- else
- rc = ldlm_res_lvbo_update(res, req->rq_repmsg,
- REPLY_REC_OFF, 1);
- ptlrpc_req_finished(req);
- if (rc == -ERESTART)
- ldlm_reprocess_all(res);
+ rc = ldlm_ast_fini(req, arg, lock, 0);
- RETURN(rc);
+ RETURN(rc);
}
-#ifdef __KERNEL__
-extern unsigned long long lu_time_stamp_get(void);
-#else
-#define lu_time_stamp_get() time(NULL)
-#endif
+int ldlm_glimpse_locks(struct ldlm_resource *res, cfs_list_t *gl_work_list)
+{
+ int rc;
+ ENTRY;
+
+ rc = ldlm_run_ast_work(ldlm_res_to_ns(res), gl_work_list,
+ LDLM_WORK_GL_AST);
+ if (rc == -ERESTART)
+ ldlm_reprocess_all(res);
+
+ RETURN(rc);
+}
static void ldlm_svc_get_eopc(const struct ldlm_request *dlm_req,
struct lprocfs_stats *srv_stats)
LASSERT(req->rq_export);
- if (req->rq_rqbd->rqbd_service->srv_stats)
- ldlm_svc_get_eopc(dlm_req,
- req->rq_rqbd->rqbd_service->srv_stats);
+ if (ptlrpc_req2svc(req)->srv_stats != NULL)
+ ldlm_svc_get_eopc(dlm_req, ptlrpc_req2svc(req)->srv_stats);
if (req->rq_export && req->rq_export->exp_nid_stats &&
req->rq_export->exp_nid_stats->nid_ldlm_stats)
if (unlikely(flags & LDLM_FL_REPLAY)) {
/* Find an existing lock in the per-export lock hash */
- lock = lustre_hash_lookup(req->rq_export->exp_lock_hash,
- (void *)&dlm_req->lock_handle[0]);
+ lock = cfs_hash_lookup(req->rq_export->exp_lock_hash,
+ (void *)&dlm_req->lock_handle[0]);
if (lock != NULL) {
DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie "
LPX64, lock->l_handle.h_cookie);
LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
- /* Don't enqueue a lock onto the export if it has already
- * been evicted. Cancel it now instead. (bug 3822) */
- if (req->rq_export->exp_failed) {
- LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
+ /* Don't enqueue a lock onto the export if it is been disonnected
+ * due to eviction (bug 3822) or server umount (bug 24324).
+ * Cancel it now instead. */
+ if (req->rq_export->exp_disconnected) {
+ LDLM_ERROR(lock, "lock on disconnected export %p",
+ req->rq_export);
GOTO(out, rc = -ENOTCONN);
}
- lock->l_export = class_export_get(req->rq_export);
- atomic_inc(&lock->l_export->exp_locks_count);
+
+ lock->l_export = class_export_lock_get(req->rq_export, lock);
if (lock->l_export->exp_lock_hash)
- lustre_hash_add(lock->l_export->exp_lock_hash,
- &lock->l_remote_handle,
- &lock->l_exp_hash);
+ cfs_hash_add(lock->l_export->exp_lock_hash,
+ &lock->l_remote_handle,
+ &lock->l_exp_hash);
existing_lock:
* local_lock_enqueue by the policy function. */
cookie = req;
} else {
- lock_res_and_lock(lock);
+ /* based on the assumption that lvb size never changes during
+ * resource life time otherwise it need resource->lr_lock's
+ * protection */
if (lock->l_resource->lr_lvb_len) {
req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
RCL_SERVER,
lock->l_resource->lr_lvb_len);
}
- unlock_res_and_lock(lock);
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
GOTO(out, rc = -ENOMEM);
}
if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
- lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
+ ldlm_convert_policy_to_local(req->rq_export,
+ dlm_req->lock_desc.l_resource.lr_type,
+ &dlm_req->lock_desc.l_policy_data,
+ &lock->l_policy_data);
if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
lock->l_req_extent = lock->l_policy_data.l_extent;
dlm_rep->lock_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;
lock->l_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;
- /* Don't move a pending lock onto the export if it has already
- * been evicted. Cancel it now instead. (bug 5683) */
- if (unlikely(req->rq_export->exp_failed ||
+ /* Don't move a pending lock onto the export if it has already been
+ * disconnected due to eviction (bug 5683) or server umount (bug 24324).
+ * Cancel it now instead. */
+ if (unlikely(req->rq_export->exp_disconnected ||
OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT))) {
LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
rc = -ENOTCONN;
if (lock->l_granted_mode == lock->l_req_mode) {
/*
* Only cancel lock if it was granted, because it would
- * be destroyed immediatelly and would never be granted
+ * be destroyed immediately and would never be granted
* in the future, causing timeouts on client. Not
- * granted lock will be cancelled immediatelly after
+ * granted lock will be cancelled immediately after
* sending completion AST.
*/
if (dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK) {
LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
"(err=%d, rc=%d)", err, rc);
- lock_res_and_lock(lock);
if (rc == 0) {
if (lock->l_resource->lr_lvb_len > 0) {
+ /* MDT path won't handle lr_lvb_data, so
+ * lock/unlock better be contained in the
+ * if block */
void *lvb;
lvb = req_capsule_server_get(&req->rq_pill,
&RMF_DLM_LVB);
LASSERTF(lvb != NULL, "req %p, lock %p\n",
req, lock);
-
+ lock_res(lock->l_resource);
memcpy(lvb, lock->l_resource->lr_lvb_data,
lock->l_resource->lr_lvb_len);
+ unlock_res(lock->l_resource);
}
} else {
+ lock_res_and_lock(lock);
ldlm_resource_unlink_lock(lock);
ldlm_lock_destroy_nolock(lock);
+ unlock_res_and_lock(lock);
}
- unlock_res_and_lock(lock);
if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
ldlm_reprocess_all(lock->l_resource);
if (res != NULL) {
ldlm_resource_getref(res);
LDLM_RESOURCE_ADDREF(res);
- ldlm_res_lvbo_update(res, NULL, 0, 1);
+ ldlm_res_lvbo_update(res, NULL, 1);
}
pres = res;
}
if (!ldlm_request_cancel(req, dlm_req, 0))
req->rq_status = ESTALE;
- if (ptlrpc_reply(req) != 0)
- LBUG();
-
- RETURN(0);
+ RETURN(ptlrpc_reply(req));
}
+#endif /* HAVE_SERVER_SUPPORT */
void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
int do_ast;
ENTRY;
- LDLM_DEBUG(lock, "client blocking AST callback handler START");
+ LDLM_DEBUG(lock, "client blocking AST callback handler");
lock_res_and_lock(lock);
lock->l_flags |= LDLM_FL_CBPENDING;
unlock_res_and_lock(lock);
if (do_ast) {
- LDLM_DEBUG(lock, "already unused, calling "
- "callback (%p)", lock->l_blocking_ast);
+ CDEBUG(D_DLMTRACE, "Lock %p already unused, calling callback (%p)\n",
+ lock, lock->l_blocking_ast);
if (lock->l_blocking_ast != NULL)
lock->l_blocking_ast(lock, ld, lock->l_ast_data,
LDLM_CB_BLOCKING);
} else {
- LDLM_DEBUG(lock, "Lock still has references, will be"
- " cancelled later");
+ CDEBUG(D_DLMTRACE, "Lock %p is referenced, will be cancelled later\n",
+ lock);
}
LDLM_DEBUG(lock, "client blocking callback handler END");
struct ldlm_request *dlm_req,
struct ldlm_lock *lock)
{
+ int lvb_len;
CFS_LIST_HEAD(ast_list);
ENTRY;
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
int to = cfs_time_seconds(1);
while (to > 0) {
- cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE, to);
+ cfs_schedule_timeout_and_set_state(
+ CFS_TASK_INTERRUPTIBLE, to);
if (lock->l_granted_mode == lock->l_req_mode ||
lock->l_destroyed)
break;
}
}
+ lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
+ if (lvb_len > 0) {
+ if (lock->l_lvb_len > 0) {
+ /* for extent lock, lvb contains ost_lvb{}. */
+ LASSERT(lock->l_lvb_data != NULL);
+ LASSERTF(lock->l_lvb_len == lvb_len,
+ "preallocated %d, actual %d.\n",
+ lock->l_lvb_len, lvb_len);
+ } else { /* for layout lock, lvb has variable length */
+ void *lvb_data;
+
+ OBD_ALLOC(lvb_data, lvb_len);
+ if (lvb_data == NULL)
+ LDLM_ERROR(lock, "no memory.\n");
+
+ lock_res_and_lock(lock);
+ if (lvb_data == NULL) {
+ lock->l_flags |= LDLM_FL_FAILED;
+ } else {
+ LASSERT(lock->l_lvb_data == NULL);
+ lock->l_lvb_data = lvb_data;
+ lock->l_lvb_len = lvb_len;
+ }
+ unlock_res_and_lock(lock);
+ }
+ }
+
lock_res_and_lock(lock);
if (lock->l_destroyed ||
lock->l_granted_mode == lock->l_req_mode) {
}
if (lock->l_resource->lr_type != LDLM_PLAIN) {
- lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
+ ldlm_convert_policy_to_local(req->rq_export,
+ dlm_req->lock_desc.l_resource.lr_type,
+ &dlm_req->lock_desc.l_policy_data,
+ &lock->l_policy_data);
LDLM_DEBUG(lock, "completion AST, new policy data");
}
LDLM_ERROR(lock, "completion AST did not contain "
"expected LVB!");
} else {
- void *lvb = req_capsule_client_swab_get(&req->rq_pill,
- &RMF_DLM_LVB,
- (void *)lock->l_lvb_swabber);
+ void *lvb = req_capsule_client_get(&req->rq_pill,
+ &RMF_DLM_LVB);
memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len);
}
}
LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
- ldlm_run_ast_work(&ast_list, LDLM_WORK_CP_AST);
+ /* Let Enqueue to call osc_lock_upcall() and initialize
+ * l_ast_data */
+ OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
+
+ ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
lock);
}
#ifdef __KERNEL__
-static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
- struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
- struct list_head *cancels, int count)
+static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi, int mode)
{
struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
- struct ldlm_bl_work_item *blwi;
ENTRY;
- if (cancels && count == 0)
- RETURN(0);
+ cfs_spin_lock(&blp->blp_lock);
+ if (blwi->blwi_lock && blwi->blwi_lock->l_flags & LDLM_FL_DISCARD_DATA) {
+ /* add LDLM_FL_DISCARD_DATA requests to the priority list */
+ cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
+ } else {
+ /* other blocking callbacks are added to the regular list */
+ cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_list);
+ }
+ cfs_spin_unlock(&blp->blp_lock);
- OBD_ALLOC(blwi, sizeof(*blwi));
- if (blwi == NULL)
- RETURN(-ENOMEM);
+ cfs_waitq_signal(&blp->blp_waitq);
+
+ /* can not use blwi->blwi_mode as blwi could be already freed in
+ LDLM_ASYNC mode */
+ if (mode == LDLM_SYNC)
+ cfs_wait_for_completion(&blwi->blwi_comp);
+
+ RETURN(0);
+}
+
+static inline void init_blwi(struct ldlm_bl_work_item *blwi,
+ struct ldlm_namespace *ns,
+ struct ldlm_lock_desc *ld,
+ cfs_list_t *cancels, int count,
+ struct ldlm_lock *lock,
+ int mode)
+{
+ cfs_init_completion(&blwi->blwi_comp);
+ CFS_INIT_LIST_HEAD(&blwi->blwi_head);
+
+ if (cfs_memory_pressure_get())
+ blwi->blwi_mem_pressure = 1;
blwi->blwi_ns = ns;
+ blwi->blwi_mode = mode;
if (ld != NULL)
blwi->blwi_ld = *ld;
if (count) {
- list_add(&blwi->blwi_head, cancels);
- list_del_init(cancels);
+ cfs_list_add(&blwi->blwi_head, cancels);
+ cfs_list_del_init(cancels);
blwi->blwi_count = count;
} else {
blwi->blwi_lock = lock;
}
- spin_lock(&blp->blp_lock);
- if (lock && lock->l_flags & LDLM_FL_DISCARD_DATA) {
- /* add LDLM_FL_DISCARD_DATA requests to the priority list */
- list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
+}
+
+static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
+ struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
+ cfs_list_t *cancels, int count, int mode)
+{
+ ENTRY;
+
+ if (cancels && count == 0)
+ RETURN(0);
+
+ if (mode == LDLM_SYNC) {
+ /* if it is synchronous call do minimum mem alloc, as it could
+ * be triggered from kernel shrinker
+ */
+ struct ldlm_bl_work_item blwi;
+ memset(&blwi, 0, sizeof(blwi));
+ init_blwi(&blwi, ns, ld, cancels, count, lock, LDLM_SYNC);
+ RETURN(__ldlm_bl_to_thread(&blwi, LDLM_SYNC));
} else {
- /* other blocking callbacks are added to the regular list */
- list_add_tail(&blwi->blwi_entry, &blp->blp_list);
- }
- cfs_waitq_signal(&blp->blp_waitq);
- spin_unlock(&blp->blp_lock);
+ struct ldlm_bl_work_item *blwi;
+ OBD_ALLOC(blwi, sizeof(*blwi));
+ if (blwi == NULL)
+ RETURN(-ENOMEM);
+ init_blwi(blwi, ns, ld, cancels, count, lock, LDLM_ASYNC);
- RETURN(0);
+ RETURN(__ldlm_bl_to_thread(blwi, LDLM_ASYNC));
+ }
}
+
#endif
int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
struct ldlm_lock *lock)
{
#ifdef __KERNEL__
- RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0));
+ RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LDLM_ASYNC));
#else
RETURN(-ENOSYS);
#endif
}
int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
- struct list_head *cancels, int count)
+ cfs_list_t *cancels, int count, int mode)
{
#ifdef __KERNEL__
- RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count));
+ RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count, mode));
#else
RETURN(-ENOSYS);
#endif
}
+/* Setinfo coming from Server (eg MDT) to Client (eg MDC)! */
+static int ldlm_handle_setinfo(struct ptlrpc_request *req)
+{
+ struct obd_device *obd = req->rq_export->exp_obd;
+ char *key;
+ void *val;
+ int keylen, vallen;
+ int rc = -ENOSYS;
+ ENTRY;
+
+ DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);
+
+ req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
+
+ key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
+ if (key == NULL) {
+ DEBUG_REQ(D_IOCTL, req, "no set_info key");
+ RETURN(-EFAULT);
+ }
+ keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
+ RCL_CLIENT);
+ val = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
+ if (val == NULL) {
+ DEBUG_REQ(D_IOCTL, req, "no set_info val");
+ RETURN(-EFAULT);
+ }
+ vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
+ RCL_CLIENT);
+
+ /* We are responsible for swabbing contents of val */
+
+ if (KEY_IS(KEY_HSM_COPYTOOL_SEND))
+ /* Pass it on to mdc (the "export" in this case) */
+ rc = obd_set_info_async(req->rq_svc_thread->t_env,
+ req->rq_export,
+ sizeof(KEY_HSM_COPYTOOL_SEND),
+ KEY_HSM_COPYTOOL_SEND,
+ vallen, val, NULL);
+ else
+ DEBUG_REQ(D_WARNING, req, "ignoring unknown key %s", key);
+
+ return rc;
+}
+
+static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
+ const char *msg, int rc,
+ struct lustre_handle *handle)
+{
+ DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
+ "%s: [nid %s] [rc %d] [lock "LPX64"]",
+ msg, libcfs_id2str(req->rq_peer), rc,
+ handle ? handle->cookie : 0);
+ if (req->rq_no_reply)
+ CWARN("No reply was sent, maybe cause bug 21636.\n");
+ else if (rc)
+ CWARN("Send reply failed, maybe cause bug 21636.\n");
+}
+
/* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
req_capsule_init(&req->rq_pill, req, RCL_SERVER);
if (req->rq_export == NULL) {
- ldlm_callback_reply(req, -ENOTCONN);
+ rc = ldlm_callback_reply(req, -ENOTCONN);
+ ldlm_callback_errmsg(req, "Operate on unconnected server",
+ rc, NULL);
RETURN(0);
}
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK))
RETURN(0);
break;
+ case LDLM_SET_INFO:
+ rc = ldlm_handle_setinfo(req);
+ ldlm_callback_reply(req, rc);
+ RETURN(0);
case OBD_LOG_CANCEL: /* remove this eventually - for 1.4.0 compat */
CERROR("shouldn't be handling OBD_LOG_CANCEL on DLM thread\n");
req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
RETURN(0);
- rc = llog_origin_handle_create(req);
+ rc = llog_origin_handle_open(req);
ldlm_callback_reply(req, rc);
RETURN(0);
case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
if (dlm_req == NULL) {
- ldlm_callback_reply(req, -EPROTO);
+ rc = ldlm_callback_reply(req, -EPROTO);
+ ldlm_callback_errmsg(req, "Operate without parameter", rc,
+ NULL);
RETURN(0);
}
if (!lock) {
CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
"disappeared\n", dlm_req->lock_handle[0].cookie);
- ldlm_callback_reply(req, -EINVAL);
+ rc = ldlm_callback_reply(req, -EINVAL);
+ ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
+ &dlm_req->lock_handle[0]);
RETURN(0);
}
lock_res_and_lock(lock);
lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS);
if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
- /* If somebody cancels lock and cache is already droped,
+ /* If somebody cancels lock and cache is already dropped,
* or lock is failed before cp_ast received on client,
* we can tell the server we have no lock. Otherwise, we
* should send cancel after dropping the cache. */
dlm_req->lock_handle[0].cookie);
unlock_res_and_lock(lock);
LDLM_LOCK_RELEASE(lock);
- ldlm_callback_reply(req, -EINVAL);
+ rc = ldlm_callback_reply(req, -EINVAL);
+ ldlm_callback_errmsg(req, "Operate on stale lock", rc,
+ &dlm_req->lock_handle[0]);
RETURN(0);
}
/* BL_AST locks are not needed in lru.
case LDLM_BL_CALLBACK:
CDEBUG(D_INODE, "blocking ast\n");
req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
- if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK))
- ldlm_callback_reply(req, 0);
+ if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) {
+ rc = ldlm_callback_reply(req, 0);
+ if (req->rq_no_reply || rc)
+ ldlm_callback_errmsg(req, "Normal process", rc,
+ &dlm_req->lock_handle[0]);
+ }
if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
break;
RETURN(0);
}
+#ifdef HAVE_SERVER_SUPPORT
static int ldlm_cancel_handler(struct ptlrpc_request *req)
{
int rc;
if (req->rq_export == NULL) {
struct ldlm_request *dlm_req;
- CERROR("operation %d from %s with bad export cookie "LPU64"\n",
- lustre_msg_get_opc(req->rq_reqmsg),
- libcfs_id2str(req->rq_peer),
+ CERROR("%s from %s arrived at %lu with bad export cookie "
+ LPU64"\n",
+ ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)),
+ libcfs_nid2str(req->rq_peer.nid),
+ req->rq_arrival_time.tv_sec,
lustre_msg_get_handle(req->rq_reqmsg)->cookie);
- req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
- dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
- if (dlm_req != NULL)
- ldlm_lock_dump_handle(D_ERROR,
- &dlm_req->lock_handle[0]);
+ if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_CANCEL) {
+ req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
+ dlm_req = req_capsule_client_get(&req->rq_pill,
+ &RMF_DLM_REQ);
+ if (dlm_req != NULL)
+ ldlm_lock_dump_handle(D_ERROR,
+ &dlm_req->lock_handle[0]);
+ }
ldlm_callback_reply(req, -ENOTCONN);
RETURN(0);
}
RETURN(0);
}
-void ldlm_revoke_lock_cb(void *obj, void *data)
+static int ldlm_cancel_hpreq_lock_match(struct ptlrpc_request *req,
+ struct ldlm_lock *lock)
+{
+ struct ldlm_request *dlm_req;
+ struct lustre_handle lockh;
+ int rc = 0;
+ int i;
+ ENTRY;
+
+ dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+ if (dlm_req == NULL)
+ RETURN(0);
+
+ ldlm_lock2handle(lock, &lockh);
+ for (i = 0; i < dlm_req->lock_count; i++) {
+ if (lustre_handle_equal(&dlm_req->lock_handle[i],
+ &lockh)) {
+ DEBUG_REQ(D_RPCTRACE, req,
+ "Prio raised by lock "LPX64".", lockh.cookie);
+
+ rc = 1;
+ break;
+ }
+ }
+
+ RETURN(rc);
+
+}
+
+static int ldlm_cancel_hpreq_check(struct ptlrpc_request *req)
+{
+ struct ldlm_request *dlm_req;
+ int rc = 0;
+ int i;
+ ENTRY;
+
+ /* no prolong in recovery */
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+ RETURN(0);
+
+ dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+ if (dlm_req == NULL)
+ RETURN(-EFAULT);
+
+ for (i = 0; i < dlm_req->lock_count; i++) {
+ struct ldlm_lock *lock;
+
+ lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
+ if (lock == NULL)
+ continue;
+
+ rc = !!(lock->l_flags & LDLM_FL_AST_SENT);
+ if (rc)
+ LDLM_DEBUG(lock, "hpreq cancel lock");
+ LDLM_LOCK_PUT(lock);
+
+ if (rc)
+ break;
+ }
+
+ RETURN(rc);
+}
+
+static struct ptlrpc_hpreq_ops ldlm_cancel_hpreq_ops = {
+ .hpreq_lock_match = ldlm_cancel_hpreq_lock_match,
+ .hpreq_check = ldlm_cancel_hpreq_check,
+ .hpreq_fini = NULL,
+};
+
+static int ldlm_hpreq_handler(struct ptlrpc_request *req)
+{
+ ENTRY;
+
+ req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+
+ if (req->rq_export == NULL)
+ RETURN(0);
+
+ if (LDLM_CANCEL == lustre_msg_get_opc(req->rq_reqmsg)) {
+ req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
+ req->rq_ops = &ldlm_cancel_hpreq_ops;
+ }
+ RETURN(0);
+}
+
+int ldlm_revoke_lock_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
+ cfs_hlist_node_t *hnode, void *data)
+
{
- struct list_head *rpc_list = data;
- struct ldlm_lock *lock = obj;
+ cfs_list_t *rpc_list = data;
+ struct ldlm_lock *lock = cfs_hash_object(hs, hnode);
lock_res_and_lock(lock);
if (lock->l_req_mode != lock->l_granted_mode) {
unlock_res_and_lock(lock);
- return;
+ return 0;
}
LASSERT(lock->l_resource);
if (lock->l_resource->lr_type != LDLM_IBITS &&
lock->l_resource->lr_type != LDLM_PLAIN) {
unlock_res_and_lock(lock);
- return;
+ return 0;
}
if (lock->l_flags & LDLM_FL_AST_SENT) {
unlock_res_and_lock(lock);
- return;
+ return 0;
}
LASSERT(lock->l_blocking_ast);
LASSERT(!lock->l_blocking_lock);
lock->l_flags |= LDLM_FL_AST_SENT;
- if (lock->l_export && lock->l_export->exp_lock_hash &&
- !hlist_unhashed(&lock->l_exp_hash))
- lustre_hash_del(lock->l_export->exp_lock_hash,
- &lock->l_remote_handle, &lock->l_exp_hash);
- list_add_tail(&lock->l_rk_ast, rpc_list);
+ if (lock->l_export && lock->l_export->exp_lock_hash) {
+ /* NB: it's safe to call cfs_hash_del() even lock isn't
+ * in exp_lock_hash. */
+ cfs_hash_del(lock->l_export->exp_lock_hash,
+ &lock->l_remote_handle, &lock->l_exp_hash);
+ }
+
+ cfs_list_add_tail(&lock->l_rk_ast, rpc_list);
LDLM_LOCK_GET(lock);
unlock_res_and_lock(lock);
+ return 0;
}
void ldlm_revoke_export_locks(struct obd_export *exp)
{
- struct list_head rpc_list;
+ cfs_list_t rpc_list;
ENTRY;
CFS_INIT_LIST_HEAD(&rpc_list);
- lustre_hash_for_each_empty(exp->exp_lock_hash,
- ldlm_revoke_lock_cb, &rpc_list);
- ldlm_run_ast_work(&rpc_list, LDLM_WORK_REVOKE_AST);
+ cfs_hash_for_each_empty(exp->exp_lock_hash,
+ ldlm_revoke_lock_cb, &rpc_list);
+ ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
+ LDLM_WORK_REVOKE_AST);
EXIT;
}
+#endif /* HAVE_SERVER_SUPPORT */
#ifdef __KERNEL__
static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
struct ldlm_bl_work_item *blwi = NULL;
static unsigned int num_bl = 0;
- spin_lock(&blp->blp_lock);
+ cfs_spin_lock(&blp->blp_lock);
/* process a request from the blp_list at least every blp_num_threads */
- if (!list_empty(&blp->blp_list) &&
- (list_empty(&blp->blp_prio_list) || num_bl == 0))
- blwi = list_entry(blp->blp_list.next,
- struct ldlm_bl_work_item, blwi_entry);
+ if (!cfs_list_empty(&blp->blp_list) &&
+ (cfs_list_empty(&blp->blp_prio_list) || num_bl == 0))
+ blwi = cfs_list_entry(blp->blp_list.next,
+ struct ldlm_bl_work_item, blwi_entry);
else
- if (!list_empty(&blp->blp_prio_list))
- blwi = list_entry(blp->blp_prio_list.next,
- struct ldlm_bl_work_item, blwi_entry);
+ if (!cfs_list_empty(&blp->blp_prio_list))
+ blwi = cfs_list_entry(blp->blp_prio_list.next,
+ struct ldlm_bl_work_item,
+ blwi_entry);
if (blwi) {
- if (++num_bl >= atomic_read(&blp->blp_num_threads))
+ if (++num_bl >= cfs_atomic_read(&blp->blp_num_threads))
num_bl = 0;
- list_del(&blwi->blwi_entry);
+ cfs_list_del(&blwi->blwi_entry);
}
- spin_unlock(&blp->blp_lock);
+ cfs_spin_unlock(&blp->blp_lock);
return blwi;
}
struct ldlm_bl_thread_data {
char bltd_name[CFS_CURPROC_COMM_MAX];
struct ldlm_bl_pool *bltd_blp;
- struct completion bltd_comp;
+ cfs_completion_t bltd_comp;
int bltd_num;
};
struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
int rc;
- init_completion(&bltd.bltd_comp);
- rc = cfs_kernel_thread(ldlm_bl_thread_main, &bltd, 0);
+ cfs_init_completion(&bltd.bltd_comp);
+ rc = cfs_create_thread(ldlm_bl_thread_main, &bltd, 0);
if (rc < 0) {
CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %d\n",
- atomic_read(&blp->blp_num_threads), rc);
+ cfs_atomic_read(&blp->blp_num_threads), rc);
return rc;
}
- wait_for_completion(&bltd.bltd_comp);
+ cfs_wait_for_completion(&bltd.bltd_comp);
return 0;
}
blp = bltd->bltd_blp;
- bltd->bltd_num = atomic_inc_return(&blp->blp_num_threads) - 1;
- atomic_inc(&blp->blp_busy_threads);
+ bltd->bltd_num =
+ cfs_atomic_inc_return(&blp->blp_num_threads) - 1;
+ cfs_atomic_inc(&blp->blp_busy_threads);
snprintf(bltd->bltd_name, sizeof(bltd->bltd_name) - 1,
"ldlm_bl_%02d", bltd->bltd_num);
cfs_daemonize(bltd->bltd_name);
- complete(&bltd->bltd_comp);
+ cfs_complete(&bltd->bltd_comp);
/* cannot use bltd after this, it is only on caller's stack */
}
while (1) {
struct l_wait_info lwi = { 0 };
struct ldlm_bl_work_item *blwi = NULL;
+ int busy;
blwi = ldlm_bl_get_work(blp);
if (blwi == NULL) {
- int busy;
-
- atomic_dec(&blp->blp_busy_threads);
+ cfs_atomic_dec(&blp->blp_busy_threads);
l_wait_event_exclusive(blp->blp_waitq,
(blwi = ldlm_bl_get_work(blp)) != NULL,
&lwi);
- busy = atomic_inc_return(&blp->blp_busy_threads);
-
- if (blwi->blwi_ns == NULL)
- /* added by ldlm_cleanup() */
- break;
-
- /* Not fatal if racy and have a few too many threads */
- if (unlikely(busy < blp->blp_max_threads &&
- busy >= atomic_read(&blp->blp_num_threads)))
- /* discard the return value, we tried */
- ldlm_bl_thread_start(blp);
+ busy = cfs_atomic_inc_return(&blp->blp_busy_threads);
} else {
- if (blwi->blwi_ns == NULL)
- /* added by ldlm_cleanup() */
- break;
+ busy = cfs_atomic_read(&blp->blp_busy_threads);
}
+ if (blwi->blwi_ns == NULL)
+ /* added by ldlm_cleanup() */
+ break;
+
+ /* Not fatal if racy and have a few too many threads */
+ if (unlikely(busy < blp->blp_max_threads &&
+ busy >= cfs_atomic_read(&blp->blp_num_threads) &&
+ !blwi->blwi_mem_pressure))
+ /* discard the return value, we tried */
+ ldlm_bl_thread_start(blp);
+
+ if (blwi->blwi_mem_pressure)
+ cfs_memory_pressure_set();
+
if (blwi->blwi_count) {
+ int count;
/* The special case when we cancel locks in lru
* asynchronously, we pass the list of locks here.
- * Thus lock is marked LDLM_FL_CANCELING, and already
- * canceled locally. */
- ldlm_cli_cancel_list(&blwi->blwi_head,
- blwi->blwi_count, NULL, 0);
+ * Thus locks are marked LDLM_FL_CANCELING, but NOT
+ * canceled locally yet. */
+ count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
+ blwi->blwi_count,
+ LCF_BL_AST);
+ ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL, 0);
} else {
ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
blwi->blwi_lock);
}
- OBD_FREE(blwi, sizeof(*blwi));
+ if (blwi->blwi_mem_pressure)
+ cfs_memory_pressure_clr();
+
+ if (blwi->blwi_mode == LDLM_ASYNC)
+ OBD_FREE(blwi, sizeof(*blwi));
+ else
+ cfs_complete(&blwi->blwi_comp);
}
- atomic_dec(&blp->blp_busy_threads);
- atomic_dec(&blp->blp_num_threads);
- complete(&blp->blp_comp);
+ cfs_atomic_dec(&blp->blp_busy_threads);
+ cfs_atomic_dec(&blp->blp_num_threads);
+ cfs_complete(&blp->blp_comp);
RETURN(0);
}
{
int rc = 0;
ENTRY;
- mutex_down(&ldlm_ref_sem);
+ cfs_mutex_lock(&ldlm_ref_mutex);
if (++ldlm_refcount == 1) {
rc = ldlm_setup();
if (rc)
ldlm_refcount--;
}
- mutex_up(&ldlm_ref_sem);
+ cfs_mutex_unlock(&ldlm_ref_mutex);
RETURN(rc);
}
void ldlm_put_ref(void)
{
ENTRY;
- mutex_down(&ldlm_ref_sem);
+ cfs_mutex_lock(&ldlm_ref_mutex);
if (ldlm_refcount == 1) {
int rc = ldlm_cleanup();
if (rc)
} else {
ldlm_refcount--;
}
- mutex_up(&ldlm_ref_sem);
+ cfs_mutex_unlock(&ldlm_ref_mutex);
EXIT;
}
* Export handle<->lock hash operations.
*/
static unsigned
-ldlm_export_lock_hash(lustre_hash_t *lh, void *key, unsigned mask)
+ldlm_export_lock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
{
- return lh_u64_hash(((struct lustre_handle *)key)->cookie, mask);
+ return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
}
static void *
-ldlm_export_lock_key(struct hlist_node *hnode)
+ldlm_export_lock_key(cfs_hlist_node_t *hnode)
{
struct ldlm_lock *lock;
- ENTRY;
- lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
- RETURN(&lock->l_remote_handle);
+ lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
+ return &lock->l_remote_handle;
+}
+
+static void
+ldlm_export_lock_keycpy(cfs_hlist_node_t *hnode, void *key)
+{
+ struct ldlm_lock *lock;
+
+ lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
+ lock->l_remote_handle = *(struct lustre_handle *)key;
}
static int
-ldlm_export_lock_compare(void *key, struct hlist_node *hnode)
+ldlm_export_lock_keycmp(const void *key, cfs_hlist_node_t *hnode)
{
- ENTRY;
- RETURN(lustre_handle_equal(ldlm_export_lock_key(hnode), key));
+ return lustre_handle_equal(ldlm_export_lock_key(hnode), key);
}
static void *
-ldlm_export_lock_get(struct hlist_node *hnode)
+ldlm_export_lock_object(cfs_hlist_node_t *hnode)
+{
+ return cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
+}
+
+static void
+ldlm_export_lock_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
{
struct ldlm_lock *lock;
- ENTRY;
- lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
+ lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
LDLM_LOCK_GET(lock);
-
- RETURN(lock);
}
-static void *
-ldlm_export_lock_put(struct hlist_node *hnode)
+static void
+ldlm_export_lock_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
{
struct ldlm_lock *lock;
- ENTRY;
- lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
+ lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
LDLM_LOCK_RELEASE(lock);
-
- RETURN(lock);
}
-static lustre_hash_ops_t ldlm_export_lock_ops = {
- .lh_hash = ldlm_export_lock_hash,
- .lh_key = ldlm_export_lock_key,
- .lh_compare = ldlm_export_lock_compare,
- .lh_get = ldlm_export_lock_get,
- .lh_put = ldlm_export_lock_put
+static cfs_hash_ops_t ldlm_export_lock_ops = {
+ .hs_hash = ldlm_export_lock_hash,
+ .hs_key = ldlm_export_lock_key,
+ .hs_keycmp = ldlm_export_lock_keycmp,
+ .hs_keycpy = ldlm_export_lock_keycpy,
+ .hs_object = ldlm_export_lock_object,
+ .hs_get = ldlm_export_lock_get,
+ .hs_put = ldlm_export_lock_put,
+ .hs_put_locked = ldlm_export_lock_put,
};
int ldlm_init_export(struct obd_export *exp)
ENTRY;
exp->exp_lock_hash =
- lustre_hash_init(obd_uuid2str(&exp->exp_client_uuid),
- 7, 16, &ldlm_export_lock_ops, LH_REHASH);
+ cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
+ HASH_EXP_LOCK_CUR_BITS,
+ HASH_EXP_LOCK_MAX_BITS,
+ HASH_EXP_LOCK_BKT_BITS, 0,
+ CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
+ &ldlm_export_lock_ops,
+ CFS_HASH_DEFAULT | CFS_HASH_REHASH_KEY |
+ CFS_HASH_NBLK_CHANGE);
if (!exp->exp_lock_hash)
RETURN(-ENOMEM);
void ldlm_destroy_export(struct obd_export *exp)
{
ENTRY;
- lustre_hash_exit(exp->exp_lock_hash);
+ cfs_hash_putref(exp->exp_lock_hash);
exp->exp_lock_hash = NULL;
+
+ ldlm_destroy_flock_export(exp);
EXIT;
}
EXPORT_SYMBOL(ldlm_destroy_export);
static int ldlm_setup(void)
{
- struct ldlm_bl_pool *blp;
+ static struct ptlrpc_service_conf conf;
+ struct ldlm_bl_pool *blp = NULL;
int rc = 0;
- int ldlm_min_threads = LDLM_THREADS_AUTO_MIN;
- int ldlm_max_threads = LDLM_THREADS_AUTO_MAX;
#ifdef __KERNEL__
int i;
#endif
#ifdef LPROCFS
rc = ldlm_proc_setup();
if (rc != 0)
- GOTO(out_free, rc);
+ GOTO(out, rc);
#endif
-#ifdef __KERNEL__
- if (ldlm_num_threads) {
- /* If ldlm_num_threads is set, it is the min and the max. */
- if (ldlm_num_threads > LDLM_THREADS_AUTO_MAX)
- ldlm_num_threads = LDLM_THREADS_AUTO_MAX;
- if (ldlm_num_threads < LDLM_THREADS_AUTO_MIN)
- ldlm_num_threads = LDLM_THREADS_AUTO_MIN;
- ldlm_min_threads = ldlm_max_threads = ldlm_num_threads;
- }
+ memset(&conf, 0, sizeof(conf));
+ conf = (typeof(conf)) {
+ .psc_name = "ldlm_cbd",
+ .psc_watchdog_factor = 2,
+ .psc_buf = {
+ .bc_nbufs = LDLM_NBUFS,
+ .bc_buf_size = LDLM_BUFSIZE,
+ .bc_req_max_size = LDLM_MAXREQSIZE,
+ .bc_rep_max_size = LDLM_MAXREPSIZE,
+ .bc_req_portal = LDLM_CB_REQUEST_PORTAL,
+ .bc_rep_portal = LDLM_CB_REPLY_PORTAL,
+ },
+ .psc_thr = {
+ .tc_thr_name = "ldlm_cb",
+ .tc_thr_factor = LDLM_THR_FACTOR,
+ .tc_nthrs_init = LDLM_NTHRS_INIT,
+ .tc_nthrs_base = LDLM_NTHRS_BASE,
+ .tc_nthrs_max = LDLM_NTHRS_MAX,
+ .tc_nthrs_user = ldlm_num_threads,
+ .tc_cpu_affinity = 1,
+ .tc_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD,
+ },
+ .psc_cpt = {
+ .cc_pattern = ldlm_cpts,
+ },
+ .psc_ops = {
+ .so_req_handler = ldlm_callback_handler,
+ },
+ };
+ ldlm_state->ldlm_cb_service = \
+ ptlrpc_register_service(&conf, ldlm_svc_proc_dir);
+ if (IS_ERR(ldlm_state->ldlm_cb_service)) {
+ CERROR("failed to start service\n");
+ rc = PTR_ERR(ldlm_state->ldlm_cb_service);
+ ldlm_state->ldlm_cb_service = NULL;
+ GOTO(out, rc);
+ }
+
+#ifdef HAVE_SERVER_SUPPORT
+ memset(&conf, 0, sizeof(conf));
+ conf = (typeof(conf)) {
+ .psc_name = "ldlm_canceld",
+ .psc_watchdog_factor = 6,
+ .psc_buf = {
+ .bc_nbufs = LDLM_NBUFS,
+ .bc_buf_size = LDLM_BUFSIZE,
+ .bc_req_max_size = LDLM_MAXREQSIZE,
+ .bc_rep_max_size = LDLM_MAXREPSIZE,
+ .bc_req_portal = LDLM_CANCEL_REQUEST_PORTAL,
+ .bc_rep_portal = LDLM_CANCEL_REPLY_PORTAL,
+
+ },
+ .psc_thr = {
+ .tc_thr_name = "ldlm_cn",
+ .tc_thr_factor = LDLM_THR_FACTOR,
+ .tc_nthrs_init = LDLM_NTHRS_INIT,
+ .tc_nthrs_base = LDLM_NTHRS_BASE,
+ .tc_nthrs_max = LDLM_NTHRS_MAX,
+ .tc_nthrs_user = ldlm_num_threads,
+ .tc_cpu_affinity = 1,
+ .tc_ctx_tags = LCT_MD_THREAD | \
+ LCT_DT_THREAD | \
+ LCT_CL_THREAD,
+ },
+ .psc_cpt = {
+ .cc_pattern = ldlm_cpts,
+ },
+ .psc_ops = {
+ .so_req_handler = ldlm_cancel_handler,
+ .so_hpreq_handler = ldlm_hpreq_handler,
+ },
+ };
+ ldlm_state->ldlm_cancel_service = \
+ ptlrpc_register_service(&conf, ldlm_svc_proc_dir);
+ if (IS_ERR(ldlm_state->ldlm_cancel_service)) {
+ CERROR("failed to start service\n");
+ rc = PTR_ERR(ldlm_state->ldlm_cancel_service);
+ ldlm_state->ldlm_cancel_service = NULL;
+ GOTO(out, rc);
+ }
#endif
- ldlm_state->ldlm_cb_service =
- ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
- LDLM_MAXREPSIZE, LDLM_CB_REQUEST_PORTAL,
- LDLM_CB_REPLY_PORTAL, 2,
- ldlm_callback_handler, "ldlm_cbd",
- ldlm_svc_proc_dir, NULL,
- ldlm_min_threads, ldlm_max_threads,
- "ldlm_cb",
- LCT_MD_THREAD|LCT_DT_THREAD, NULL);
-
- if (!ldlm_state->ldlm_cb_service) {
- CERROR("failed to start service\n");
- GOTO(out_proc, rc = -ENOMEM);
- }
-
- ldlm_state->ldlm_cancel_service =
- ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
- LDLM_MAXREPSIZE, LDLM_CANCEL_REQUEST_PORTAL,
- LDLM_CANCEL_REPLY_PORTAL, 6,
- ldlm_cancel_handler, "ldlm_canceld",
- ldlm_svc_proc_dir, NULL,
- ldlm_min_threads, ldlm_max_threads,
- "ldlm_cn",
- LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
- NULL);
-
- if (!ldlm_state->ldlm_cancel_service) {
- CERROR("failed to start service\n");
- GOTO(out_proc, rc = -ENOMEM);
- }
-
- OBD_ALLOC(blp, sizeof(*blp));
- if (blp == NULL)
- GOTO(out_proc, rc = -ENOMEM);
+ OBD_ALLOC(blp, sizeof(*blp));
+ if (blp == NULL)
+ GOTO(out, rc = -ENOMEM);
ldlm_state->ldlm_bl_pool = blp;
- spin_lock_init(&blp->blp_lock);
+ cfs_spin_lock_init(&blp->blp_lock);
CFS_INIT_LIST_HEAD(&blp->blp_list);
CFS_INIT_LIST_HEAD(&blp->blp_prio_list);
cfs_waitq_init(&blp->blp_waitq);
- atomic_set(&blp->blp_num_threads, 0);
- atomic_set(&blp->blp_busy_threads, 0);
- blp->blp_min_threads = ldlm_min_threads;
- blp->blp_max_threads = ldlm_max_threads;
+ cfs_atomic_set(&blp->blp_num_threads, 0);
+ cfs_atomic_set(&blp->blp_busy_threads, 0);
#ifdef __KERNEL__
- for (i = 0; i < blp->blp_min_threads; i++) {
- rc = ldlm_bl_thread_start(blp);
- if (rc < 0)
- GOTO(out_thread, rc);
- }
-
- rc = ptlrpc_start_threads(NULL, ldlm_state->ldlm_cancel_service);
- if (rc)
- GOTO(out_thread, rc);
-
- rc = ptlrpc_start_threads(NULL, ldlm_state->ldlm_cb_service);
- if (rc)
- GOTO(out_thread, rc);
-
+ if (ldlm_num_threads == 0) {
+ blp->blp_min_threads = LDLM_NTHRS_INIT;
+ blp->blp_max_threads = LDLM_NTHRS_MAX;
+ } else {
+ blp->blp_min_threads = blp->blp_max_threads = \
+ min_t(int, LDLM_NTHRS_MAX, max_t(int, LDLM_NTHRS_INIT,
+ ldlm_num_threads));
+ }
+
+ for (i = 0; i < blp->blp_min_threads; i++) {
+ rc = ldlm_bl_thread_start(blp);
+ if (rc < 0)
+ GOTO(out, rc);
+ }
+
+# ifdef HAVE_SERVER_SUPPORT
CFS_INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
expired_lock_thread.elt_state = ELT_STOPPED;
cfs_waitq_init(&expired_lock_thread.elt_waitq);
CFS_INIT_LIST_HEAD(&waiting_locks_list);
- spin_lock_init(&waiting_locks_spinlock);
+ cfs_spin_lock_init(&waiting_locks_spinlock);
cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);
- rc = cfs_kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FILES);
- if (rc < 0) {
- CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
- GOTO(out_thread, rc);
- }
-
- wait_event(expired_lock_thread.elt_waitq,
- expired_lock_thread.elt_state == ELT_READY);
-#endif
-
-#ifdef __KERNEL__
- rc = ldlm_pools_init();
- if (rc)
- GOTO(out_thread, rc);
+ rc = cfs_create_thread(expired_lock_main, NULL, CFS_DAEMON_FLAGS);
+ if (rc < 0) {
+ CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
+ GOTO(out, rc);
+ }
+
+ cfs_wait_event(expired_lock_thread.elt_waitq,
+ expired_lock_thread.elt_state == ELT_READY);
+# endif /* HAVE_SERVER_SUPPORT */
+
+ rc = ldlm_pools_init();
+ if (rc) {
+ CERROR("Failed to initialize LDLM pools: %d\n", rc);
+ GOTO(out, rc);
+ }
#endif
- RETURN(0);
+ RETURN(0);
-#ifdef __KERNEL__
- out_thread:
- ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
- ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
-#endif
-
- out_proc:
-#ifdef LPROCFS
- ldlm_proc_cleanup();
- out_free:
-#endif
- OBD_FREE(ldlm_state, sizeof(*ldlm_state));
- ldlm_state = NULL;
- return rc;
+ out:
+ ldlm_cleanup();
+ RETURN(rc);
}
static int ldlm_cleanup(void)
{
-#ifdef __KERNEL__
- struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
-#endif
ENTRY;
- if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
- !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
+ if (!cfs_list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
+ !cfs_list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
CERROR("ldlm still has namespaces; clean these up first.\n");
ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
#ifdef __KERNEL__
ldlm_pools_fini();
-#endif
-#ifdef __KERNEL__
- while (atomic_read(&blp->blp_num_threads) > 0) {
- struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
+ if (ldlm_state->ldlm_bl_pool != NULL) {
+ struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
- init_completion(&blp->blp_comp);
+ while (cfs_atomic_read(&blp->blp_num_threads) > 0) {
+ struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
- spin_lock(&blp->blp_lock);
- list_add_tail(&blwi.blwi_entry, &blp->blp_list);
- cfs_waitq_signal(&blp->blp_waitq);
- spin_unlock(&blp->blp_lock);
+ cfs_init_completion(&blp->blp_comp);
- wait_for_completion(&blp->blp_comp);
- }
- OBD_FREE(blp, sizeof(*blp));
+ cfs_spin_lock(&blp->blp_lock);
+ cfs_list_add_tail(&blwi.blwi_entry, &blp->blp_list);
+ cfs_waitq_signal(&blp->blp_waitq);
+ cfs_spin_unlock(&blp->blp_lock);
- ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
- ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
- ldlm_proc_cleanup();
+ cfs_wait_for_completion(&blp->blp_comp);
+ }
- expired_lock_thread.elt_state = ELT_TERMINATE;
- cfs_waitq_signal(&expired_lock_thread.elt_waitq);
- wait_event(expired_lock_thread.elt_waitq,
- expired_lock_thread.elt_state == ELT_STOPPED);
-#else
- ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
- ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
-#endif
+ OBD_FREE(blp, sizeof(*blp));
+ }
+#endif /* __KERNEL__ */
+
+ if (ldlm_state->ldlm_cb_service != NULL)
+ ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
+# ifdef HAVE_SERVER_SUPPORT
+ if (ldlm_state->ldlm_cancel_service != NULL)
+ ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
+# endif
+
+#ifdef __KERNEL__
+ ldlm_proc_cleanup();
+
+# ifdef HAVE_SERVER_SUPPORT
+ if (expired_lock_thread.elt_state != ELT_STOPPED) {
+ expired_lock_thread.elt_state = ELT_TERMINATE;
+ cfs_waitq_signal(&expired_lock_thread.elt_waitq);
+ cfs_wait_event(expired_lock_thread.elt_waitq,
+ expired_lock_thread.elt_state == ELT_STOPPED);
+ }
+# endif
+#endif /* __KERNEL__ */
OBD_FREE(ldlm_state, sizeof(*ldlm_state));
ldlm_state = NULL;
RETURN(0);
}
-int __init ldlm_init(void)
+int ldlm_init(void)
{
- init_mutex(&ldlm_ref_sem);
- init_mutex(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
- init_mutex(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
+ cfs_mutex_init(&ldlm_ref_mutex);
+ cfs_mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
+ cfs_mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
ldlm_resource_slab = cfs_mem_cache_create("ldlm_resources",
sizeof(struct ldlm_resource), 0,
- SLAB_HWCACHE_ALIGN);
+ CFS_SLAB_HWCACHE_ALIGN);
if (ldlm_resource_slab == NULL)
return -ENOMEM;
ldlm_lock_slab = cfs_mem_cache_create("ldlm_locks",
- sizeof(struct ldlm_lock), 0,
- SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU);
+ sizeof(struct ldlm_lock), 0,
+ CFS_SLAB_HWCACHE_ALIGN | CFS_SLAB_DESTROY_BY_RCU);
if (ldlm_lock_slab == NULL) {
cfs_mem_cache_destroy(ldlm_resource_slab);
return -ENOMEM;
ldlm_interval_slab = cfs_mem_cache_create("interval_node",
sizeof(struct ldlm_interval),
- 0, SLAB_HWCACHE_ALIGN);
+ 0, CFS_SLAB_HWCACHE_ALIGN);
if (ldlm_interval_slab == NULL) {
cfs_mem_cache_destroy(ldlm_resource_slab);
cfs_mem_cache_destroy(ldlm_lock_slab);
return -ENOMEM;
}
-
+#if LUSTRE_TRACKS_LOCK_EXP_REFS
+ class_export_dump_hook = ldlm_dump_export_locks;
+#endif
return 0;
}
-void __exit ldlm_exit(void)
+void ldlm_exit(void)
{
int rc;
if (ldlm_refcount)
EXPORT_SYMBOL(ldlm_extent_shift_kms);
/* ldlm_lock.c */
+#ifdef HAVE_SERVER_SUPPORT
EXPORT_SYMBOL(ldlm_get_processing_policy);
+#endif
EXPORT_SYMBOL(ldlm_lock2desc);
EXPORT_SYMBOL(ldlm_register_intent);
EXPORT_SYMBOL(ldlm_lockname);
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
EXPORT_SYMBOL(ldlm_lock_change_resource);
EXPORT_SYMBOL(ldlm_it2str);
-EXPORT_SYMBOL(ldlm_lock_dump);
EXPORT_SYMBOL(ldlm_lock_dump_handle);
-EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
EXPORT_SYMBOL(ldlm_reprocess_all_ns);
EXPORT_SYMBOL(ldlm_lock_allow_match_locked);
EXPORT_SYMBOL(ldlm_lock_allow_match);
EXPORT_SYMBOL(ldlm_replay_locks);
EXPORT_SYMBOL(ldlm_resource_foreach);
EXPORT_SYMBOL(ldlm_namespace_foreach);
-EXPORT_SYMBOL(ldlm_namespace_foreach_res);
EXPORT_SYMBOL(ldlm_resource_iterate);
EXPORT_SYMBOL(ldlm_cancel_resource_local);
+EXPORT_SYMBOL(ldlm_cli_cancel_list_local);
EXPORT_SYMBOL(ldlm_cli_cancel_list);
/* ldlm_lockd.c */
+#ifdef HAVE_SERVER_SUPPORT
EXPORT_SYMBOL(ldlm_server_blocking_ast);
EXPORT_SYMBOL(ldlm_server_completion_ast);
EXPORT_SYMBOL(ldlm_server_glimpse_ast);
+EXPORT_SYMBOL(ldlm_glimpse_locks);
EXPORT_SYMBOL(ldlm_handle_enqueue);
EXPORT_SYMBOL(ldlm_handle_enqueue0);
EXPORT_SYMBOL(ldlm_handle_cancel);
EXPORT_SYMBOL(ldlm_request_cancel);
EXPORT_SYMBOL(ldlm_handle_convert);
EXPORT_SYMBOL(ldlm_handle_convert0);
+EXPORT_SYMBOL(ldlm_revoke_export_locks);
+#endif
EXPORT_SYMBOL(ldlm_del_waiting_lock);
EXPORT_SYMBOL(ldlm_get_ref);
EXPORT_SYMBOL(ldlm_put_ref);
EXPORT_SYMBOL(ldlm_refresh_waiting_lock);
-EXPORT_SYMBOL(ldlm_revoke_export_locks);
/* ldlm_resource.c */
EXPORT_SYMBOL(ldlm_namespace_new);
EXPORT_SYMBOL(client_obd_cleanup);
EXPORT_SYMBOL(client_connect_import);
EXPORT_SYMBOL(client_disconnect_export);
+EXPORT_SYMBOL(target_send_reply);
+EXPORT_SYMBOL(target_pack_pool_reply);
+
+#ifdef HAVE_SERVER_SUPPORT
+EXPORT_SYMBOL(server_disconnect_export);
EXPORT_SYMBOL(target_stop_recovery_thread);
EXPORT_SYMBOL(target_handle_connect);
EXPORT_SYMBOL(target_cleanup_recovery);
EXPORT_SYMBOL(target_destroy_export);
EXPORT_SYMBOL(target_cancel_recovery_timer);
-EXPORT_SYMBOL(target_send_reply);
EXPORT_SYMBOL(target_queue_recovery_request);
EXPORT_SYMBOL(target_handle_ping);
-EXPORT_SYMBOL(target_pack_pool_reply);
EXPORT_SYMBOL(target_handle_disconnect);
+#endif
/* l_lock.c */
EXPORT_SYMBOL(lock_res_and_lock);