extern cfs_mem_cache_t *ldlm_resource_slab;
extern cfs_mem_cache_t *ldlm_lock_slab;
-static struct semaphore ldlm_ref_sem;
+static cfs_semaphore_t ldlm_ref_sem;
static int ldlm_refcount;
/* LDLM state */
#ifdef __KERNEL__
/* w_l_spinlock protects both waiting_locks_list and expired_lock_thread */
-static spinlock_t waiting_locks_spinlock; /* BH lock (timer) */
-static struct list_head waiting_locks_list;
+static cfs_spinlock_t waiting_locks_spinlock; /* BH lock (timer) */
+static cfs_list_t waiting_locks_list;
static cfs_timer_t waiting_locks_timer;
static struct expired_lock_thread {
cfs_waitq_t elt_waitq;
int elt_state;
int elt_dump;
- struct list_head elt_expired_locks;
+ cfs_list_t elt_expired_locks;
} expired_lock_thread;
#endif
#define ELT_TERMINATE 2
struct ldlm_bl_pool {
- spinlock_t blp_lock;
+ cfs_spinlock_t blp_lock;
/*
* blp_prio_list is used for callbacks that should be handled
* as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
* see bug 13843
*/
- struct list_head blp_prio_list;
+ cfs_list_t blp_prio_list;
/*
* blp_list is used for all other callbacks which are likely
* to take longer to process.
*/
- struct list_head blp_list;
+ cfs_list_t blp_list;
cfs_waitq_t blp_waitq;
- struct completion blp_comp;
- atomic_t blp_num_threads;
- atomic_t blp_busy_threads;
+ cfs_completion_t blp_comp;
+ cfs_atomic_t blp_num_threads;
+ cfs_atomic_t blp_busy_threads;
int blp_min_threads;
int blp_max_threads;
};
struct ldlm_bl_work_item {
- struct list_head blwi_entry;
- struct ldlm_namespace *blwi_ns;
+ cfs_list_t blwi_entry;
+ struct ldlm_namespace *blwi_ns;
struct ldlm_lock_desc blwi_ld;
- struct ldlm_lock *blwi_lock;
- struct list_head blwi_head;
+ struct ldlm_lock *blwi_lock;
+ cfs_list_t blwi_head;
int blwi_count;
+ cfs_completion_t blwi_comp;
+ cfs_atomic_t blwi_ref_count;
};
#ifdef __KERNEL__
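+/* A blocking-callback work item may be heap-allocated (LDLM_ASYNC) or
+ * embedded on the caller's stack (LDLM_SYNC), so its lifetime is
+ * reference-counted: only the final put frees it, and the SYNC path
+ * holds an extra reference so the handler's put can never free an
+ * on-stack item. */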
+static inline void ldlm_bl_work_item_get(struct ldlm_bl_work_item *blwi)
+{
+ cfs_atomic_inc(&blwi->blwi_ref_count);
+}
+
+static inline void ldlm_bl_work_item_put(struct ldlm_bl_work_item *blwi)
+{
+ if (cfs_atomic_dec_and_test(&blwi->blwi_ref_count))
+ OBD_FREE(blwi, sizeof(*blwi));
+}
static inline int have_expired_locks(void)
{
int need_to_run;
ENTRY;
- spin_lock_bh(&waiting_locks_spinlock);
- need_to_run = !list_empty(&expired_lock_thread.elt_expired_locks);
- spin_unlock_bh(&waiting_locks_spinlock);
+ cfs_spin_lock_bh(&waiting_locks_spinlock);
+ need_to_run = !cfs_list_empty(&expired_lock_thread.elt_expired_locks);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
RETURN(need_to_run);
}
static int expired_lock_main(void *arg)
{
- struct list_head *expired = &expired_lock_thread.elt_expired_locks;
+ cfs_list_t *expired = &expired_lock_thread.elt_expired_locks;
struct l_wait_info lwi = { 0 };
int do_dump;
expired_lock_thread.elt_state == ELT_TERMINATE,
&lwi);
- spin_lock_bh(&waiting_locks_spinlock);
+ cfs_spin_lock_bh(&waiting_locks_spinlock);
if (expired_lock_thread.elt_dump) {
- spin_unlock_bh(&waiting_locks_spinlock);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
/* from waiting_locks_callback, but not in timer */
libcfs_debug_dumplog();
"waiting_locks_callback",
expired_lock_thread.elt_dump);
- spin_lock_bh(&waiting_locks_spinlock);
+ cfs_spin_lock_bh(&waiting_locks_spinlock);
expired_lock_thread.elt_dump = 0;
}
do_dump = 0;
- while (!list_empty(expired)) {
+ while (!cfs_list_empty(expired)) {
struct obd_export *export;
struct ldlm_lock *lock;
- lock = list_entry(expired->next, struct ldlm_lock,
+ lock = cfs_list_entry(expired->next, struct ldlm_lock,
l_pending_chain);
if ((void *)lock < LP_POISON + CFS_PAGE_SIZE &&
(void *)lock >= LP_POISON) {
- spin_unlock_bh(&waiting_locks_spinlock);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
CERROR("free lock on elt list %p\n", lock);
LBUG();
}
- list_del_init(&lock->l_pending_chain);
+ cfs_list_del_init(&lock->l_pending_chain);
if ((void *)lock->l_export < LP_POISON + CFS_PAGE_SIZE &&
(void *)lock->l_export >= LP_POISON) {
CERROR("lock with free export on elt list %p\n",
LDLM_LOCK_RELEASE(lock);
continue;
}
- export = class_export_lock_get(lock->l_export);
- spin_unlock_bh(&waiting_locks_spinlock);
+ export = class_export_lock_get(lock->l_export, lock);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
+
+ do_dump++;
+ class_fail_export(export);
+ class_export_lock_put(export, lock);
/* release extra ref grabbed by ldlm_add_waiting_lock()
* or ldlm_failed_ast() */
LDLM_LOCK_RELEASE(lock);
- do_dump++;
- class_fail_export(export);
- class_export_lock_put(export);
- spin_lock_bh(&waiting_locks_spinlock);
+ cfs_spin_lock_bh(&waiting_locks_spinlock);
}
- spin_unlock_bh(&waiting_locks_spinlock);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
if (do_dump && obd_dump_on_eviction) {
CERROR("dump the log upon eviction\n");
if (lock->l_export == NULL)
return 0;
- spin_lock(&lock->l_export->exp_lock);
- list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) {
+ cfs_spin_lock(&lock->l_export->exp_lock);
+ cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc,
+ rq_exp_list) {
if (req->rq_ops->hpreq_lock_match) {
match = req->rq_ops->hpreq_lock_match(req, lock);
if (match)
break;
}
}
- spin_unlock(&lock->l_export->exp_lock);
+ cfs_spin_unlock(&lock->l_export->exp_lock);
RETURN(match);
}
struct ldlm_lock *lock, *last = NULL;
repeat:
- spin_lock_bh(&waiting_locks_spinlock);
- while (!list_empty(&waiting_locks_list)) {
- lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
- l_pending_chain);
- if (cfs_time_after(lock->l_callback_timeout, cfs_time_current()) ||
+ cfs_spin_lock_bh(&waiting_locks_spinlock);
+ while (!cfs_list_empty(&waiting_locks_list)) {
+ lock = cfs_list_entry(waiting_locks_list.next, struct ldlm_lock,
+ l_pending_chain);
+ if (cfs_time_after(lock->l_callback_timeout,
+ cfs_time_current()) ||
(lock->l_req_mode == LCK_GROUP))
break;
lock->l_export->exp_connection->c_remote_uuid.uuid,
libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid));
- list_del_init(&lock->l_pending_chain);
- spin_unlock_bh(&waiting_locks_spinlock);
+ cfs_list_del_init(&lock->l_pending_chain);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
ldlm_add_waiting_lock(lock);
goto repeat;
}
lock->l_export->exp_connection->c_remote_uuid.uuid,
libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid));
- list_del_init(&lock->l_pending_chain);
- spin_unlock_bh(&waiting_locks_spinlock);
+ cfs_list_del_init(&lock->l_pending_chain);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
ldlm_add_waiting_lock(lock);
goto repeat;
}
LDLM_LOCK_GET(lock);
- spin_unlock_bh(&waiting_locks_spinlock);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
LDLM_DEBUG(lock, "prolong the busy lock");
ldlm_refresh_waiting_lock(lock,
ldlm_get_enq_timeout(lock));
- spin_lock_bh(&waiting_locks_spinlock);
+ cfs_spin_lock_bh(&waiting_locks_spinlock);
if (!cont) {
LDLM_LOCK_RELEASE(lock);
/* no need to take an extra ref on the lock since it was in
* the waiting_locks_list and ldlm_add_waiting_lock()
* already grabbed a ref */
- list_del(&lock->l_pending_chain);
- list_add(&lock->l_pending_chain,
- &expired_lock_thread.elt_expired_locks);
+ cfs_list_del(&lock->l_pending_chain);
+ cfs_list_add(&lock->l_pending_chain,
+ &expired_lock_thread.elt_expired_locks);
}
- if (!list_empty(&expired_lock_thread.elt_expired_locks)) {
+ if (!cfs_list_empty(&expired_lock_thread.elt_expired_locks)) {
if (obd_dump_on_timeout)
expired_lock_thread.elt_dump = __LINE__;
* Make sure the timer will fire again if we have any locks
* left.
*/
- if (!list_empty(&waiting_locks_list)) {
+ if (!cfs_list_empty(&waiting_locks_list)) {
cfs_time_t timeout_rounded;
- lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
- l_pending_chain);
+ lock = cfs_list_entry(waiting_locks_list.next, struct ldlm_lock,
+ l_pending_chain);
timeout_rounded = (cfs_time_t)round_timeout(lock->l_callback_timeout);
cfs_timer_arm(&waiting_locks_timer, timeout_rounded);
}
- spin_unlock_bh(&waiting_locks_spinlock);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
}
/*
cfs_time_t timeout;
cfs_time_t timeout_rounded;
- if (!list_empty(&lock->l_pending_chain))
+ if (!cfs_list_empty(&lock->l_pending_chain))
return 0;
if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) ||
}
/* if the new lock has a shorter timeout than something earlier on
the list, we'll wait the longer amount of time; no big deal. */
- list_add_tail(&lock->l_pending_chain, &waiting_locks_list); /* FIFO */
+ /* FIFO */
+ cfs_list_add_tail(&lock->l_pending_chain, &waiting_locks_list);
return 1;
}
LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
- spin_lock_bh(&waiting_locks_spinlock);
+ cfs_spin_lock_bh(&waiting_locks_spinlock);
if (lock->l_destroyed) {
static cfs_time_t next;
- spin_unlock_bh(&waiting_locks_spinlock);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)");
if (cfs_time_after(cfs_time_current(), next)) {
next = cfs_time_shift(14400);
/* grab ref on the lock if it has been added to the
* waiting list */
LDLM_LOCK_GET(lock);
- spin_unlock_bh(&waiting_locks_spinlock);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
ret == 0 ? "not re-" : "", timeout,
*/
static int __ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
- struct list_head *list_next;
+ cfs_list_t *list_next;
- if (list_empty(&lock->l_pending_chain))
+ if (cfs_list_empty(&lock->l_pending_chain))
return 0;
list_next = lock->l_pending_chain.next;
cfs_timer_disarm(&waiting_locks_timer);
} else {
struct ldlm_lock *next;
- next = list_entry(list_next, struct ldlm_lock,
- l_pending_chain);
+ next = cfs_list_entry(list_next, struct ldlm_lock,
+ l_pending_chain);
cfs_timer_arm(&waiting_locks_timer,
round_timeout(next->l_callback_timeout));
}
}
- list_del_init(&lock->l_pending_chain);
+ cfs_list_del_init(&lock->l_pending_chain);
return 1;
}
if (lock->l_export == NULL) {
/* We don't have a "waiting locks list" on clients. */
- LDLM_DEBUG(lock, "client lock: no-op");
+ CDEBUG(D_DLMTRACE, "Client lock %p: no-op\n", lock);
return 0;
}
- spin_lock_bh(&waiting_locks_spinlock);
+ cfs_spin_lock_bh(&waiting_locks_spinlock);
ret = __ldlm_del_waiting_lock(lock);
- spin_unlock_bh(&waiting_locks_spinlock);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
if (ret)
/* release lock ref if it has indeed been removed
* from a list */
return 0;
}
- spin_lock_bh(&waiting_locks_spinlock);
+ cfs_spin_lock_bh(&waiting_locks_spinlock);
- if (list_empty(&lock->l_pending_chain)) {
- spin_unlock_bh(&waiting_locks_spinlock);
+ if (cfs_list_empty(&lock->l_pending_chain)) {
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
LDLM_DEBUG(lock, "wasn't waiting");
return 0;
}
* release/take a lock reference */
__ldlm_del_waiting_lock(lock);
__ldlm_add_waiting_lock(lock, timeout);
- spin_unlock_bh(&waiting_locks_spinlock);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
LDLM_DEBUG(lock, "refreshed");
return 1;
if (obd_dump_on_timeout)
libcfs_debug_dumplog();
#ifdef __KERNEL__
- spin_lock_bh(&waiting_locks_spinlock);
+ cfs_spin_lock_bh(&waiting_locks_spinlock);
if (__ldlm_del_waiting_lock(lock) == 0)
/* the lock was not in any list, grab an extra ref before adding
* the lock to the expired list */
LDLM_LOCK_GET(lock);
- list_add(&lock->l_pending_chain, &expired_lock_thread.elt_expired_locks);
+ cfs_list_add(&lock->l_pending_chain,
+ &expired_lock_thread.elt_expired_locks);
cfs_waitq_signal(&expired_lock_thread.elt_waitq);
- spin_unlock_bh(&waiting_locks_spinlock);
+ cfs_spin_unlock_bh(&waiting_locks_spinlock);
#else
class_fail_export(lock->l_export);
#endif
* been received yet, we need to update lvbo to have the
* proper attributes cached. */
if (rc == -EINVAL && arg->type == LDLM_BL_CALLBACK)
- ldlm_res_lvbo_update(lock->l_resource, NULL,
- 0, 1);
+ ldlm_res_lvbo_update(lock->l_resource, NULL, 1);
rc = ldlm_handle_ast_error(lock, req, rc,
arg->type == LDLM_BL_CALLBACK
? "blocking" : "completion");
LDLM_LOCK_RELEASE(lock);
if (rc == -ERESTART)
- atomic_set(&arg->restart, 1);
+ cfs_atomic_set(&arg->restart, 1);
RETURN(0);
}
if (rc == 0)
/* If we cancelled the lock, we need to restart
* ldlm_reprocess_queue */
- atomic_set(&arg->restart, 1);
+ cfs_atomic_set(&arg->restart, 1);
} else {
LDLM_LOCK_GET(lock);
ptlrpc_set_add_req(arg->set, req);
RETURN_EXIT;
}
- spin_lock(&lock->l_export->exp_lock);
- list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) {
+ cfs_spin_lock(&lock->l_export->exp_lock);
+ cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc,
+ rq_exp_list) {
if (!req->rq_hp && req->rq_ops->hpreq_lock_match &&
req->rq_ops->hpreq_lock_match(req, lock))
ptlrpc_hpreq_reorder(req);
}
- spin_unlock(&lock->l_export->exp_lock);
+ cfs_spin_unlock(&lock->l_export->exp_lock);
EXIT;
}
/* Server-side enqueue wait time estimate, used in
__ldlm_add_waiting_lock to set future enqueue timers */
if (total_enqueue_wait < ldlm_get_enq_timeout(lock))
- at_add(&lock->l_resource->lr_namespace->ns_at_estimate,
- total_enqueue_wait);
+ at_measured(&lock->l_resource->lr_namespace->ns_at_estimate,
+ total_enqueue_wait);
else
/* bz18618. Don't add lock enqueue time we spend waiting for a
previous callback to fail. Locks waiting legitimately will
else if (rc != 0)
rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
else
- rc = ldlm_res_lvbo_update(res, req, REPLY_REC_OFF, 1);
+ rc = ldlm_res_lvbo_update(res, req, 1);
ptlrpc_req_finished(req);
if (rc == -ERESTART)
if (unlikely(flags & LDLM_FL_REPLAY)) {
/* Find an existing lock in the per-export lock hash */
- lock = lustre_hash_lookup(req->rq_export->exp_lock_hash,
- (void *)&dlm_req->lock_handle[0]);
+ lock = cfs_hash_lookup(req->rq_export->exp_lock_hash,
+ (void *)&dlm_req->lock_handle[0]);
if (lock != NULL) {
DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie "
LPX64, lock->l_handle.h_cookie);
LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
GOTO(out, rc = -ENOTCONN);
}
- lock->l_export = class_export_lock_get(req->rq_export);
+
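+ /* pass the lock so that, with LUSTRE_TRACKS_LOCK_EXP_REFS
+ * enabled, the export can track which locks reference it */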
+ lock->l_export = class_export_lock_get(req->rq_export, lock);
if (lock->l_export->exp_lock_hash)
- lustre_hash_add(lock->l_export->exp_lock_hash,
- &lock->l_remote_handle,
- &lock->l_exp_hash);
+ cfs_hash_add(lock->l_export->exp_lock_hash,
+ &lock->l_remote_handle,
+ &lock->l_exp_hash);
existing_lock:
if (res != NULL) {
ldlm_resource_getref(res);
LDLM_RESOURCE_ADDREF(res);
- ldlm_res_lvbo_update(res, NULL, 0, 1);
+ ldlm_res_lvbo_update(res, NULL, 1);
}
pres = res;
}
int do_ast;
ENTRY;
- LDLM_DEBUG(lock, "client blocking AST callback handler START");
+ LDLM_DEBUG(lock, "client blocking AST callback handler");
lock_res_and_lock(lock);
lock->l_flags |= LDLM_FL_CBPENDING;
unlock_res_and_lock(lock);
if (do_ast) {
- LDLM_DEBUG(lock, "already unused, calling "
- "callback (%p)", lock->l_blocking_ast);
+ CDEBUG(D_DLMTRACE, "Lock %p already unused, calling callback (%p)\n",
+ lock, lock->l_blocking_ast);
if (lock->l_blocking_ast != NULL)
lock->l_blocking_ast(lock, ld, lock->l_ast_data,
LDLM_CB_BLOCKING);
} else {
- LDLM_DEBUG(lock, "Lock still has references, will be"
- " cancelled later");
+ CDEBUG(D_DLMTRACE, "Lock %p is referenced, will be cancelled later\n",
+ lock);
}
LDLM_DEBUG(lock, "client blocking callback handler END");
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
int to = cfs_time_seconds(1);
while (to > 0) {
- cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE, to);
+ cfs_schedule_timeout_and_set_state(
+ CFS_TASK_INTERRUPTIBLE, to);
if (lock->l_granted_mode == lock->l_req_mode ||
lock->l_destroyed)
break;
LDLM_ERROR(lock, "completion AST did not contain "
"expected LVB!");
} else {
- void *lvb = req_capsule_client_swab_get(&req->rq_pill,
- &RMF_DLM_LVB,
- (void *)lock->l_lvb_swabber);
+ void *lvb = req_capsule_client_get(&req->rq_pill,
+ &RMF_DLM_LVB);
memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len);
}
}
}
#ifdef __KERNEL__
-static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
+static int __ldlm_bl_to_thread(struct ldlm_namespace *ns,
+                               struct ldlm_bl_work_item *blwi,
struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
- struct list_head *cancels, int count)
+ cfs_list_t *cancels, int count, int mode)
{
struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
- struct ldlm_bl_work_item *blwi;
ENTRY;
- if (cancels && count == 0)
+ if (cancels && count == 0) {
+ if (mode == LDLM_ASYNC)
+ OBD_FREE(blwi, sizeof(*blwi));
RETURN(0);
+ }
- OBD_ALLOC(blwi, sizeof(*blwi));
- if (blwi == NULL)
- RETURN(-ENOMEM);
+ cfs_init_completion(&blwi->blwi_comp);
+ cfs_atomic_set(&blwi->blwi_ref_count, 1);
blwi->blwi_ns = ns;
if (ld != NULL)
blwi->blwi_ld = *ld;
if (count) {
- list_add(&blwi->blwi_head, cancels);
- list_del_init(cancels);
+ cfs_list_add(&blwi->blwi_head, cancels);
+ cfs_list_del_init(cancels);
blwi->blwi_count = count;
} else {
blwi->blwi_lock = lock;
}
- spin_lock(&blp->blp_lock);
+
+ cfs_spin_lock(&blp->blp_lock);
if (lock && lock->l_flags & LDLM_FL_DISCARD_DATA) {
/* add LDLM_FL_DISCARD_DATA requests to the priority list */
- list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
+ cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
} else {
/* other blocking callbacks are added to the regular list */
- list_add_tail(&blwi->blwi_entry, &blp->blp_list);
+ cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_list);
+ }
+ cfs_spin_unlock(&blp->blp_lock);
+
+ if (mode == LDLM_SYNC) {
+ /* hold an extra ref: the object is on this stack for a SYNC call */
+ ldlm_bl_work_item_get(blwi);
+ cfs_waitq_signal(&blp->blp_waitq);
+ cfs_wait_for_completion(&blwi->blwi_comp);
+ } else {
+ cfs_waitq_signal(&blp->blp_waitq);
}
- cfs_waitq_signal(&blp->blp_waitq);
- spin_unlock(&blp->blp_lock);
RETURN(0);
}
+
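+/* Pick the allocation strategy for the work item: LDLM_SYNC keeps the
+ * item on this stack to avoid allocating memory (it may be called from
+ * the kernel shrinker under memory pressure), while LDLM_ASYNC uses a
+ * heap item that is freed by the handling thread. */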
+static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
+ struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
+ cfs_list_t *cancels, int count, int mode)
+{
+ ENTRY;
+
+ if (mode == LDLM_SYNC) {
+ /* for a synchronous call, allocate as little memory as
+ * possible, since it may be triggered from the kernel shrinker
+ */
+ struct ldlm_bl_work_item blwi;
+ memset(&blwi, 0, sizeof(blwi));
+ /* __ldlm_bl_to_thread() takes an extra ref because this
+ * object lives on the stack */
+ RETURN(__ldlm_bl_to_thread(ns, &blwi, ld, lock,
+                            cancels, count, mode));
+ } else {
+ struct ldlm_bl_work_item *blwi;
+ OBD_ALLOC(blwi, sizeof(*blwi));
+ if (blwi == NULL)
+ RETURN(-ENOMEM);
+
+ RETURN(__ldlm_bl_to_thread(ns, blwi, ld, lock,
+                            cancels, count, mode));
+ }
+}
+
#endif
int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
struct ldlm_lock *lock)
{
#ifdef __KERNEL__
- RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0));
+ RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LDLM_ASYNC));
#else
RETURN(-ENOSYS);
#endif
}
int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
- struct list_head *cancels, int count)
+ cfs_list_t *cancels, int count, int mode)
{
#ifdef __KERNEL__
- RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count));
+ RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count, mode));
#else
RETURN(-ENOSYS);
#endif
int rc = -ENOSYS;
ENTRY;
- DEBUG_REQ(D_ERROR, req, "%s: handle setinfo\n", obd->obd_name);
+ DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);
req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
return rc;
}
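+/* Log a warning with enough request context (pid, xid, peer nid,
+ * opcode, lock cookie) to diagnose lost or failed callback replies;
+ * see bug 21636. */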
+static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
+ const char *msg, int rc,
+ struct lustre_handle *handle)
+{
+ CWARN("%s: [pid %d] [xid x"LPU64"] [nid %s] [opc %d] [rc %d] "
+ "[lock "LPX64"].\n",
+ msg, lustre_msg_get_status(req->rq_reqmsg),
+ req->rq_xid, libcfs_id2str(req->rq_peer),
+ lustre_msg_get_opc(req->rq_reqmsg), rc,
+ handle ? handle->cookie : 0);
+ if (req->rq_no_reply)
+ CWARN("No reply was sent, maybe cause bug 21636.\n");
+ else if (rc)
+ CWARN("Send reply failed, maybe cause bug 21636.\n");
+}
+
/* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
req_capsule_init(&req->rq_pill, req, RCL_SERVER);
if (req->rq_export == NULL) {
- ldlm_callback_reply(req, -ENOTCONN);
+ rc = ldlm_callback_reply(req, -ENOTCONN);
+ ldlm_callback_errmsg(req, "Operate on unconnected server",
+ rc, NULL);
RETURN(0);
}
dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
if (dlm_req == NULL) {
- ldlm_callback_reply(req, -EPROTO);
+ rc = ldlm_callback_reply(req, -EPROTO);
+ ldlm_callback_errmsg(req, "Operate without parameter", rc,
+ NULL);
RETURN(0);
}
if (!lock) {
CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
"disappeared\n", dlm_req->lock_handle[0].cookie);
- ldlm_callback_reply(req, -EINVAL);
+ rc = ldlm_callback_reply(req, -EINVAL);
+ ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
+ &dlm_req->lock_handle[0]);
RETURN(0);
}
dlm_req->lock_handle[0].cookie);
unlock_res_and_lock(lock);
LDLM_LOCK_RELEASE(lock);
- ldlm_callback_reply(req, -EINVAL);
+ rc = ldlm_callback_reply(req, -EINVAL);
+ ldlm_callback_errmsg(req, "Operate on stale lock", rc,
+ &dlm_req->lock_handle[0]);
RETURN(0);
}
/* BL_AST locks are not needed in lru.
case LDLM_BL_CALLBACK:
CDEBUG(D_INODE, "blocking ast\n");
req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
- if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK))
- ldlm_callback_reply(req, 0);
+ if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) {
+ rc = ldlm_callback_reply(req, 0);
+ if (req->rq_no_reply || rc)
+ ldlm_callback_errmsg(req, "Normal process", rc,
+ &dlm_req->lock_handle[0]);
+ }
if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
break;
libcfs_id2str(req->rq_peer),
lustre_msg_get_handle(req->rq_reqmsg)->cookie);
- req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
- dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
- if (dlm_req != NULL)
- ldlm_lock_dump_handle(D_ERROR,
- &dlm_req->lock_handle[0]);
+ if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_CANCEL) {
+ req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
+ dlm_req = req_capsule_client_get(&req->rq_pill,
+ &RMF_DLM_REQ);
+ if (dlm_req != NULL)
+ ldlm_lock_dump_handle(D_ERROR,
+ &dlm_req->lock_handle[0]);
+ }
ldlm_callback_reply(req, -ENOTCONN);
RETURN(0);
}
void ldlm_revoke_lock_cb(void *obj, void *data)
{
- struct list_head *rpc_list = data;
+ cfs_list_t *rpc_list = data;
struct ldlm_lock *lock = obj;
lock_res_and_lock(lock);
lock->l_flags |= LDLM_FL_AST_SENT;
if (lock->l_export && lock->l_export->exp_lock_hash &&
- !hlist_unhashed(&lock->l_exp_hash))
- lustre_hash_del(lock->l_export->exp_lock_hash,
- &lock->l_remote_handle, &lock->l_exp_hash);
- list_add_tail(&lock->l_rk_ast, rpc_list);
+ !cfs_hlist_unhashed(&lock->l_exp_hash))
+ cfs_hash_del(lock->l_export->exp_lock_hash,
+ &lock->l_remote_handle, &lock->l_exp_hash);
+ cfs_list_add_tail(&lock->l_rk_ast, rpc_list);
LDLM_LOCK_GET(lock);
unlock_res_and_lock(lock);
void ldlm_revoke_export_locks(struct obd_export *exp)
{
- struct list_head rpc_list;
+ cfs_list_t rpc_list;
ENTRY;
CFS_INIT_LIST_HEAD(&rpc_list);
- lustre_hash_for_each_empty(exp->exp_lock_hash,
- ldlm_revoke_lock_cb, &rpc_list);
+ cfs_hash_for_each_empty(exp->exp_lock_hash,
+ ldlm_revoke_lock_cb, &rpc_list);
ldlm_run_ast_work(&rpc_list, LDLM_WORK_REVOKE_AST);
EXIT;
struct ldlm_bl_work_item *blwi = NULL;
static unsigned int num_bl = 0;
- spin_lock(&blp->blp_lock);
+ cfs_spin_lock(&blp->blp_lock);
/* process a request from the blp_list at least every blp_num_threads */
- if (!list_empty(&blp->blp_list) &&
- (list_empty(&blp->blp_prio_list) || num_bl == 0))
- blwi = list_entry(blp->blp_list.next,
- struct ldlm_bl_work_item, blwi_entry);
+ if (!cfs_list_empty(&blp->blp_list) &&
+ (cfs_list_empty(&blp->blp_prio_list) || num_bl == 0))
+ blwi = cfs_list_entry(blp->blp_list.next,
+ struct ldlm_bl_work_item, blwi_entry);
else
- if (!list_empty(&blp->blp_prio_list))
- blwi = list_entry(blp->blp_prio_list.next,
- struct ldlm_bl_work_item, blwi_entry);
+ if (!cfs_list_empty(&blp->blp_prio_list))
+ blwi = cfs_list_entry(blp->blp_prio_list.next,
+ struct ldlm_bl_work_item,
+ blwi_entry);
if (blwi) {
- if (++num_bl >= atomic_read(&blp->blp_num_threads))
+ if (++num_bl >= cfs_atomic_read(&blp->blp_num_threads))
num_bl = 0;
- list_del(&blwi->blwi_entry);
+ cfs_list_del(&blwi->blwi_entry);
}
- spin_unlock(&blp->blp_lock);
+ cfs_spin_unlock(&blp->blp_lock);
return blwi;
}
struct ldlm_bl_thread_data {
char bltd_name[CFS_CURPROC_COMM_MAX];
struct ldlm_bl_pool *bltd_blp;
- struct completion bltd_comp;
+ cfs_completion_t bltd_comp;
int bltd_num;
};
struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
int rc;
- init_completion(&bltd.bltd_comp);
+ cfs_init_completion(&bltd.bltd_comp);
rc = cfs_kernel_thread(ldlm_bl_thread_main, &bltd, 0);
if (rc < 0) {
CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %d\n",
- atomic_read(&blp->blp_num_threads), rc);
+ cfs_atomic_read(&blp->blp_num_threads), rc);
return rc;
}
- wait_for_completion(&bltd.bltd_comp);
+ cfs_wait_for_completion(&bltd.bltd_comp);
return 0;
}
blp = bltd->bltd_blp;
- bltd->bltd_num = atomic_inc_return(&blp->blp_num_threads) - 1;
- atomic_inc(&blp->blp_busy_threads);
+ bltd->bltd_num =
+ cfs_atomic_inc_return(&blp->blp_num_threads) - 1;
+ cfs_atomic_inc(&blp->blp_busy_threads);
snprintf(bltd->bltd_name, sizeof(bltd->bltd_name) - 1,
"ldlm_bl_%02d", bltd->bltd_num);
cfs_daemonize(bltd->bltd_name);
- complete(&bltd->bltd_comp);
+ cfs_complete(&bltd->bltd_comp);
/* cannot use bltd after this, it is only on caller's stack */
}
if (blwi == NULL) {
int busy;
- atomic_dec(&blp->blp_busy_threads);
+ cfs_atomic_dec(&blp->blp_busy_threads);
l_wait_event_exclusive(blp->blp_waitq,
(blwi = ldlm_bl_get_work(blp)) != NULL,
&lwi);
- busy = atomic_inc_return(&blp->blp_busy_threads);
+ busy = cfs_atomic_inc_return(&blp->blp_busy_threads);
if (blwi->blwi_ns == NULL)
/* added by ldlm_cleanup() */
/* Not fatal if racy and we end up with a few too many threads */
if (unlikely(busy < blp->blp_max_threads &&
- busy >= atomic_read(&blp->blp_num_threads)))
+ busy >= cfs_atomic_read(&blp->blp_num_threads)))
/* discard the return value, we tried */
ldlm_bl_thread_start(blp);
} else {
if (blwi->blwi_count) {
/* The special case when we cancel locks in lru
* asynchronously, we pass the list of locks here.
- * Thus lock is marked LDLM_FL_CANCELING, and already
- * canceled locally. */
+ * Thus locks are marked LDLM_FL_CANCELING, but NOT
+ * canceled locally yet. */
+ ldlm_cli_cancel_list_local(&blwi->blwi_head,
+ blwi->blwi_count, 0);
ldlm_cli_cancel_list(&blwi->blwi_head,
blwi->blwi_count, NULL, 0);
} else {
ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
blwi->blwi_lock);
}
- OBD_FREE(blwi, sizeof(*blwi));
+ cfs_complete(&blwi->blwi_comp);
+ ldlm_bl_work_item_put(blwi);
}
- atomic_dec(&blp->blp_busy_threads);
- atomic_dec(&blp->blp_num_threads);
- complete(&blp->blp_comp);
+ cfs_atomic_dec(&blp->blp_busy_threads);
+ cfs_atomic_dec(&blp->blp_num_threads);
+ cfs_complete(&blp->blp_comp);
RETURN(0);
}
{
int rc = 0;
ENTRY;
- mutex_down(&ldlm_ref_sem);
+ cfs_mutex_down(&ldlm_ref_sem);
if (++ldlm_refcount == 1) {
rc = ldlm_setup();
if (rc)
ldlm_refcount--;
}
- mutex_up(&ldlm_ref_sem);
+ cfs_mutex_up(&ldlm_ref_sem);
RETURN(rc);
}
void ldlm_put_ref(void)
{
ENTRY;
- mutex_down(&ldlm_ref_sem);
+ cfs_mutex_down(&ldlm_ref_sem);
if (ldlm_refcount == 1) {
int rc = ldlm_cleanup();
if (rc)
} else {
ldlm_refcount--;
}
- mutex_up(&ldlm_ref_sem);
+ cfs_mutex_up(&ldlm_ref_sem);
EXIT;
}
* Export handle<->lock hash operations.
*/
static unsigned
-ldlm_export_lock_hash(lustre_hash_t *lh, void *key, unsigned mask)
+ldlm_export_lock_hash(cfs_hash_t *hs, void *key, unsigned mask)
{
- return lh_u64_hash(((struct lustre_handle *)key)->cookie, mask);
+ return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
}
static void *
-ldlm_export_lock_key(struct hlist_node *hnode)
+ldlm_export_lock_key(cfs_hlist_node_t *hnode)
{
struct ldlm_lock *lock;
ENTRY;
- lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
+ lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
RETURN(&lock->l_remote_handle);
}
static int
-ldlm_export_lock_compare(void *key, struct hlist_node *hnode)
+ldlm_export_lock_compare(void *key, cfs_hlist_node_t *hnode)
{
ENTRY;
RETURN(lustre_handle_equal(ldlm_export_lock_key(hnode), key));
}
static void *
-ldlm_export_lock_get(struct hlist_node *hnode)
+ldlm_export_lock_get(cfs_hlist_node_t *hnode)
{
struct ldlm_lock *lock;
ENTRY;
- lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
+ lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
LDLM_LOCK_GET(lock);
RETURN(lock);
}
static void *
-ldlm_export_lock_put(struct hlist_node *hnode)
+ldlm_export_lock_put(cfs_hlist_node_t *hnode)
{
struct ldlm_lock *lock;
ENTRY;
- lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
+ lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
LDLM_LOCK_RELEASE(lock);
RETURN(lock);
}
-static lustre_hash_ops_t ldlm_export_lock_ops = {
- .lh_hash = ldlm_export_lock_hash,
- .lh_key = ldlm_export_lock_key,
- .lh_compare = ldlm_export_lock_compare,
- .lh_get = ldlm_export_lock_get,
- .lh_put = ldlm_export_lock_put
+static cfs_hash_ops_t ldlm_export_lock_ops = {
+ .hs_hash = ldlm_export_lock_hash,
+ .hs_key = ldlm_export_lock_key,
+ .hs_compare = ldlm_export_lock_compare,
+ .hs_get = ldlm_export_lock_get,
+ .hs_put = ldlm_export_lock_put
};
int ldlm_init_export(struct obd_export *exp)
ENTRY;
exp->exp_lock_hash =
- lustre_hash_init(obd_uuid2str(&exp->exp_client_uuid),
- 7, 16, &ldlm_export_lock_ops, LH_REHASH);
+ cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
+ HASH_EXP_LOCK_CUR_BITS, HASH_EXP_LOCK_MAX_BITS,
+ &ldlm_export_lock_ops, CFS_HASH_REHASH);
if (!exp->exp_lock_hash)
RETURN(-ENOMEM);
void ldlm_destroy_export(struct obd_export *exp)
{
ENTRY;
- lustre_hash_exit(exp->exp_lock_hash);
+ cfs_hash_putref(exp->exp_lock_hash);
exp->exp_lock_hash = NULL;
EXIT;
}
GOTO(out_proc, rc = -ENOMEM);
ldlm_state->ldlm_bl_pool = blp;
- spin_lock_init(&blp->blp_lock);
+ cfs_spin_lock_init(&blp->blp_lock);
CFS_INIT_LIST_HEAD(&blp->blp_list);
CFS_INIT_LIST_HEAD(&blp->blp_prio_list);
cfs_waitq_init(&blp->blp_waitq);
- atomic_set(&blp->blp_num_threads, 0);
- atomic_set(&blp->blp_busy_threads, 0);
+ cfs_atomic_set(&blp->blp_num_threads, 0);
+ cfs_atomic_set(&blp->blp_busy_threads, 0);
blp->blp_min_threads = ldlm_min_threads;
blp->blp_max_threads = ldlm_max_threads;
cfs_waitq_init(&expired_lock_thread.elt_waitq);
CFS_INIT_LIST_HEAD(&waiting_locks_list);
- spin_lock_init(&waiting_locks_spinlock);
+ cfs_spin_lock_init(&waiting_locks_spinlock);
cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);
rc = cfs_kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FILES);
GOTO(out_thread, rc);
}
- wait_event(expired_lock_thread.elt_waitq,
- expired_lock_thread.elt_state == ELT_READY);
+ cfs_wait_event(expired_lock_thread.elt_waitq,
+ expired_lock_thread.elt_state == ELT_READY);
#endif
#ifdef __KERNEL__
#endif
ENTRY;
- if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
- !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
+ if (!cfs_list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
+ !cfs_list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
CERROR("ldlm still has namespaces; clean these up first.\n");
ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
#endif
#ifdef __KERNEL__
- while (atomic_read(&blp->blp_num_threads) > 0) {
+ while (cfs_atomic_read(&blp->blp_num_threads) > 0) {
struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
- init_completion(&blp->blp_comp);
+ cfs_init_completion(&blp->blp_comp);
- spin_lock(&blp->blp_lock);
- list_add_tail(&blwi.blwi_entry, &blp->blp_list);
+ cfs_spin_lock(&blp->blp_lock);
+ cfs_list_add_tail(&blwi.blwi_entry, &blp->blp_list);
cfs_waitq_signal(&blp->blp_waitq);
- spin_unlock(&blp->blp_lock);
+ cfs_spin_unlock(&blp->blp_lock);
- wait_for_completion(&blp->blp_comp);
+ cfs_wait_for_completion(&blp->blp_comp);
}
OBD_FREE(blp, sizeof(*blp));
expired_lock_thread.elt_state = ELT_TERMINATE;
cfs_waitq_signal(&expired_lock_thread.elt_waitq);
- wait_event(expired_lock_thread.elt_waitq,
- expired_lock_thread.elt_state == ELT_STOPPED);
+ cfs_wait_event(expired_lock_thread.elt_waitq,
+ expired_lock_thread.elt_state == ELT_STOPPED);
#else
ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
int __init ldlm_init(void)
{
- init_mutex(&ldlm_ref_sem);
- init_mutex(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
- init_mutex(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
+ cfs_init_mutex(&ldlm_ref_sem);
+ cfs_init_mutex(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
+ cfs_init_mutex(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
ldlm_resource_slab = cfs_mem_cache_create("ldlm_resources",
sizeof(struct ldlm_resource), 0,
- SLAB_HWCACHE_ALIGN);
+ CFS_SLAB_HWCACHE_ALIGN);
if (ldlm_resource_slab == NULL)
return -ENOMEM;
ldlm_lock_slab = cfs_mem_cache_create("ldlm_locks",
- sizeof(struct ldlm_lock), 0,
- SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU);
+ sizeof(struct ldlm_lock), 0,
+ CFS_SLAB_HWCACHE_ALIGN | CFS_SLAB_DESTROY_BY_RCU);
if (ldlm_lock_slab == NULL) {
cfs_mem_cache_destroy(ldlm_resource_slab);
return -ENOMEM;
ldlm_interval_slab = cfs_mem_cache_create("interval_node",
sizeof(struct ldlm_interval),
- 0, SLAB_HWCACHE_ALIGN);
+ 0, CFS_SLAB_HWCACHE_ALIGN);
if (ldlm_interval_slab == NULL) {
cfs_mem_cache_destroy(ldlm_resource_slab);
cfs_mem_cache_destroy(ldlm_lock_slab);
return -ENOMEM;
}
-
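+/* with lock/export reference tracking compiled in, let the generic
+ * export cleanup code dump the locks still referencing an export */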
+#if LUSTRE_TRACKS_LOCK_EXP_REFS
+ class_export_dump_hook = ldlm_dump_export_locks;
+#endif
return 0;
}
EXPORT_SYMBOL(ldlm_it2str);
EXPORT_SYMBOL(ldlm_lock_dump);
EXPORT_SYMBOL(ldlm_lock_dump_handle);
-EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
EXPORT_SYMBOL(ldlm_reprocess_all_ns);
EXPORT_SYMBOL(ldlm_lock_allow_match_locked);
EXPORT_SYMBOL(ldlm_lock_allow_match);
EXPORT_SYMBOL(ldlm_namespace_foreach_res);
EXPORT_SYMBOL(ldlm_resource_iterate);
EXPORT_SYMBOL(ldlm_cancel_resource_local);
+EXPORT_SYMBOL(ldlm_cli_cancel_list_local);
EXPORT_SYMBOL(ldlm_cli_cancel_list);
/* ldlm_lockd.c */
EXPORT_SYMBOL(client_obd_cleanup);
EXPORT_SYMBOL(client_connect_import);
EXPORT_SYMBOL(client_disconnect_export);
+EXPORT_SYMBOL(server_disconnect_export);
EXPORT_SYMBOL(target_stop_recovery_thread);
EXPORT_SYMBOL(target_handle_connect);
EXPORT_SYMBOL(target_cleanup_recovery);