*/
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
- atomic_inc(&lock->l_refc);
+ refcount_inc(&lock->l_handle.h_ref);
return lock;
}
EXPORT_SYMBOL(ldlm_lock_get);
+static void lock_handle_free(struct rcu_head *rcu)
+{
+ struct ldlm_lock *lock = container_of(rcu, struct ldlm_lock,
+ l_handle.h_rcu);
+
+ OBD_FREE_PRE(lock, sizeof(*lock), "slab-freed");
+ kmem_cache_free(ldlm_lock_slab, lock);
+}
+
/**
* Release lock reference.
*
ENTRY;
LASSERT(lock->l_resource != LP_POISON);
- LASSERT(atomic_read(&lock->l_refc) > 0);
- if (atomic_dec_and_test(&lock->l_refc)) {
+ LASSERT(refcount_read(&lock->l_handle.h_ref) > 0);
+ if (refcount_dec_and_test(&lock->l_handle.h_ref)) {
struct ldlm_resource *res;
LDLM_DEBUG(lock,
ldlm_resource_putref(res);
lock->l_resource = NULL;
lu_ref_fini(&lock->l_reference);
- OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
+ call_rcu(&lock->l_handle.h_rcu, lock_handle_free);
}
EXIT;
EXIT;
}
-/* this is called by portals_handle2object with the handle lock taken */
-static void lock_handle_addref(void *lock)
-{
- LDLM_LOCK_GET((struct ldlm_lock *)lock);
-}
-
-static void lock_handle_free(void *lock, int size)
-{
- LASSERT(size == sizeof(struct ldlm_lock));
- OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
-}
-
-static struct portals_handle_ops lock_handle_ops = {
- .hop_addref = lock_handle_addref,
- .hop_free = lock_handle_free,
-};
+static const char lock_handle_owner[] = "ldlm";
/**
*
lock->l_resource = resource;
lu_ref_add(&resource->lr_reference, "lock", lock);
- atomic_set(&lock->l_refc, 2);
+ refcount_set(&lock->l_handle.h_ref, 2);
INIT_LIST_HEAD(&lock->l_res_link);
INIT_LIST_HEAD(&lock->l_lru);
INIT_LIST_HEAD(&lock->l_pending_chain);
lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
LDLM_NSS_LOCKS);
- INIT_LIST_HEAD_RCU(&lock->l_handle.h_link);
- class_handle_hash(&lock->l_handle, &lock_handle_ops);
+ INIT_HLIST_NODE(&lock->l_handle.h_link);
+ class_handle_hash(&lock->l_handle, lock_handle_owner);
lu_ref_init(&lock->l_reference);
lu_ref_add(&lock->l_reference, "hash", lock);
- lock->l_callback_timeout = 0;
+ lock->l_callback_timestamp = 0;
lock->l_activity = 0;
#if LUSTRE_TRACKS_LOCK_EXP_REFS
int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
const struct ldlm_res_id *new_resid)
{
- struct ldlm_resource *oldres = lock->l_resource;
+ struct ldlm_resource *oldres;
struct ldlm_resource *newres;
int type;
ENTRY;
LASSERT(ns_is_client(ns));
- lock_res_and_lock(lock);
- if (memcmp(new_resid, &lock->l_resource->lr_name,
- sizeof(lock->l_resource->lr_name)) == 0) {
+ oldres = lock_res_and_lock(lock);
+ if (memcmp(new_resid, &oldres->lr_name,
+ sizeof(oldres->lr_name)) == 0) {
/* Nothing to do */
unlock_res_and_lock(lock);
RETURN(0);
LASSERT(handle);
- lock = class_handle2object(handle->cookie, &lock_handle_ops);
+ if (!lustre_handle_is_used(handle))
+ RETURN(NULL);
+
+ lock = class_handle2object(handle->cookie, lock_handle_owner);
if (lock == NULL)
RETURN(NULL);
/* It's unlikely but possible that someone marked the lock as
* destroyed after we did handle2object on it */
if ((flags == 0) && !ldlm_is_destroyed(lock)) {
- lu_ref_add(&lock->l_reference, "handle", current);
+ lu_ref_add_atomic(&lock->l_reference, "handle", lock);
RETURN(lock);
}
LASSERT(lock->l_resource != NULL);
- lu_ref_add_atomic(&lock->l_reference, "handle", current);
+ lu_ref_add_atomic(&lock->l_reference, "handle", lock);
if (unlikely(ldlm_is_destroyed(lock))) {
unlock_res_and_lock(lock);
CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
*/
void ldlm_lock_decref_internal(struct ldlm_lock *lock, enum ldlm_mode mode)
{
- struct ldlm_namespace *ns;
- ENTRY;
+ struct ldlm_namespace *ns;
- lock_res_and_lock(lock);
+ ENTRY;
- ns = ldlm_lock_to_ns(lock);
+ lock_res_and_lock(lock);
+
+ ns = ldlm_lock_to_ns(lock);
- ldlm_lock_decref_internal_nolock(lock, mode);
+ ldlm_lock_decref_internal_nolock(lock, mode);
if ((ldlm_is_local(lock) || lock->l_req_mode == LCK_GROUP) &&
!lock->l_readers && !lock->l_writers) {
}
if (!lock->l_readers && !lock->l_writers && ldlm_is_cbpending(lock)) {
+ unsigned int mask = D_DLMTRACE;
+
/* If we received a blocked AST and this was the last reference,
* run the callback. */
if (ldlm_is_ns_srv(lock) && lock->l_export)
- CERROR("FL_CBPENDING set on non-local lock--just a "
- "warning\n");
+ mask |= D_WARNING;
+ LDLM_DEBUG_LIMIT(mask, lock,
+ "final decref done on %sCBPENDING lock",
+ mask & D_WARNING ? "non-local " : "");
- LDLM_DEBUG(lock, "final decref done on cbpending lock");
-
- LDLM_LOCK_GET(lock); /* dropped by bl thread */
- ldlm_lock_remove_from_lru(lock);
- unlock_res_and_lock(lock);
+ LDLM_LOCK_GET(lock); /* dropped by bl thread */
+ ldlm_lock_remove_from_lru(lock);
+ unlock_res_and_lock(lock);
if (ldlm_is_fail_loc(lock))
- OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
+ OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
if (ldlm_is_atomic_cb(lock) ||
ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
- ldlm_handle_bl_callback(ns, NULL, lock);
+ ldlm_handle_bl_callback(ns, NULL, lock);
} else if (ns_is_client(ns) &&
- !lock->l_readers && !lock->l_writers &&
+ !lock->l_readers && !lock->l_writers &&
!ldlm_is_no_lru(lock) &&
!ldlm_is_bl_ast(lock) &&
!ldlm_is_converting(lock)) {
- LDLM_DEBUG(lock, "add lock into lru list");
-
- /* If this is a client-side namespace and this was the last
- * reference, put it on the LRU. */
- ldlm_lock_add_to_lru(lock);
- unlock_res_and_lock(lock);
+ /* If this is a client-side namespace and this was the last
+ * reference, put it on the LRU.
+ */
+ ldlm_lock_add_to_lru(lock);
+ unlock_res_and_lock(lock);
+ LDLM_DEBUG(lock, "add lock into lru list");
if (ldlm_is_fail_loc(lock))
- OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
-
- /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
- * are not supported by the server, otherwise, it is done on
- * enqueue. */
- if (!exp_connect_cancelset(lock->l_conn_export) &&
- !ns_connect_lru_resize(ns))
- ldlm_cancel_lru(ns, 0, LCF_ASYNC, 0);
- } else {
- LDLM_DEBUG(lock, "do not add lock into lru list");
- unlock_res_and_lock(lock);
- }
+ OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
- EXIT;
+ ldlm_cancel_lru(ns, 0, LCF_ASYNC, 0);
+ } else {
+ LDLM_DEBUG(lock, "do not add lock into lru list");
+ unlock_res_and_lock(lock);
+ }
+
+ EXIT;
}
/**
if (ldlm_is_cbpending(lock) &&
!(data->lmd_flags & LDLM_FL_CBPENDING))
return INTERVAL_ITER_CONT;
- if (!data->lmd_unref && ldlm_is_cbpending(lock) &&
+ if (!(data->lmd_match & LDLM_MATCH_UNREF) && ldlm_is_cbpending(lock) &&
lock->l_readers == 0 && lock->l_writers == 0)
return INTERVAL_ITER_CONT;
/* When we search for ast_data, we are not doing a traditional match,
* so we don't worry about IBITS or extent matching.
*/
- if (data->lmd_has_ast_data) {
+ if (data->lmd_match & (LDLM_MATCH_AST | LDLM_MATCH_AST_ANY)) {
if (!lock->l_ast_data)
return INTERVAL_ITER_CONT;
- goto matched;
+ if (data->lmd_match & LDLM_MATCH_AST_ANY)
+ goto matched;
}
match = lock->l_req_mode;
/* We match if we have existing lock with same or wider set
of bits. */
- if (!data->lmd_unref && LDLM_HAVE_MASK(lock, GONE))
+ if (!(data->lmd_match & LDLM_MATCH_UNREF) && LDLM_HAVE_MASK(lock, GONE))
return INTERVAL_ITER_CONT;
if (!equi(data->lmd_flags & LDLM_FL_LOCAL_ONLY, ldlm_is_local(lock)))
enum ldlm_type type,
union ldlm_policy_data *policy,
enum ldlm_mode mode,
- struct lustre_handle *lockh, int unref)
+ struct lustre_handle *lockh,
+ enum ldlm_match_flags match_flags)
{
struct ldlm_match_data data = {
.lmd_old = NULL,
.lmd_policy = policy,
.lmd_flags = flags,
.lmd_skip_flags = skip_flags,
- .lmd_unref = unref,
- .lmd_has_ast_data = false,
+ .lmd_match = match_flags,
};
struct ldlm_resource *res;
struct ldlm_lock *lock;
(!ldlm_is_lvb_ready(lock))) {
__u64 wait_flags = LDLM_FL_LVB_READY |
LDLM_FL_DESTROYED | LDLM_FL_FAIL_NOTIFIED;
- struct l_wait_info lwi;
if (lock->l_completion_ast) {
int err = lock->l_completion_ast(lock,
GOTO(out_fail_match, matched = 0);
}
- lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
- NULL, LWI_ON_SIGNAL_NOOP, NULL);
+ wait_event_idle_timeout(
+ lock->l_waitq,
+ lock->l_flags & wait_flags,
+ cfs_time_seconds(obd_timeout));
- /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
- l_wait_event(lock->l_waitq, lock->l_flags & wait_flags,
- &lwi);
if (!ldlm_is_lvb_ready(lock))
GOTO(out_fail_match, matched = 0);
}
lock->l_req_mode = mode;
lock->l_ast_data = data;
- lock->l_pid = current_pid();
+ lock->l_pid = current->pid;
if (ns_is_server(ns))
ldlm_set_ns_srv(lock);
if (cbs) {
{
struct ldlm_resource *res = lock->l_resource;
enum ldlm_error rc = ELDLM_OK;
- struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+ LIST_HEAD(rpc_list);
ldlm_processing_policy policy;
ENTRY;
void *cookie, __u64 *flags)
{
struct ldlm_lock *lock = *lockp;
- struct ldlm_resource *res = lock->l_resource;
- int local = ns_is_client(ldlm_res_to_ns(res));
+ struct ldlm_resource *res;
+ int local = ns_is_client(ns);
enum ldlm_error rc = ELDLM_OK;
struct ldlm_interval *node = NULL;
+#ifdef HAVE_SERVER_SUPPORT
+ bool reconstruct = false;
+#endif
ENTRY;
/* policies are not executed on the client or during replay */
RETURN(ELDLM_OK);
}
+#ifdef HAVE_SERVER_SUPPORT
/* For a replaying lock, it might be already in granted list. So
* unlinking the lock will cause the interval node to be freed, we
* have to allocate the interval node early otherwise we can't regrant
- * this lock in the future. - jay */
- if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
+ * this lock in the future. - jay
+ *
+ * The only time the ldlm_resource changes for the ldlm_lock is when
+ * ldlm_lock_change_resource() is called and that only happens for
+ * the Lustre client case.
+ */
+ if (!local && (*flags & LDLM_FL_REPLAY) &&
+ lock->l_resource->lr_type == LDLM_EXTENT)
OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
- lock_res_and_lock(lock);
+ reconstruct = !local && lock->l_resource->lr_type == LDLM_FLOCK &&
+ !(*flags & LDLM_FL_TEST_LOCK);
+ if (reconstruct) {
+ rc = req_can_reconstruct(cookie, NULL);
+ if (rc != 0) {
+ if (rc == 1)
+ rc = 0;
+ RETURN(rc);
+ }
+ }
+#endif
+ res = lock_res_and_lock(lock);
if (local && ldlm_is_granted(lock)) {
/* The server returned a blocked lock, but it was granted
* before we got a chance to actually enqueue it. We don't
out:
unlock_res_and_lock(lock);
+
+#ifdef HAVE_SERVER_SUPPORT
+ if (reconstruct) {
+ struct ptlrpc_request *req = cookie;
+
+ tgt_mk_reply_data(NULL, NULL,
+ &req->rq_export->exp_target_data,
+ req, 0, NULL, false, 0);
+ }
+#endif
if (node)
OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
return rc;
__u64 flags;
int rc = LDLM_ITER_CONTINUE;
enum ldlm_error err;
- struct list_head bl_ast_list = LIST_HEAD_INIT(bl_ast_list);
+ LIST_HEAD(bl_ast_list);
ENTRY;
restart:
list_for_each_safe(tmp, pos, queue) {
struct ldlm_lock *pending;
- struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+ LIST_HEAD(rpc_list);
pending = list_entry(tmp, struct ldlm_lock, l_res_link);
arg->gl_interpret_data = gl_work->gl_interpret_data;
/* invoke the actual glimpse callback */
- if (lock->l_glimpse_ast(lock, (void*)arg) == 0)
- rc = 1;
+ rc = lock->l_glimpse_ast(lock, (void *)arg);
+ if (rc == 0)
+ rc = 1; /* update LVB if this is server lock */
+ else if (rc == -ELDLM_NO_LOCK_DATA)
+ ldlm_lvbo_update(lock->l_resource, lock, NULL, 1);
LDLM_LOCK_RELEASE(lock);
if (gl_work->gl_flags & LDLM_GL_WORK_SLAB_ALLOCATED)
enum ldlm_process_intention intention,
struct ldlm_lock *hint)
{
- struct list_head rpc_list;
+ LIST_HEAD(rpc_list);
#ifdef HAVE_SERVER_SUPPORT
ldlm_reprocessing_policy reprocess;
struct obd_device *obd;
ENTRY;
- INIT_LIST_HEAD(&rpc_list);
/* Local lock trees don't get reprocessed. */
if (ns_is_client(ldlm_res_to_ns(res))) {
EXIT;
#else
ENTRY;
- INIT_LIST_HEAD(&rpc_list);
if (!ns_is_client(ldlm_res_to_ns(res))) {
CERROR("This is client-side-only module, cannot handle "
"LDLM_NAMESPACE_SERVER resource type lock.\n");
&vaf,
lock,
lock->l_handle.h_cookie,
- atomic_read(&lock->l_refc),
+ refcount_read(&lock->l_handle.h_ref),
lock->l_readers, lock->l_writers,
ldlm_lockname[lock->l_granted_mode],
ldlm_lockname[lock->l_req_mode],
lock->l_flags, nid,
lock->l_remote_handle.cookie,
- exp ? atomic_read(&exp->exp_refcount) : -99,
- lock->l_pid, lock->l_callback_timeout,
+ exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
+ lock->l_pid, lock->l_callback_timestamp,
lock->l_lvb_type);
va_end(args);
return;
&vaf,
ldlm_lock_to_ns_name(lock), lock,
lock->l_handle.h_cookie,
- atomic_read(&lock->l_refc),
+ refcount_read(&lock->l_handle.h_ref),
lock->l_readers, lock->l_writers,
ldlm_lockname[lock->l_granted_mode],
ldlm_lockname[lock->l_req_mode],
lock->l_req_extent.start, lock->l_req_extent.end,
lock->l_flags, nid,
lock->l_remote_handle.cookie,
- exp ? atomic_read(&exp->exp_refcount) : -99,
- lock->l_pid, lock->l_callback_timeout,
+ exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
+ lock->l_pid, lock->l_callback_timestamp,
lock->l_lvb_type);
break;
&vaf,
ldlm_lock_to_ns_name(lock), lock,
lock->l_handle.h_cookie,
- atomic_read(&lock->l_refc),
+ refcount_read(&lock->l_handle.h_ref),
lock->l_readers, lock->l_writers,
ldlm_lockname[lock->l_granted_mode],
ldlm_lockname[lock->l_req_mode],
lock->l_policy_data.l_flock.end,
lock->l_flags, nid,
lock->l_remote_handle.cookie,
- exp ? atomic_read(&exp->exp_refcount) : -99,
- lock->l_pid, lock->l_callback_timeout);
+ exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
+ lock->l_pid, lock->l_callback_timestamp);
break;
case LDLM_IBITS:
&vaf,
ldlm_lock_to_ns_name(lock),
lock, lock->l_handle.h_cookie,
- atomic_read(&lock->l_refc),
+ refcount_read(&lock->l_handle.h_ref),
lock->l_readers, lock->l_writers,
ldlm_lockname[lock->l_granted_mode],
ldlm_lockname[lock->l_req_mode],
ldlm_typename[resource->lr_type],
lock->l_flags, nid,
lock->l_remote_handle.cookie,
- exp ? atomic_read(&exp->exp_refcount) : -99,
- lock->l_pid, lock->l_callback_timeout,
+ exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
+ lock->l_pid, lock->l_callback_timestamp,
lock->l_lvb_type);
break;
&vaf,
ldlm_lock_to_ns_name(lock),
lock, lock->l_handle.h_cookie,
- atomic_read(&lock->l_refc),
+ refcount_read(&lock->l_handle.h_ref),
lock->l_readers, lock->l_writers,
ldlm_lockname[lock->l_granted_mode],
ldlm_lockname[lock->l_req_mode],
ldlm_typename[resource->lr_type],
lock->l_flags, nid,
lock->l_remote_handle.cookie,
- exp ? atomic_read(&exp->exp_refcount) : -99,
- lock->l_pid, lock->l_callback_timeout,
+ exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
+ lock->l_pid, lock->l_callback_timestamp,
lock->l_lvb_type);
break;
}