* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2010, 2012, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
extern cfs_mem_cache_t *ldlm_resource_slab;
extern cfs_mem_cache_t *ldlm_lock_slab;
-static cfs_mutex_t ldlm_ref_mutex;
+static struct mutex ldlm_ref_mutex;
static int ldlm_refcount;
struct ldlm_cb_async_args {
#define ELT_TERMINATE 2
struct ldlm_bl_pool {
- cfs_spinlock_t blp_lock;
+ spinlock_t blp_lock;
/*
* blp_prio_list is used for callbacks that should be handled
cfs_list_t blp_list;
cfs_waitq_t blp_waitq;
- cfs_completion_t blp_comp;
+ struct completion blp_comp;
cfs_atomic_t blp_num_threads;
cfs_atomic_t blp_busy_threads;
int blp_min_threads;
struct ldlm_lock *blwi_lock;
cfs_list_t blwi_head;
int blwi_count;
- cfs_completion_t blwi_comp;
- int blwi_mode;
+ struct completion blwi_comp;
+ ldlm_cancel_flags_t blwi_flags;
int blwi_mem_pressure;
};
#if defined(HAVE_SERVER_SUPPORT) && defined(__KERNEL__)
-/* w_l_spinlock protects both waiting_locks_list and expired_lock_thread */
-static cfs_spinlock_t waiting_locks_spinlock; /* BH lock (timer) */
+/**
+ * Protects both waiting_locks_list and expired_lock_thread.
+ */
+static spinlock_t waiting_locks_spinlock; /* BH lock (timer) */
+
+/**
+ * List for contended locks.
+ *
+ * As soon as a lock is contended, it gets placed on this list and
+ * expected time to get a response is filled in the lock. A special
+ * thread walks the list looking for locks that should be released and
+ * schedules client evictions for those that have not been released in
+ * time.
+ *
+ * All access to it should be under waiting_locks_spinlock.
+ */
static cfs_list_t waiting_locks_list;
static cfs_timer_t waiting_locks_timer;
static inline int have_expired_locks(void)
{
- int need_to_run;
+ int need_to_run;
- ENTRY;
- cfs_spin_lock_bh(&waiting_locks_spinlock);
- need_to_run = !cfs_list_empty(&expired_lock_thread.elt_expired_locks);
- cfs_spin_unlock_bh(&waiting_locks_spinlock);
+ ENTRY;
+ spin_lock_bh(&waiting_locks_spinlock);
+ need_to_run = !cfs_list_empty(&expired_lock_thread.elt_expired_locks);
+ spin_unlock_bh(&waiting_locks_spinlock);
- RETURN(need_to_run);
+ RETURN(need_to_run);
}
+/**
+ * Check expired lock list for expired locks and time them out.
+ */
static int expired_lock_main(void *arg)
{
cfs_list_t *expired = &expired_lock_thread.elt_expired_locks;
expired_lock_thread.elt_state == ELT_TERMINATE,
&lwi);
- cfs_spin_lock_bh(&waiting_locks_spinlock);
- if (expired_lock_thread.elt_dump) {
- struct libcfs_debug_msg_data msgdata = {
- .msg_file = __FILE__,
- .msg_fn = "waiting_locks_callback",
- .msg_line = expired_lock_thread.elt_dump };
- cfs_spin_unlock_bh(&waiting_locks_spinlock);
+ spin_lock_bh(&waiting_locks_spinlock);
+ if (expired_lock_thread.elt_dump) {
+ struct libcfs_debug_msg_data msgdata = {
+ .msg_file = __FILE__,
+ .msg_fn = "waiting_locks_callback",
+ .msg_line = expired_lock_thread.elt_dump };
+ spin_unlock_bh(&waiting_locks_spinlock);
- /* from waiting_locks_callback, but not in timer */
- libcfs_debug_dumplog();
- libcfs_run_lbug_upcall(&msgdata);
+ /* from waiting_locks_callback, but not in timer */
+ libcfs_debug_dumplog();
+ libcfs_run_lbug_upcall(&msgdata);
- cfs_spin_lock_bh(&waiting_locks_spinlock);
+ spin_lock_bh(&waiting_locks_spinlock);
expired_lock_thread.elt_dump = 0;
}
l_pending_chain);
if ((void *)lock < LP_POISON + CFS_PAGE_SIZE &&
(void *)lock >= LP_POISON) {
- cfs_spin_unlock_bh(&waiting_locks_spinlock);
+ spin_unlock_bh(&waiting_locks_spinlock);
CERROR("free lock on elt list %p\n", lock);
LBUG();
}
LDLM_LOCK_RELEASE(lock);
continue;
}
- export = class_export_lock_get(lock->l_export, lock);
- cfs_spin_unlock_bh(&waiting_locks_spinlock);
+ export = class_export_lock_get(lock->l_export, lock);
+ spin_unlock_bh(&waiting_locks_spinlock);
- do_dump++;
- class_fail_export(export);
- class_export_lock_put(export, lock);
+ do_dump++;
+ class_fail_export(export);
+ class_export_lock_put(export, lock);
- /* release extra ref grabbed by ldlm_add_waiting_lock()
- * or ldlm_failed_ast() */
- LDLM_LOCK_RELEASE(lock);
+ /* release extra ref grabbed by ldlm_add_waiting_lock()
+ * or ldlm_failed_ast() */
+ LDLM_LOCK_RELEASE(lock);
- cfs_spin_lock_bh(&waiting_locks_spinlock);
- }
- cfs_spin_unlock_bh(&waiting_locks_spinlock);
+ spin_lock_bh(&waiting_locks_spinlock);
+ }
+ spin_unlock_bh(&waiting_locks_spinlock);
if (do_dump && obd_dump_on_eviction) {
CERROR("dump the log upon eviction\n");
*/
static int ldlm_lock_busy(struct ldlm_lock *lock)
{
- struct ptlrpc_request *req;
- int match = 0;
- ENTRY;
+ struct ptlrpc_request *req;
+ int match = 0;
+ ENTRY;
- if (lock->l_export == NULL)
- return 0;
+ if (lock->l_export == NULL)
+ return 0;
- cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock);
- cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
- rq_exp_list) {
- if (req->rq_ops->hpreq_lock_match) {
- match = req->rq_ops->hpreq_lock_match(req, lock);
- if (match)
- break;
- }
- }
- cfs_spin_unlock_bh(&lock->l_export->exp_rpc_lock);
- RETURN(match);
+ spin_lock_bh(&lock->l_export->exp_rpc_lock);
+ cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
+ rq_exp_list) {
+ if (req->rq_ops->hpreq_lock_match) {
+ match = req->rq_ops->hpreq_lock_match(req, lock);
+ if (match)
+ break;
+ }
+ }
+ spin_unlock_bh(&lock->l_export->exp_rpc_lock);
+ RETURN(match);
}
/* This is called from within a timer interrupt and cannot schedule */
struct ldlm_lock *lock;
int need_dump = 0;
- cfs_spin_lock_bh(&waiting_locks_spinlock);
+ spin_lock_bh(&waiting_locks_spinlock);
while (!cfs_list_empty(&waiting_locks_list)) {
lock = cfs_list_entry(waiting_locks_list.next, struct ldlm_lock,
l_pending_chain);
LDLM_LOCK_GET(lock);
- cfs_spin_unlock_bh(&waiting_locks_spinlock);
- LDLM_DEBUG(lock, "prolong the busy lock");
- ldlm_refresh_waiting_lock(lock,
- ldlm_get_enq_timeout(lock));
- cfs_spin_lock_bh(&waiting_locks_spinlock);
+ spin_unlock_bh(&waiting_locks_spinlock);
+ LDLM_DEBUG(lock, "prolong the busy lock");
+ ldlm_refresh_waiting_lock(lock,
+ ldlm_get_enq_timeout(lock));
+ spin_lock_bh(&waiting_locks_spinlock);
if (!cont) {
LDLM_LOCK_RELEASE(lock);
timeout_rounded = (cfs_time_t)round_timeout(lock->l_callback_timeout);
cfs_timer_arm(&waiting_locks_timer, timeout_rounded);
}
- cfs_spin_unlock_bh(&waiting_locks_spinlock);
+ spin_unlock_bh(&waiting_locks_spinlock);
}
-/*
+/**
+ * Add lock to the list of contended locks.
+ *
* Indicate that we're waiting for a client to call us back cancelling a given
* lock. We add it to the pending-callback chain, and schedule the lock-timeout
* timer to fire appropriately. (We round up to the next second, to avoid
LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
- cfs_spin_lock_bh(&waiting_locks_spinlock);
- if (lock->l_destroyed) {
- static cfs_time_t next;
- cfs_spin_unlock_bh(&waiting_locks_spinlock);
+ spin_lock_bh(&waiting_locks_spinlock);
+ if (lock->l_destroyed) {
+ static cfs_time_t next;
+ spin_unlock_bh(&waiting_locks_spinlock);
LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)");
if (cfs_time_after(cfs_time_current(), next)) {
next = cfs_time_shift(14400);
* waiting list */
LDLM_LOCK_GET(lock);
}
- cfs_spin_unlock_bh(&waiting_locks_spinlock);
+ spin_unlock_bh(&waiting_locks_spinlock);
- if (ret) {
- cfs_spin_lock_bh(&lock->l_export->exp_bl_list_lock);
- if (cfs_list_empty(&lock->l_exp_list))
- cfs_list_add(&lock->l_exp_list,
- &lock->l_export->exp_bl_list);
- cfs_spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
- }
+ if (ret) {
+ spin_lock_bh(&lock->l_export->exp_bl_list_lock);
+ if (cfs_list_empty(&lock->l_exp_list))
+ cfs_list_add(&lock->l_exp_list,
+ &lock->l_export->exp_bl_list);
+ spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
+ }
- LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
- ret == 0 ? "not re-" : "", timeout,
- AT_OFF ? "off" : "on");
- return ret;
+ LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
+ ret == 0 ? "not re-" : "", timeout,
+ AT_OFF ? "off" : "on");
+ return ret;
}
-/*
+/**
* Remove a lock from the pending list, likely because it had its cancellation
* callback arrive without incident. This adjusts the lock-timeout timer if
* needed. Returns 0 if the lock wasn't pending after all, 1 if it was.
return 0;
}
- cfs_spin_lock_bh(&waiting_locks_spinlock);
- ret = __ldlm_del_waiting_lock(lock);
- cfs_spin_unlock_bh(&waiting_locks_spinlock);
+ spin_lock_bh(&waiting_locks_spinlock);
+ ret = __ldlm_del_waiting_lock(lock);
+ spin_unlock_bh(&waiting_locks_spinlock);
- /* remove the lock out of export blocking list */
- cfs_spin_lock_bh(&lock->l_export->exp_bl_list_lock);
- cfs_list_del_init(&lock->l_exp_list);
- cfs_spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
+ /* remove the lock out of export blocking list */
+ spin_lock_bh(&lock->l_export->exp_bl_list_lock);
+ cfs_list_del_init(&lock->l_exp_list);
+ spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
if (ret) {
/* release lock ref if it has indeed been removed
LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed");
return ret;
}
+EXPORT_SYMBOL(ldlm_del_waiting_lock);
-/*
- * Prolong the lock
+/**
+ * Prolong the contended lock waiting time.
*
* Called with namespace lock held.
*/
int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
{
- if (lock->l_export == NULL) {
- /* We don't have a "waiting locks list" on clients. */
- LDLM_DEBUG(lock, "client lock: no-op");
- return 0;
- }
+ if (lock->l_export == NULL) {
+ /* We don't have a "waiting locks list" on clients. */
+ LDLM_DEBUG(lock, "client lock: no-op");
+ return 0;
+ }
- cfs_spin_lock_bh(&waiting_locks_spinlock);
+ spin_lock_bh(&waiting_locks_spinlock);
- if (cfs_list_empty(&lock->l_pending_chain)) {
- cfs_spin_unlock_bh(&waiting_locks_spinlock);
- LDLM_DEBUG(lock, "wasn't waiting");
- return 0;
- }
+ if (cfs_list_empty(&lock->l_pending_chain)) {
+ spin_unlock_bh(&waiting_locks_spinlock);
+ LDLM_DEBUG(lock, "wasn't waiting");
+ return 0;
+ }
- /* we remove/add the lock to the waiting list, so no needs to
- * release/take a lock reference */
- __ldlm_del_waiting_lock(lock);
- __ldlm_add_waiting_lock(lock, timeout);
- cfs_spin_unlock_bh(&waiting_locks_spinlock);
+ /* we remove/add the lock to the waiting list, so no needs to
+ * release/take a lock reference */
+ __ldlm_del_waiting_lock(lock);
+ __ldlm_add_waiting_lock(lock, timeout);
+ spin_unlock_bh(&waiting_locks_spinlock);
- LDLM_DEBUG(lock, "refreshed");
- return 1;
+ LDLM_DEBUG(lock, "refreshed");
+ return 1;
}
+EXPORT_SYMBOL(ldlm_refresh_waiting_lock);
#else /* !HAVE_SERVER_SUPPORT || !__KERNEL__ */
#ifdef HAVE_SERVER_SUPPORT
+/**
+ * Perform lock cleanup if AST sending failed.
+ */
static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
const char *ast_type)
{
if (obd_dump_on_timeout)
libcfs_debug_dumplog();
#ifdef __KERNEL__
- cfs_spin_lock_bh(&waiting_locks_spinlock);
- if (__ldlm_del_waiting_lock(lock) == 0)
- /* the lock was not in any list, grab an extra ref before adding
- * the lock to the expired list */
- LDLM_LOCK_GET(lock);
- cfs_list_add(&lock->l_pending_chain,
- &expired_lock_thread.elt_expired_locks);
- cfs_waitq_signal(&expired_lock_thread.elt_waitq);
- cfs_spin_unlock_bh(&waiting_locks_spinlock);
+ spin_lock_bh(&waiting_locks_spinlock);
+ if (__ldlm_del_waiting_lock(lock) == 0)
+ /* the lock was not in any list, grab an extra ref before adding
+ * the lock to the expired list */
+ LDLM_LOCK_GET(lock);
+ cfs_list_add(&lock->l_pending_chain,
+ &expired_lock_thread.elt_expired_locks);
+ cfs_waitq_signal(&expired_lock_thread.elt_waitq);
+ spin_unlock_bh(&waiting_locks_spinlock);
#else
- class_fail_export(lock->l_export);
+ class_fail_export(lock->l_export);
#endif
}
+/**
+ * Perform lock cleanup if AST reply came with error.
+ */
static int ldlm_handle_ast_error(struct ldlm_lock *lock,
struct ptlrpc_request *req, int rc,
const char *ast_type)
ENTRY;
LASSERT(lock != NULL);
- if (rc != 0) {
- rc = ldlm_handle_ast_error(lock, req, rc,
- arg->type == LDLM_BL_CALLBACK
- ? "blocking" : "completion");
- if (rc == -ERESTART)
- cfs_atomic_inc(&arg->restart);
- }
+
+ switch (arg->type) {
+ case LDLM_GL_CALLBACK:
+ /* Update the LVB from disk if the AST failed
+ * (this is a legal race)
+ *
+ * - Glimpse callback of local lock just returns
+ * -ELDLM_NO_LOCK_DATA.
+ * - Glimpse callback of remote lock might return
+ * -ELDLM_NO_LOCK_DATA when inode is cleared. LU-274
+ */
+ if (rc == -ELDLM_NO_LOCK_DATA) {
+ LDLM_DEBUG(lock, "lost race - client has a lock but no "
+ "inode");
+ ldlm_res_lvbo_update(lock->l_resource, NULL, 1);
+ } else if (rc != 0) {
+ rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
+ } else {
+ rc = ldlm_res_lvbo_update(lock->l_resource, req, 1);
+ }
+ break;
+ case LDLM_BL_CALLBACK:
+ if (rc != 0)
+ rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
+ break;
+ case LDLM_CP_CALLBACK:
+ if (rc != 0)
+ rc = ldlm_handle_ast_error(lock, req, rc, "completion");
+ break;
+ default:
+ LDLM_ERROR(lock, "invalid opcode for lock callback %d",
+ arg->type);
+ LBUG();
+ }
+
+ /* release extra reference taken in ldlm_ast_fini() */
LDLM_LOCK_RELEASE(lock);
+ if (rc == -ERESTART)
+ cfs_atomic_inc(&arg->restart);
+
RETURN(0);
}
-static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req,
- struct ldlm_cb_set_arg *arg,
- struct ldlm_lock *lock,
- int instant_cancel)
+static inline int ldlm_ast_fini(struct ptlrpc_request *req,
+ struct ldlm_cb_set_arg *arg,
+ struct ldlm_lock *lock,
+ int instant_cancel)
{
int rc = 0;
ENTRY;
*/
static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
{
- struct ptlrpc_request *req;
- ENTRY;
+ struct ptlrpc_request *req;
+ ENTRY;
- if (lock->l_export == NULL) {
- LDLM_DEBUG(lock, "client lock: no-op");
- RETURN_EXIT;
- }
-
- cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock);
- cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
- rq_exp_list) {
- /* Do not process requests that were not yet added to there
- * incoming queue or were already removed from there for
- * processing */
- if (!req->rq_hp && !cfs_list_empty(&req->rq_list) &&
- req->rq_ops->hpreq_lock_match &&
- req->rq_ops->hpreq_lock_match(req, lock))
- ptlrpc_hpreq_reorder(req);
- }
- cfs_spin_unlock_bh(&lock->l_export->exp_rpc_lock);
- EXIT;
+ if (lock->l_export == NULL) {
+ LDLM_DEBUG(lock, "client lock: no-op");
+ RETURN_EXIT;
+ }
+
+ spin_lock_bh(&lock->l_export->exp_rpc_lock);
+ cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
+ rq_exp_list) {
+ /* Do not process requests that were not yet added to there
+ * incoming queue or were already removed from there for
+ * processing. We evaluate ptlrpc_nrs_req_can_move() without
+ * holding svcpt->scp_req_lock, and then redo the check with
+ * the lock held once we need to obtain a reliable result.
+ */
+ if (ptlrpc_nrs_req_can_move(req) &&
+ req->rq_ops->hpreq_lock_match &&
+ req->rq_ops->hpreq_lock_match(req, lock))
+ ptlrpc_nrs_req_hp_move(req);
+ }
+ spin_unlock_bh(&lock->l_export->exp_rpc_lock);
+ EXIT;
}
-/*
+/**
* ->l_blocking_ast() method for server-side locks. This is invoked when newly
* enqueued server lock conflicts with given one.
*
- * Sends blocking ast rpc to the client owning that lock; arms timeout timer
+ * Sends blocking AST RPC to the client owning that lock; arms timeout timer
* to wait for client response.
*/
int ldlm_server_blocking_ast(struct ldlm_lock *lock,
body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
body->lock_handle[0] = lock->l_remote_handle;
body->lock_desc = *desc;
- body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS);
+ body->lock_flags |= ldlm_flags_to_wire(lock->l_flags & LDLM_AST_FLAGS);
LDLM_DEBUG(lock, "server preparing blocking AST");
lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
- rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);
+ rc = ldlm_ast_fini(req, arg, lock, instant_cancel);
RETURN(rc);
}
+EXPORT_SYMBOL(ldlm_server_blocking_ast);
-int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
+/**
+ * ->l_completion_ast callback for a remote lock in server namespace.
+ *
+ * Sends AST to the client notifying it of lock granting. If initial
+ * lock response was not sent yet, instead of sending another RPC, just
+ * mark the lock as granted and client will understand
+ */
+int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
struct ldlm_cb_set_arg *arg = data;
struct ldlm_request *body;
long total_enqueue_wait;
int instant_cancel = 0;
int rc = 0;
+ int lvb_len;
ENTRY;
LASSERT(lock != NULL);
if (req == NULL)
RETURN(-ENOMEM);
- /* server namespace, doesn't need lock */
- if (lock->l_resource->lr_lvb_len) {
- req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT,
- lock->l_resource->lr_lvb_len);
- }
-
+ /* server namespace, doesn't need lock */
+ lvb_len = ldlm_lvbo_size(lock);
+ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT, lvb_len);
rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK);
if (rc) {
ptlrpc_request_free(req);
body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
body->lock_handle[0] = lock->l_remote_handle;
- body->lock_flags = flags;
+ body->lock_flags = ldlm_flags_to_wire(flags);
ldlm_lock2desc(lock, &body->lock_desc);
- if (lock->l_resource->lr_lvb_len) {
- void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
-
- lock_res(lock->l_resource);
- memcpy(lvb, lock->l_resource->lr_lvb_data,
- lock->l_resource->lr_lvb_len);
- unlock_res(lock->l_resource);
+ if (lvb_len > 0) {
+ void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
+
+ lvb_len = ldlm_lvbo_fill(lock, lvb, lvb_len);
+ if (lvb_len < 0) {
+ /* We still need to send the RPC to wake up the blocked
+ * enqueue thread on the client.
+ *
+ * Consider old client, there is no better way to notify
+ * the failure, just zero-sized the LVB, then the client
+ * will fail out as "-EPROTO". */
+ req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, 0,
+ RCL_CLIENT);
+ instant_cancel = 1;
+ } else {
+ req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len,
+ RCL_CLIENT);
+ }
}
LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
/* We only send real blocking ASTs after the lock is granted */
lock_res_and_lock(lock);
if (lock->l_flags & LDLM_FL_AST_SENT) {
- body->lock_flags |= LDLM_FL_AST_SENT;
- /* copy ast flags like LDLM_FL_DISCARD_DATA */
- body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS);
+ body->lock_flags |= ldlm_flags_to_wire(LDLM_FL_AST_SENT);
+ /* Copy AST flags like LDLM_FL_DISCARD_DATA. */
+ body->lock_flags |= ldlm_flags_to_wire(lock->l_flags &
+ LDLM_AST_FLAGS);
/* We might get here prior to ldlm_handle_enqueue setting
* LDLM_FL_CANCEL_ON_BLOCK flag. Then we will put this lock
lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
- rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);
+ rc = ldlm_ast_fini(req, arg, lock, instant_cancel);
- RETURN(rc);
+ RETURN(lvb_len < 0 ? lvb_len : rc);
}
+EXPORT_SYMBOL(ldlm_server_completion_ast);
+/**
+ * Server side ->l_glimpse_ast handler for client locks.
+ *
+ * Sends glimpse AST to the client and waits for reply. Then updates
+ * lvbo with the result.
+ */
int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
{
- struct ldlm_resource *res = lock->l_resource;
- struct ldlm_request *body;
- struct ptlrpc_request *req;
- int rc;
+ struct ldlm_cb_set_arg *arg = data;
+ struct ldlm_request *body;
+ struct ptlrpc_request *req;
+ struct ldlm_cb_async_args *ca;
+ int rc;
+ struct req_format *req_fmt;
ENTRY;
LASSERT(lock != NULL);
+ if (arg->gl_desc != NULL)
+ /* There is a glimpse descriptor to pack */
+ req_fmt = &RQF_LDLM_GL_DESC_CALLBACK;
+ else
+ req_fmt = &RQF_LDLM_GL_CALLBACK;
+
req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse,
- &RQF_LDLM_GL_CALLBACK,
- LUSTRE_DLM_VERSION, LDLM_GL_CALLBACK);
+ req_fmt, LUSTRE_DLM_VERSION,
+ LDLM_GL_CALLBACK);
if (req == NULL)
RETURN(-ENOMEM);
+ if (arg->gl_desc != NULL) {
+ /* copy the GL descriptor */
+ union ldlm_gl_desc *desc;
+ desc = req_capsule_client_get(&req->rq_pill, &RMF_DLM_GL_DESC);
+ *desc = *arg->gl_desc;
+ }
+
body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
body->lock_handle[0] = lock->l_remote_handle;
ldlm_lock2desc(lock, &body->lock_desc);
+ CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
+ ca = ptlrpc_req_async_args(req);
+ ca->ca_set_arg = arg;
+ ca->ca_lock = lock;
+
/* server namespace, doesn't need lock */
req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
- lock->l_resource->lr_lvb_len);
- res = lock->l_resource;
+ ldlm_lvbo_size(lock));
ptlrpc_request_set_replen(req);
-
req->rq_send_state = LUSTRE_IMP_FULL;
/* ptlrpc_request_alloc_pack already set timeout */
if (AT_OFF)
req->rq_timeout = ldlm_get_rq_timeout();
+ req->rq_interpret_reply = ldlm_cb_interpret;
+
if (lock->l_export && lock->l_export->exp_nid_stats &&
lock->l_export->exp_nid_stats->nid_ldlm_stats)
lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
LDLM_GL_CALLBACK - LDLM_FIRST_OPC);
- rc = ptlrpc_queue_wait(req);
- /* Update the LVB from disk if the AST failed (this is a legal race)
- *
- * - Glimpse callback of local lock just return -ELDLM_NO_LOCK_DATA.
- * - Glimpse callback of remote lock might return -ELDLM_NO_LOCK_DATA
- * when inode is cleared. LU-274
- */
- if (rc == -ELDLM_NO_LOCK_DATA) {
- LDLM_DEBUG(lock, "lost race - client has a lock but no inode");
- ldlm_res_lvbo_update(res, NULL, 1);
- } else if (rc != 0) {
- rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
- } else {
- rc = ldlm_res_lvbo_update(res, req, 1);
- }
+ rc = ldlm_ast_fini(req, arg, lock, 0);
- ptlrpc_req_finished(req);
- if (rc == -ERESTART)
- ldlm_reprocess_all(res);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(ldlm_server_glimpse_ast);
- RETURN(rc);
+int ldlm_glimpse_locks(struct ldlm_resource *res, cfs_list_t *gl_work_list)
+{
+ int rc;
+ ENTRY;
+
+ rc = ldlm_run_ast_work(ldlm_res_to_ns(res), gl_work_list,
+ LDLM_WORK_GL_AST);
+ if (rc == -ERESTART)
+ ldlm_reprocess_all(res);
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(ldlm_glimpse_locks);
+
+/* return LDLM lock associated with a lock callback request */
+struct ldlm_lock *ldlm_request_lock(struct ptlrpc_request *req)
+{
+ struct ldlm_cb_async_args *ca;
+ struct ldlm_lock *lock;
+ ENTRY;
+
+ ca = ptlrpc_req_async_args(req);
+ lock = ca->ca_lock;
+ if (lock == NULL)
+ RETURN(ERR_PTR(-EFAULT));
+
+ RETURN(lock);
}
+EXPORT_SYMBOL(ldlm_request_lock);
static void ldlm_svc_get_eopc(const struct ldlm_request *dlm_req,
struct lprocfs_stats *srv_stats)
return;
}
-/*
- * Main server-side entry point into LDLM. This is called by ptlrpc service
- * threads to carry out client lock enqueueing requests.
+/**
+ * Main server-side entry point into LDLM for enqueue. This is called by ptlrpc
+ * service threads to carry out client lock enqueueing requests.
*/
int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
struct ptlrpc_request *req,
const struct ldlm_callback_suite *cbs)
{
struct ldlm_reply *dlm_rep;
- __u32 flags;
+ __u64 flags;
ldlm_error_t err = ELDLM_OK;
struct ldlm_lock *lock = NULL;
void *cookie = NULL;
LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF);
- flags = dlm_req->lock_flags;
+ flags = ldlm_flags_from_wire(dlm_req->lock_flags);
LASSERT(req->rq_export);
GOTO(out, rc = -EFAULT);
}
- if (req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS) {
+ if (exp_connect_flags(req->rq_export) & OBD_CONNECT_IBITS) {
if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
LDLM_PLAIN)) {
DEBUG_REQ(D_ERROR, req,
/* INODEBITS_INTEROP: Perform conversion from plain lock to
* inodebits lock if client does not support them. */
- if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS) &&
+ if (!(exp_connect_flags(req->rq_export) & OBD_CONNECT_IBITS) &&
(dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN)) {
dlm_req->lock_desc.l_resource.lr_type = LDLM_IBITS;
dlm_req->lock_desc.l_policy_data.l_inodebits.bits =
if (unlikely(flags & LDLM_FL_REPLAY)) {
/* Find an existing lock in the per-export lock hash */
+ /* In the function below, .hs_keycmp resolves to
+ * ldlm_export_lock_keycmp() */
+ /* coverity[overrun-buffer-val] */
lock = cfs_hash_lookup(req->rq_export->exp_lock_hash,
(void *)&dlm_req->lock_handle[0]);
if (lock != NULL) {
lock = ldlm_lock_create(ns, &dlm_req->lock_desc.l_resource.lr_name,
dlm_req->lock_desc.l_resource.lr_type,
dlm_req->lock_desc.l_req_mode,
- cbs, NULL, 0);
-
+ cbs, NULL, 0, LVB_T_NONE);
if (!lock)
GOTO(out, rc = -ENOMEM);
/* based on the assumption that lvb size never changes during
* resource life time otherwise it need resource->lr_lock's
* protection */
- if (lock->l_resource->lr_lvb_len) {
- req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
- RCL_SERVER,
- lock->l_resource->lr_lvb_len);
- }
+ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
+ RCL_SERVER, ldlm_lvbo_size(lock));
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
GOTO(out, rc = -ENOMEM);
if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
lock->l_req_extent = lock->l_policy_data.l_extent;
- err = ldlm_lock_enqueue(ns, &lock, cookie, (int *)&flags);
+ err = ldlm_lock_enqueue(ns, &lock, cookie, &flags);
if (err)
GOTO(out, err);
dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
- dlm_rep->lock_flags = flags;
+ dlm_rep->lock_flags = ldlm_flags_to_wire(flags);
ldlm_lock2desc(lock, &dlm_rep->lock_desc);
ldlm_lock2handle(lock, &dlm_rep->lock_handle);
/* Now take into account flags to be inherited from original lock
request both in reply to client and in our own lock flags. */
dlm_rep->lock_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;
- lock->l_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;
+ lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
+ LDLM_INHERIT_FLAGS);
/* Don't move a pending lock onto the export if it has already been
* disconnected due to eviction (bug 5683) or server umount (bug 24324).
LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
rc = -ENOTCONN;
} else if (lock->l_flags & LDLM_FL_AST_SENT) {
- dlm_rep->lock_flags |= LDLM_FL_AST_SENT;
+ dlm_rep->lock_flags |= ldlm_flags_to_wire(LDLM_FL_AST_SENT);
if (lock->l_granted_mode == lock->l_req_mode) {
/*
* Only cancel lock if it was granted, because it would
"(err=%d, rc=%d)", err, rc);
if (rc == 0) {
- if (lock->l_resource->lr_lvb_len > 0) {
- /* MDT path won't handle lr_lvb_data, so
- * lock/unlock better be contained in the
- * if block */
- void *lvb;
-
- lvb = req_capsule_server_get(&req->rq_pill,
- &RMF_DLM_LVB);
- LASSERTF(lvb != NULL, "req %p, lock %p\n",
- req, lock);
- lock_res(lock->l_resource);
- memcpy(lvb, lock->l_resource->lr_lvb_data,
- lock->l_resource->lr_lvb_len);
- unlock_res(lock->l_resource);
- }
+ if (req_capsule_has_field(&req->rq_pill, &RMF_DLM_LVB,
+ RCL_SERVER) &&
+ ldlm_lvbo_size(lock) > 0) {
+ void *buf;
+ int buflen;
+
+ buf = req_capsule_server_get(&req->rq_pill,
+ &RMF_DLM_LVB);
+ LASSERTF(buf != NULL, "req %p, lock %p\n",
+ req, lock);
+ buflen = req_capsule_get_size(&req->rq_pill,
+ &RMF_DLM_LVB, RCL_SERVER);
+ buflen = ldlm_lvbo_fill(lock, buf, buflen);
+ if (buflen >= 0)
+ req_capsule_shrink(&req->rq_pill,
+ &RMF_DLM_LVB,
+ buflen, RCL_SERVER);
+ else
+ rc = buflen;
+ }
} else {
lock_res_and_lock(lock);
ldlm_resource_unlink_lock(lock);
return rc;
}
+EXPORT_SYMBOL(ldlm_handle_enqueue0);
+/**
+ * Old-style LDLM main entry point for server code enqueue.
+ */
int ldlm_handle_enqueue(struct ptlrpc_request *req,
ldlm_completion_callback completion_callback,
ldlm_blocking_callback blocking_callback,
}
return rc;
}
+EXPORT_SYMBOL(ldlm_handle_enqueue);
+/**
+ * Main LDLM entry point for server code to process lock conversion requests.
+ */
int ldlm_handle_convert0(struct ptlrpc_request *req,
const struct ldlm_request *dlm_req)
{
RETURN(0);
}
+EXPORT_SYMBOL(ldlm_handle_convert0);
+/**
+ * Old-style main LDLM entry point for server code to process lock conversion
+ * requests.
+ */
int ldlm_handle_convert(struct ptlrpc_request *req)
{
int rc;
}
return rc;
}
+EXPORT_SYMBOL(ldlm_handle_convert);
-/* Cancel all the locks whos handles are packed into ldlm_request */
+/**
+ * Cancel all the locks whose handles are packed into ldlm_request
+ *
+ * Called by server code expecting such combined cancel activity
+ * requests.
+ */
int ldlm_request_cancel(struct ptlrpc_request *req,
const struct ldlm_request *dlm_req, int first)
{
if (first >= count)
RETURN(0);
+ if (count == 1 && dlm_req->lock_handle[0].cookie == 0)
+ RETURN(0);
+
/* There is no lock on the server at the replay time,
* skip lock cancelling to make replay tests to pass. */
if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
res = lock->l_resource;
done++;
+ /* This code is an optimization to only attempt lock
+ * granting on the resource (that could be CPU-expensive)
+ * after we are done cancelling lock in that resource. */
if (res != pres) {
if (pres != NULL) {
ldlm_reprocess_all(pres);
LDLM_DEBUG_NOLOCK("server-side cancel handler END");
RETURN(done);
}
+EXPORT_SYMBOL(ldlm_request_cancel);
+/**
+ * Main LDLM entry point for server code to cancel locks.
+ *
+ * Typically gets called from service handler on LDLM_CANCEL opc.
+ */
int ldlm_handle_cancel(struct ptlrpc_request *req)
{
struct ldlm_request *dlm_req;
RETURN(ptlrpc_reply(req));
}
+EXPORT_SYMBOL(ldlm_handle_cancel);
#endif /* HAVE_SERVER_SUPPORT */
+/**
+ * Callback handler for receiving incoming blocking ASTs.
+ *
+ * This can only happen on client side.
+ */
void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
{
EXIT;
}
+/**
+ * Callback handler for receiving incoming completion ASTs.
+ *
+ * This only can happen on client side.
+ */
static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
struct ldlm_namespace *ns,
struct ldlm_request *dlm_req,
{
int lvb_len;
CFS_LIST_HEAD(ast_list);
+ int rc = 0;
ENTRY;
LDLM_DEBUG(lock, "client completion callback handler START");
}
lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
- if (lvb_len > 0) {
+ if (lvb_len < 0) {
+ LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", lvb_len);
+ GOTO(out, rc = lvb_len);
+ } else if (lvb_len > 0) {
if (lock->l_lvb_len > 0) {
/* for extent lock, lvb contains ost_lvb{}. */
LASSERT(lock->l_lvb_data != NULL);
- LASSERTF(lock->l_lvb_len == lvb_len,
- "preallocated %d, actual %d.\n",
- lock->l_lvb_len, lvb_len);
- } else { /* for layout lock, lvb has variable length */
+
+ if (unlikely(lock->l_lvb_len < lvb_len)) {
+ LDLM_ERROR(lock, "Replied LVB is larger than "
+ "expectation, expected = %d, "
+ "replied = %d",
+ lock->l_lvb_len, lvb_len);
+ GOTO(out, rc = -EINVAL);
+ }
+ } else if (ldlm_has_layout(lock)) { /* for layout lock, lvb has
+ * variable length */
void *lvb_data;
OBD_ALLOC(lvb_data, lvb_len);
- if (lvb_data == NULL)
- LDLM_ERROR(lock, "no memory.\n");
-
- lock_res_and_lock(lock);
if (lvb_data == NULL) {
- lock->l_flags |= LDLM_FL_FAILED;
- } else {
- LASSERT(lock->l_lvb_data == NULL);
- lock->l_lvb_data = lvb_data;
- lock->l_lvb_len = lvb_len;
+ LDLM_ERROR(lock, "No memory: %d.\n", lvb_len);
+ GOTO(out, rc = -ENOMEM);
}
+
+ lock_res_and_lock(lock);
+ LASSERT(lock->l_lvb_data == NULL);
+ lock->l_lvb_data = lvb_data;
+ lock->l_lvb_len = lvb_len;
unlock_res_and_lock(lock);
}
}
/* bug 11300: the lock has already been granted */
unlock_res_and_lock(lock);
LDLM_DEBUG(lock, "Double grant race happened");
- LDLM_LOCK_RELEASE(lock);
- EXIT;
- return;
+ GOTO(out, rc = 0);
}
/* If we receive the completion AST before the actual enqueue returned,
&lock->l_resource->lr_name,
sizeof(lock->l_resource->lr_name)) != 0) {
unlock_res_and_lock(lock);
- if (ldlm_lock_change_resource(ns, lock,
- &dlm_req->lock_desc.l_resource.lr_name) != 0) {
- LDLM_ERROR(lock, "Failed to allocate resource");
- LDLM_LOCK_RELEASE(lock);
- EXIT;
- return;
- }
+ rc = ldlm_lock_change_resource(ns, lock,
+ &dlm_req->lock_desc.l_resource.lr_name);
+ if (rc < 0) {
+ LDLM_ERROR(lock, "Failed to allocate resource");
+ GOTO(out, rc);
+ }
LDLM_DEBUG(lock, "completion AST, new resource");
CERROR("change resource!\n");
lock_res_and_lock(lock);
}
if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
- /* BL_AST locks are not needed in lru.
- * let ldlm_cancel_lru() be fast. */
+ /* BL_AST locks are not needed in LRU.
+ * Let ldlm_cancel_lru() be fast. */
ldlm_lock_remove_from_lru(lock);
lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
LDLM_DEBUG(lock, "completion AST includes blocking AST");
}
- if (lock->l_lvb_len) {
- if (req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB,
- RCL_CLIENT) < lock->l_lvb_len) {
- LDLM_ERROR(lock, "completion AST did not contain "
- "expected LVB!");
- } else {
- void *lvb = req_capsule_client_get(&req->rq_pill,
- &RMF_DLM_LVB);
- memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len);
- }
- }
+ if (lock->l_lvb_len > 0) {
+ rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT,
+ lock->l_lvb_data, lvb_len);
+ if (rc < 0) {
+ unlock_res_and_lock(lock);
+ GOTO(out, rc);
+ }
+ }
ldlm_grant_lock(lock, &ast_list);
unlock_res_and_lock(lock);
LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
lock);
- LDLM_LOCK_RELEASE(lock);
- EXIT;
+ GOTO(out, rc);
+
+out:
+ if (rc < 0) {
+ lock_res_and_lock(lock);
+ lock->l_flags |= LDLM_FL_FAILED;
+ unlock_res_and_lock(lock);
+ cfs_waitq_signal(&lock->l_waitq);
+ }
+ LDLM_LOCK_RELEASE(lock);
}
+/**
+ * Callback handler for receiving incoming glimpse ASTs.
+ *
+ * This only can happen on client side. After handling the glimpse AST
+ * we also consider dropping the lock here if it is unused locally for a
+ * long time.
+ */
static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
struct ldlm_namespace *ns,
struct ldlm_request *dlm_req,
}
#ifdef __KERNEL__
-static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi, int mode)
+static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
+ ldlm_cancel_flags_t cancel_flags)
{
- struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
- ENTRY;
+ struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
+ ENTRY;
- cfs_spin_lock(&blp->blp_lock);
- if (blwi->blwi_lock && blwi->blwi_lock->l_flags & LDLM_FL_DISCARD_DATA) {
- /* add LDLM_FL_DISCARD_DATA requests to the priority list */
- cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
- } else {
- /* other blocking callbacks are added to the regular list */
- cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_list);
- }
- cfs_spin_unlock(&blp->blp_lock);
+ spin_lock(&blp->blp_lock);
+ if (blwi->blwi_lock &&
+ blwi->blwi_lock->l_flags & LDLM_FL_DISCARD_DATA) {
+ /* add LDLM_FL_DISCARD_DATA requests to the priority list */
+ cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
+ } else {
+ /* other blocking callbacks are added to the regular list */
+ cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_list);
+ }
+ spin_unlock(&blp->blp_lock);
- cfs_waitq_signal(&blp->blp_waitq);
+ cfs_waitq_signal(&blp->blp_waitq);
- /* can not use blwi->blwi_mode as blwi could be already freed in
- LDLM_ASYNC mode */
- if (mode == LDLM_SYNC)
- cfs_wait_for_completion(&blwi->blwi_comp);
+ /* can not check blwi->blwi_flags as blwi could be already freed in
+ LCF_ASYNC mode */
+ if (!(cancel_flags & LCF_ASYNC))
+ wait_for_completion(&blwi->blwi_comp);
- RETURN(0);
+ RETURN(0);
}
static inline void init_blwi(struct ldlm_bl_work_item *blwi,
- struct ldlm_namespace *ns,
- struct ldlm_lock_desc *ld,
- cfs_list_t *cancels, int count,
- struct ldlm_lock *lock,
- int mode)
+ struct ldlm_namespace *ns,
+ struct ldlm_lock_desc *ld,
+ cfs_list_t *cancels, int count,
+ struct ldlm_lock *lock,
+ ldlm_cancel_flags_t cancel_flags)
{
- cfs_init_completion(&blwi->blwi_comp);
+ init_completion(&blwi->blwi_comp);
CFS_INIT_LIST_HEAD(&blwi->blwi_head);
if (cfs_memory_pressure_get())
blwi->blwi_mem_pressure = 1;
blwi->blwi_ns = ns;
- blwi->blwi_mode = mode;
+ blwi->blwi_flags = cancel_flags;
if (ld != NULL)
blwi->blwi_ld = *ld;
if (count) {
}
}
+/**
+ * Queues a list of locks \a cancels containing \a count locks
+ * for later processing by a blocking thread. If \a count is zero,
+ * then the lock referenced as \a lock is queued instead.
+ *
+ * The blocking thread would then call ->l_blocking_ast callback in the lock.
+ * If list addition fails an error is returned and caller is supposed to
+ * call ->l_blocking_ast itself.
+ */
static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
- struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
- cfs_list_t *cancels, int count, int mode)
+ struct ldlm_lock_desc *ld,
+ struct ldlm_lock *lock,
+ cfs_list_t *cancels, int count,
+ ldlm_cancel_flags_t cancel_flags)
{
- ENTRY;
+ ENTRY;
- if (cancels && count == 0)
- RETURN(0);
+ if (cancels && count == 0)
+ RETURN(0);
- if (mode == LDLM_SYNC) {
- /* if it is synchronous call do minimum mem alloc, as it could
- * be triggered from kernel shrinker
- */
- struct ldlm_bl_work_item blwi;
- memset(&blwi, 0, sizeof(blwi));
- init_blwi(&blwi, ns, ld, cancels, count, lock, LDLM_SYNC);
- RETURN(__ldlm_bl_to_thread(&blwi, LDLM_SYNC));
- } else {
- struct ldlm_bl_work_item *blwi;
- OBD_ALLOC(blwi, sizeof(*blwi));
- if (blwi == NULL)
- RETURN(-ENOMEM);
- init_blwi(blwi, ns, ld, cancels, count, lock, LDLM_ASYNC);
+ if (cancel_flags & LCF_ASYNC) {
+ struct ldlm_bl_work_item *blwi;
- RETURN(__ldlm_bl_to_thread(blwi, LDLM_ASYNC));
- }
+ OBD_ALLOC(blwi, sizeof(*blwi));
+ if (blwi == NULL)
+ RETURN(-ENOMEM);
+ init_blwi(blwi, ns, ld, cancels, count, lock, cancel_flags);
+
+ RETURN(__ldlm_bl_to_thread(blwi, cancel_flags));
+ } else {
+ /* if it is synchronous call do minimum mem alloc, as it could
+ * be triggered from kernel shrinker
+ */
+ struct ldlm_bl_work_item blwi;
+
+ memset(&blwi, 0, sizeof(blwi));
+ init_blwi(&blwi, ns, ld, cancels, count, lock, cancel_flags);
+ RETURN(__ldlm_bl_to_thread(&blwi, cancel_flags));
+ }
}
#endif
int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
- struct ldlm_lock *lock)
+ struct ldlm_lock *lock)
{
#ifdef __KERNEL__
- RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LDLM_ASYNC));
+ return ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LCF_ASYNC);
#else
- RETURN(-ENOSYS);
+ return -ENOSYS;
#endif
}
int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
- cfs_list_t *cancels, int count, int mode)
+ cfs_list_t *cancels, int count,
+ ldlm_cancel_flags_t cancel_flags)
{
#ifdef __KERNEL__
- RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count, mode));
+ return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
#else
- RETURN(-ENOSYS);
+ return -ENOSYS;
#endif
}
CWARN("Send reply failed, maybe cause bug 21636.\n");
}
+static int ldlm_handle_qc_callback(struct ptlrpc_request *req)
+{
+ struct obd_quotactl *oqctl;
+ struct client_obd *cli = &req->rq_export->exp_obd->u.cli;
+
+ oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
+ if (oqctl == NULL) {
+ CERROR("Can't unpack obd_quotactl\n");
+ RETURN(-EPROTO);
+ }
+
+ cli->cl_qchk_stat = oqctl->qc_stat;
+ return 0;
+}
+
/* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
LASSERT(req->rq_export != NULL);
LASSERT(req->rq_export->exp_obd != NULL);
- switch (lustre_msg_get_opc(req->rq_reqmsg)) {
- case LDLM_BL_CALLBACK:
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK))
- RETURN(0);
- break;
- case LDLM_CP_CALLBACK:
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK))
- RETURN(0);
- break;
- case LDLM_GL_CALLBACK:
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK))
- RETURN(0);
- break;
+ switch (lustre_msg_get_opc(req->rq_reqmsg)) {
+ case LDLM_BL_CALLBACK:
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
+ RETURN(0);
+ break;
+ case LDLM_CP_CALLBACK:
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
+ RETURN(0);
+ break;
+ case LDLM_GL_CALLBACK:
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
+ RETURN(0);
+ break;
case LDLM_SET_INFO:
rc = ldlm_handle_setinfo(req);
ldlm_callback_reply(req, rc);
RETURN(0);
ldlm_callback_reply(req, rc);
RETURN(0);
- case OBD_QC_CALLBACK:
- req_capsule_set(&req->rq_pill, &RQF_QC_CALLBACK);
- if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET))
- RETURN(0);
- rc = target_handle_qc_callback(req);
- ldlm_callback_reply(req, rc);
- RETURN(0);
- case QUOTA_DQACQ:
- case QUOTA_DQREL:
- /* reply in handler */
- req_capsule_set(&req->rq_pill, &RQF_MDS_QUOTA_DQACQ);
- rc = target_handle_dqacq_callback(req);
- RETURN(0);
case LLOG_ORIGIN_HANDLE_CREATE:
req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
rc = llog_origin_handle_close(req);
ldlm_callback_reply(req, rc);
RETURN(0);
+ case OBD_QC_CALLBACK:
+ req_capsule_set(&req->rq_pill, &RQF_QC_CALLBACK);
+ if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET))
+ RETURN(0);
+ rc = ldlm_handle_qc_callback(req);
+ ldlm_callback_reply(req, rc);
+ RETURN(0);
default:
CERROR("unknown opcode %u\n",
lustre_msg_get_opc(req->rq_reqmsg));
* which the server has already started a blocking callback on. */
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
- rc = ldlm_cli_cancel(&dlm_req->lock_handle[0]);
+ rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
if (rc < 0)
CERROR("ldlm_cli_cancel: %d\n", rc);
}
/* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
lock_res_and_lock(lock);
- lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS);
+ lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
+ LDLM_AST_FLAGS);
if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
/* If somebody cancels lock and cache is already dropped,
* or lock is failed before cp_ast received on client,
&dlm_req->lock_handle[0]);
RETURN(0);
}
- /* BL_AST locks are not needed in lru.
- * let ldlm_cancel_lru() be fast. */
+ /* BL_AST locks are not needed in LRU.
+ * Let ldlm_cancel_lru() be fast. */
ldlm_lock_remove_from_lru(lock);
lock->l_flags |= LDLM_FL_BL_AST;
}
}
#ifdef HAVE_SERVER_SUPPORT
+/**
+ * Main handler for canceld thread.
+ *
+ * Separated into its own thread to avoid deadlocks.
+ */
static int ldlm_cancel_handler(struct ptlrpc_request *req)
{
int rc;
case LDLM_CANCEL:
req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
CDEBUG(D_INODE, "cancel\n");
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL))
- RETURN(0);
+ if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_NET) ||
+ CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND))
+ RETURN(0);
rc = ldlm_handle_cancel(req);
if (rc)
break;
if (lock->l_export && lock->l_export->exp_lock_hash) {
/* NB: it's safe to call cfs_hash_del() even lock isn't
* in exp_lock_hash. */
+ /* In the function below, .hs_keycmp resolves to
+ * ldlm_export_lock_keycmp() */
+ /* coverity[overrun-buffer-val] */
cfs_hash_del(lock->l_export->exp_lock_hash,
&lock->l_remote_handle, &lock->l_exp_hash);
}
EXIT;
}
+EXPORT_SYMBOL(ldlm_revoke_export_locks);
#endif /* HAVE_SERVER_SUPPORT */
#ifdef __KERNEL__
static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
{
- struct ldlm_bl_work_item *blwi = NULL;
- static unsigned int num_bl = 0;
+ struct ldlm_bl_work_item *blwi = NULL;
+ static unsigned int num_bl = 0;
- cfs_spin_lock(&blp->blp_lock);
+ spin_lock(&blp->blp_lock);
/* process a request from the blp_list at least every blp_num_threads */
if (!cfs_list_empty(&blp->blp_list) &&
(cfs_list_empty(&blp->blp_prio_list) || num_bl == 0))
num_bl = 0;
cfs_list_del(&blwi->blwi_entry);
}
- cfs_spin_unlock(&blp->blp_lock);
+ spin_unlock(&blp->blp_lock);
- return blwi;
+ return blwi;
}
/* This only contains temporary data until the thread starts */
struct ldlm_bl_thread_data {
- char bltd_name[CFS_CURPROC_COMM_MAX];
- struct ldlm_bl_pool *bltd_blp;
- cfs_completion_t bltd_comp;
- int bltd_num;
+ char bltd_name[CFS_CURPROC_COMM_MAX];
+ struct ldlm_bl_pool *bltd_blp;
+ struct completion bltd_comp;
+ int bltd_num;
};
static int ldlm_bl_thread_main(void *arg);
static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
{
- struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
- int rc;
+ struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
+ int rc;
- cfs_init_completion(&bltd.bltd_comp);
- rc = cfs_create_thread(ldlm_bl_thread_main, &bltd, 0);
- if (rc < 0) {
- CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %d\n",
- cfs_atomic_read(&blp->blp_num_threads), rc);
- return rc;
- }
- cfs_wait_for_completion(&bltd.bltd_comp);
+ init_completion(&bltd.bltd_comp);
+ rc = cfs_create_thread(ldlm_bl_thread_main, &bltd, 0);
+ if (rc < 0) {
+ CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %d\n",
+ cfs_atomic_read(&blp->blp_num_threads), rc);
+ return rc;
+ }
+ wait_for_completion(&bltd.bltd_comp);
- return 0;
+ return 0;
}
+/**
+ * Main blocking requests processing thread.
+ *
+ * Callers put locks into its queue by calling ldlm_bl_to_thread.
+ * This thread in the end ends up doing actual call to ->l_blocking_ast
+ * for queued locks.
+ */
static int ldlm_bl_thread_main(void *arg)
{
struct ldlm_bl_pool *blp;
"ldlm_bl_%02d", bltd->bltd_num);
cfs_daemonize(bltd->bltd_name);
- cfs_complete(&bltd->bltd_comp);
+ complete(&bltd->bltd_comp);
/* cannot use bltd after this, it is only on caller's stack */
}
if (blwi->blwi_count) {
int count;
- /* The special case when we cancel locks in lru
+ /* The special case when we cancel locks in LRU
* asynchronously, we pass the list of locks here.
* Thus locks are marked LDLM_FL_CANCELING, but NOT
* canceled locally yet. */
count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
blwi->blwi_count,
LCF_BL_AST);
- ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL, 0);
+ ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
+ blwi->blwi_flags);
} else {
ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
blwi->blwi_lock);
if (blwi->blwi_mem_pressure)
cfs_memory_pressure_clr();
- if (blwi->blwi_mode == LDLM_ASYNC)
- OBD_FREE(blwi, sizeof(*blwi));
- else
- cfs_complete(&blwi->blwi_comp);
+ if (blwi->blwi_flags & LCF_ASYNC)
+ OBD_FREE(blwi, sizeof(*blwi));
+ else
+ complete(&blwi->blwi_comp);
}
cfs_atomic_dec(&blp->blp_busy_threads);
cfs_atomic_dec(&blp->blp_num_threads);
- cfs_complete(&blp->blp_comp);
+ complete(&blp->blp_comp);
RETURN(0);
}
{
int rc = 0;
ENTRY;
- cfs_mutex_lock(&ldlm_ref_mutex);
+ mutex_lock(&ldlm_ref_mutex);
if (++ldlm_refcount == 1) {
rc = ldlm_setup();
if (rc)
ldlm_refcount--;
}
- cfs_mutex_unlock(&ldlm_ref_mutex);
+ mutex_unlock(&ldlm_ref_mutex);
RETURN(rc);
}
+EXPORT_SYMBOL(ldlm_get_ref);
void ldlm_put_ref(void)
{
ENTRY;
- cfs_mutex_lock(&ldlm_ref_mutex);
+ mutex_lock(&ldlm_ref_mutex);
if (ldlm_refcount == 1) {
int rc = ldlm_cleanup();
if (rc)
} else {
ldlm_refcount--;
}
- cfs_mutex_unlock(&ldlm_ref_mutex);
+ mutex_unlock(&ldlm_ref_mutex);
EXIT;
}
+EXPORT_SYMBOL(ldlm_put_ref);
/*
* Export handle<->lock hash operations.
.psc_name = "ldlm_cbd",
.psc_watchdog_factor = 2,
.psc_buf = {
- .bc_nbufs = LDLM_NBUFS,
+ .bc_nbufs = LDLM_CLIENT_NBUFS,
.bc_buf_size = LDLM_BUFSIZE,
.bc_req_max_size = LDLM_MAXREQSIZE,
.bc_rep_max_size = LDLM_MAXREPSIZE,
.psc_name = "ldlm_canceld",
.psc_watchdog_factor = 6,
.psc_buf = {
- .bc_nbufs = LDLM_NBUFS,
+ .bc_nbufs = LDLM_SERVER_NBUFS,
.bc_buf_size = LDLM_BUFSIZE,
.bc_req_max_size = LDLM_MAXREQSIZE,
.bc_rep_max_size = LDLM_MAXREPSIZE,
OBD_ALLOC(blp, sizeof(*blp));
if (blp == NULL)
GOTO(out, rc = -ENOMEM);
- ldlm_state->ldlm_bl_pool = blp;
+ ldlm_state->ldlm_bl_pool = blp;
- cfs_spin_lock_init(&blp->blp_lock);
+ spin_lock_init(&blp->blp_lock);
CFS_INIT_LIST_HEAD(&blp->blp_list);
CFS_INIT_LIST_HEAD(&blp->blp_prio_list);
cfs_waitq_init(&blp->blp_waitq);
cfs_waitq_init(&expired_lock_thread.elt_waitq);
CFS_INIT_LIST_HEAD(&waiting_locks_list);
- cfs_spin_lock_init(&waiting_locks_spinlock);
+ spin_lock_init(&waiting_locks_spinlock);
cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);
rc = cfs_create_thread(expired_lock_main, NULL, CFS_DAEMON_FLAGS);
while (cfs_atomic_read(&blp->blp_num_threads) > 0) {
struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
- cfs_init_completion(&blp->blp_comp);
+ init_completion(&blp->blp_comp);
- cfs_spin_lock(&blp->blp_lock);
+ spin_lock(&blp->blp_lock);
cfs_list_add_tail(&blwi.blwi_entry, &blp->blp_list);
cfs_waitq_signal(&blp->blp_waitq);
- cfs_spin_unlock(&blp->blp_lock);
+ spin_unlock(&blp->blp_lock);
- cfs_wait_for_completion(&blp->blp_comp);
+ wait_for_completion(&blp->blp_comp);
}
OBD_FREE(blp, sizeof(*blp));
int ldlm_init(void)
{
- cfs_mutex_init(&ldlm_ref_mutex);
- cfs_mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
- cfs_mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
+ mutex_init(&ldlm_ref_mutex);
+ mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
+ mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
ldlm_resource_slab = cfs_mem_cache_create("ldlm_resources",
sizeof(struct ldlm_resource), 0,
CFS_SLAB_HWCACHE_ALIGN);
if (ldlm_resource_slab == NULL)
return -ENOMEM;
- ldlm_lock_slab = cfs_mem_cache_create("ldlm_locks",
- sizeof(struct ldlm_lock), 0,
- CFS_SLAB_HWCACHE_ALIGN | CFS_SLAB_DESTROY_BY_RCU);
- if (ldlm_lock_slab == NULL) {
- cfs_mem_cache_destroy(ldlm_resource_slab);
- return -ENOMEM;
- }
+ ldlm_lock_slab = cfs_mem_cache_create("ldlm_locks",
+ sizeof(struct ldlm_lock), 0,
+ CFS_SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU);
+ if (ldlm_lock_slab == NULL) {
+ cfs_mem_cache_destroy(ldlm_resource_slab);
+ return -ENOMEM;
+ }
ldlm_interval_slab = cfs_mem_cache_create("interval_node",
sizeof(struct ldlm_interval),
rc = cfs_mem_cache_destroy(ldlm_interval_slab);
LASSERTF(rc == 0, "couldn't free interval node slab\n");
}
-
-/* ldlm_extent.c */
-EXPORT_SYMBOL(ldlm_extent_shift_kms);
-
-/* ldlm_lock.c */
-#ifdef HAVE_SERVER_SUPPORT
-EXPORT_SYMBOL(ldlm_get_processing_policy);
-#endif
-EXPORT_SYMBOL(ldlm_lock2desc);
-EXPORT_SYMBOL(ldlm_register_intent);
-EXPORT_SYMBOL(ldlm_lockname);
-EXPORT_SYMBOL(ldlm_typename);
-EXPORT_SYMBOL(ldlm_lock2handle);
-EXPORT_SYMBOL(__ldlm_handle2lock);
-EXPORT_SYMBOL(ldlm_lock_get);
-EXPORT_SYMBOL(ldlm_lock_put);
-EXPORT_SYMBOL(ldlm_lock_match);
-EXPORT_SYMBOL(ldlm_lock_cancel);
-EXPORT_SYMBOL(ldlm_lock_addref);
-EXPORT_SYMBOL(ldlm_lock_addref_try);
-EXPORT_SYMBOL(ldlm_lock_decref);
-EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
-EXPORT_SYMBOL(ldlm_lock_change_resource);
-EXPORT_SYMBOL(ldlm_it2str);
-EXPORT_SYMBOL(ldlm_lock_dump_handle);
-EXPORT_SYMBOL(ldlm_reprocess_all_ns);
-EXPORT_SYMBOL(ldlm_lock_allow_match_locked);
-EXPORT_SYMBOL(ldlm_lock_allow_match);
-EXPORT_SYMBOL(ldlm_lock_downgrade);
-EXPORT_SYMBOL(ldlm_lock_convert);
-
-/* ldlm_request.c */
-EXPORT_SYMBOL(ldlm_completion_ast_async);
-EXPORT_SYMBOL(ldlm_blocking_ast_nocheck);
-EXPORT_SYMBOL(ldlm_completion_ast);
-EXPORT_SYMBOL(ldlm_blocking_ast);
-EXPORT_SYMBOL(ldlm_glimpse_ast);
-EXPORT_SYMBOL(ldlm_expired_completion_wait);
-EXPORT_SYMBOL(ldlm_prep_enqueue_req);
-EXPORT_SYMBOL(ldlm_prep_elc_req);
-EXPORT_SYMBOL(ldlm_cli_convert);
-EXPORT_SYMBOL(ldlm_cli_enqueue);
-EXPORT_SYMBOL(ldlm_cli_enqueue_fini);
-EXPORT_SYMBOL(ldlm_cli_enqueue_local);
-EXPORT_SYMBOL(ldlm_cli_cancel);
-EXPORT_SYMBOL(ldlm_cli_cancel_unused);
-EXPORT_SYMBOL(ldlm_cli_cancel_unused_resource);
-EXPORT_SYMBOL(ldlm_cli_cancel_req);
-EXPORT_SYMBOL(ldlm_replay_locks);
-EXPORT_SYMBOL(ldlm_resource_foreach);
-EXPORT_SYMBOL(ldlm_namespace_foreach);
-EXPORT_SYMBOL(ldlm_resource_iterate);
-EXPORT_SYMBOL(ldlm_cancel_resource_local);
-EXPORT_SYMBOL(ldlm_cli_cancel_list_local);
-EXPORT_SYMBOL(ldlm_cli_cancel_list);
-
-/* ldlm_lockd.c */
-#ifdef HAVE_SERVER_SUPPORT
-EXPORT_SYMBOL(ldlm_server_blocking_ast);
-EXPORT_SYMBOL(ldlm_server_completion_ast);
-EXPORT_SYMBOL(ldlm_server_glimpse_ast);
-EXPORT_SYMBOL(ldlm_handle_enqueue);
-EXPORT_SYMBOL(ldlm_handle_enqueue0);
-EXPORT_SYMBOL(ldlm_handle_cancel);
-EXPORT_SYMBOL(ldlm_request_cancel);
-EXPORT_SYMBOL(ldlm_handle_convert);
-EXPORT_SYMBOL(ldlm_handle_convert0);
-EXPORT_SYMBOL(ldlm_revoke_export_locks);
-#endif
-EXPORT_SYMBOL(ldlm_del_waiting_lock);
-EXPORT_SYMBOL(ldlm_get_ref);
-EXPORT_SYMBOL(ldlm_put_ref);
-EXPORT_SYMBOL(ldlm_refresh_waiting_lock);
-
-/* ldlm_resource.c */
-EXPORT_SYMBOL(ldlm_namespace_new);
-EXPORT_SYMBOL(ldlm_namespace_cleanup);
-EXPORT_SYMBOL(ldlm_namespace_free);
-EXPORT_SYMBOL(ldlm_namespace_dump);
-EXPORT_SYMBOL(ldlm_dump_all_namespaces);
-EXPORT_SYMBOL(ldlm_resource_get);
-EXPORT_SYMBOL(ldlm_resource_putref);
-EXPORT_SYMBOL(ldlm_resource_unlink_lock);
-
-/* ldlm_lib.c */
-EXPORT_SYMBOL(client_import_add_conn);
-EXPORT_SYMBOL(client_import_del_conn);
-EXPORT_SYMBOL(client_obd_setup);
-EXPORT_SYMBOL(client_obd_cleanup);
-EXPORT_SYMBOL(client_connect_import);
-EXPORT_SYMBOL(client_disconnect_export);
-EXPORT_SYMBOL(target_send_reply);
-EXPORT_SYMBOL(target_pack_pool_reply);
-
-#ifdef HAVE_SERVER_SUPPORT
-EXPORT_SYMBOL(server_disconnect_export);
-EXPORT_SYMBOL(target_stop_recovery_thread);
-EXPORT_SYMBOL(target_handle_connect);
-EXPORT_SYMBOL(target_cleanup_recovery);
-EXPORT_SYMBOL(target_destroy_export);
-EXPORT_SYMBOL(target_cancel_recovery_timer);
-EXPORT_SYMBOL(target_queue_recovery_request);
-EXPORT_SYMBOL(target_handle_ping);
-EXPORT_SYMBOL(target_handle_disconnect);
-#endif
-
-/* l_lock.c */
-EXPORT_SYMBOL(lock_res_and_lock);
-EXPORT_SYMBOL(unlock_res_and_lock);