return ldlm_processing_policy_table[res->lr_type];
}
EXPORT_SYMBOL(ldlm_get_processing_policy);
+
+/*
+ * Per-resource-type reprocessing callbacks, indexed by the resource's
+ * lr_type. IBITS resources get a dedicated routine
+ * (ldlm_reprocess_inodebits_queue); the other types share the generic
+ * ldlm_reprocess_queue().
+ */
+static ldlm_reprocessing_policy ldlm_reprocessing_policy_table[] = {
+ [LDLM_PLAIN] = ldlm_reprocess_queue,
+ [LDLM_EXTENT] = ldlm_reprocess_queue,
+ [LDLM_FLOCK] = ldlm_reprocess_queue,
+ [LDLM_IBITS] = ldlm_reprocess_inodebits_queue,
+};
+
+/*
+ * Return the reprocessing callback for the resource's lock type.
+ * NB: res->lr_type is used as an unchecked table index — same contract
+ * as ldlm_get_processing_policy() above.
+ */
+ldlm_reprocessing_policy ldlm_get_reprocessing_policy(struct ldlm_resource *res)
+{
+ return ldlm_reprocessing_policy_table[res->lr_type];
+}
+
#endif /* HAVE_SERVER_SUPPORT */
void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
*/
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
- atomic_inc(&lock->l_refc);
+ /* The lock's lifetime is now tracked by its portals handle refcount
+ * (h_ref) rather than a separate l_refc counter; the final put frees
+ * the lock via call_rcu() — see lock_handle_free(). */
+ refcount_inc(&lock->l_handle.h_ref);
return lock;
}
EXPORT_SYMBOL(ldlm_lock_get);
+/*
+ * RCU callback that performs the actual free of an ldlm_lock. It is
+ * scheduled with call_rcu() from the release path once the handle
+ * refcount (h_ref) drops to zero, so any concurrent RCU-protected
+ * handle lookup finishes before the memory is returned to the slab.
+ */
+static void lock_handle_free(struct rcu_head *rcu)
+{
+ struct ldlm_lock *lock = container_of(rcu, struct ldlm_lock,
+ l_handle.h_rcu);
+
+ OBD_FREE_PRE(lock, sizeof(*lock), "slab-freed");
+ kmem_cache_free(ldlm_lock_slab, lock);
+}
+
+
/**
* Release lock reference.
*
ENTRY;
LASSERT(lock->l_resource != LP_POISON);
- LASSERT(atomic_read(&lock->l_refc) > 0);
- if (atomic_dec_and_test(&lock->l_refc)) {
+ LASSERT(refcount_read(&lock->l_handle.h_ref) > 0);
+ if (refcount_dec_and_test(&lock->l_handle.h_ref)) {
struct ldlm_resource *res;
LDLM_DEBUG(lock,
lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
LDLM_NSS_LOCKS);
lu_ref_del(&res->lr_reference, "lock", lock);
- ldlm_resource_putref(res);
- lock->l_resource = NULL;
if (lock->l_export) {
class_export_lock_put(lock->l_export, lock);
lock->l_export = NULL;
if (lock->l_lvb_data != NULL)
OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len);
- ldlm_interval_free(ldlm_interval_detach(lock));
+ if (res->lr_type == LDLM_EXTENT) {
+ ldlm_interval_free(ldlm_interval_detach(lock));
+ } else if (res->lr_type == LDLM_IBITS) {
+ if (lock->l_ibits_node != NULL)
+ OBD_SLAB_FREE_PTR(lock->l_ibits_node,
+ ldlm_inodebits_slab);
+ }
+ ldlm_resource_putref(res);
+ lock->l_resource = NULL;
lu_ref_fini(&lock->l_reference);
- OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
+ call_rcu(&lock->l_handle.h_rcu, lock_handle_free);
}
EXIT;
EXIT;
}
-/* this is called by portals_handle2object with the handle lock taken */
-static void lock_handle_addref(void *lock)
-{
- LDLM_LOCK_GET((struct ldlm_lock *)lock);
-}
-
-static void lock_handle_free(void *lock, int size)
-{
- LASSERT(size == sizeof(struct ldlm_lock));
- OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
-}
-
-static struct portals_handle_ops lock_handle_ops = {
- .hop_addref = lock_handle_addref,
- .hop_free = lock_handle_free,
-};
+/* Owner tag passed to both class_handle_hash() and class_handle2object();
+ * presumably handle lookups only match entries registered with the same
+ * owner string — confirm against the class_handle2object() implementation. */
+static const char lock_handle_owner[] = "ldlm";
/**
*
lock->l_resource = resource;
lu_ref_add(&resource->lr_reference, "lock", lock);
- atomic_set(&lock->l_refc, 2);
+ refcount_set(&lock->l_handle.h_ref, 2);
INIT_LIST_HEAD(&lock->l_res_link);
INIT_LIST_HEAD(&lock->l_lru);
INIT_LIST_HEAD(&lock->l_pending_chain);
lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
LDLM_NSS_LOCKS);
- INIT_LIST_HEAD_RCU(&lock->l_handle.h_link);
- class_handle_hash(&lock->l_handle, &lock_handle_ops);
+ INIT_HLIST_NODE(&lock->l_handle.h_link);
+ class_handle_hash(&lock->l_handle, lock_handle_owner);
lu_ref_init(&lock->l_reference);
lu_ref_add(&lock->l_reference, "hash", lock);
LASSERT(handle);
- lock = class_handle2object(handle->cookie, NULL);
+ if (!lustre_handle_is_used(handle))
+ RETURN(NULL);
+
+ lock = class_handle2object(handle->cookie, lock_handle_owner);
if (lock == NULL)
RETURN(NULL);
prev->mode_link = &req->l_sl_mode;
prev->policy_link = &req->l_sl_policy;
EXIT;
- return;
}
/**
}
/**
- * Describe the overlap between two locks. itree_overlap_cb data.
- */
-struct lock_match_data {
- struct ldlm_lock *lmd_old;
- struct ldlm_lock *lmd_lock;
- enum ldlm_mode *lmd_mode;
- union ldlm_policy_data *lmd_policy;
- __u64 lmd_flags;
- __u64 lmd_skip_flags;
- int lmd_unref;
-};
-
-/**
* Check if the given @lock meets the criteria for a match.
* A reference on the lock is taken if matched.
*
* \param lock test-against this lock
* \param data parameters
*/
-static int lock_matches(struct ldlm_lock *lock, struct lock_match_data *data)
+static int lock_matches(struct ldlm_lock *lock, struct ldlm_match_data *data)
{
union ldlm_policy_data *lpol = &lock->l_policy_data;
- enum ldlm_mode match;
+ enum ldlm_mode match = LCK_MINMODE;
if (lock == data->lmd_old)
return INTERVAL_ITER_STOP;
if (!(lock->l_req_mode & *data->lmd_mode))
return INTERVAL_ITER_CONT;
+
+ /* When we search for ast_data, we are not doing a traditional match,
+ * so we don't worry about IBITS or extent matching.
+ */
+ if (data->lmd_has_ast_data) {
+ if (!lock->l_ast_data)
+ return INTERVAL_ITER_CONT;
+
+ goto matched;
+ }
+
match = lock->l_req_mode;
switch (lock->l_resource->lr_type) {
if (data->lmd_skip_flags & lock->l_flags)
return INTERVAL_ITER_CONT;
+matched:
if (data->lmd_flags & LDLM_FL_TEST_LOCK) {
LDLM_LOCK_GET(lock);
ldlm_lock_touch_in_lru(lock);
static unsigned int itree_overlap_cb(struct interval_node *in, void *args)
{
struct ldlm_interval *node = to_ldlm_interval(in);
- struct lock_match_data *data = args;
+ struct ldlm_match_data *data = args;
struct ldlm_lock *lock;
int rc;
*
* \retval a referenced lock or NULL.
*/
-static struct ldlm_lock *search_itree(struct ldlm_resource *res,
- struct lock_match_data *data)
+struct ldlm_lock *search_itree(struct ldlm_resource *res,
+ struct ldlm_match_data *data)
{
struct interval_node_extent ext = {
.start = data->lmd_policy->l_extent.start,
return NULL;
}
+EXPORT_SYMBOL(search_itree);
/**
* \retval a referenced lock or NULL.
*/
static struct ldlm_lock *search_queue(struct list_head *queue,
- struct lock_match_data *data)
+ struct ldlm_match_data *data)
{
struct ldlm_lock *lock;
int rc;
enum ldlm_mode mode,
struct lustre_handle *lockh, int unref)
{
- struct lock_match_data data = {
+ struct ldlm_match_data data = {
.lmd_old = NULL,
.lmd_lock = NULL,
.lmd_mode = &mode,
.lmd_flags = flags,
.lmd_skip_flags = skip_flags,
.lmd_unref = unref,
+ .lmd_has_ast_data = false,
};
struct ldlm_resource *res;
struct ldlm_lock *lock;
(!ldlm_is_lvb_ready(lock))) {
__u64 wait_flags = LDLM_FL_LVB_READY |
LDLM_FL_DESTROYED | LDLM_FL_FAIL_NOTIFIED;
- struct l_wait_info lwi;
if (lock->l_completion_ast) {
int err = lock->l_completion_ast(lock,
GOTO(out_fail_match, matched = 0);
}
- lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
- NULL, LWI_ON_SIGNAL_NOOP, NULL);
+ wait_event_idle_timeout(
+ lock->l_waitq,
+ lock->l_flags & wait_flags,
+ cfs_time_seconds(obd_timeout));
- /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
- l_wait_event(lock->l_waitq, lock->l_flags & wait_flags,
- &lwi);
if (!ldlm_is_lvb_ready(lock))
GOTO(out_fail_match, matched = 0);
}
lock->l_glimpse_ast = cbs->lcs_glimpse;
}
- lock->l_tree_node = NULL;
- /* if this is the extent lock, allocate the interval tree node */
- if (type == LDLM_EXTENT)
- if (ldlm_interval_alloc(lock) == NULL)
- GOTO(out, rc = -ENOMEM);
+ switch (type) {
+ case LDLM_EXTENT:
+ rc = ldlm_extent_alloc_lock(lock);
+ break;
+ case LDLM_IBITS:
+ rc = ldlm_inodebits_alloc_lock(lock);
+ break;
+ default:
+ rc = 0;
+ }
+ if (rc)
+ GOTO(out, rc);
if (lvb_len) {
lock->l_lvb_len = lvb_len;
{
struct ldlm_resource *res = lock->l_resource;
enum ldlm_error rc = ELDLM_OK;
- struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+ LIST_HEAD(rpc_list);
ldlm_processing_policy policy;
+
ENTRY;
- policy = ldlm_processing_policy_table[res->lr_type];
+ policy = ldlm_get_processing_policy(res);
restart:
policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, &rpc_list);
if (rc == ELDLM_OK && lock->l_granted_mode != lock->l_req_mode &&
int local = ns_is_client(ldlm_res_to_ns(res));
enum ldlm_error rc = ELDLM_OK;
struct ldlm_interval *node = NULL;
+#ifdef HAVE_SERVER_SUPPORT
+ bool reconstruct = false;
+#endif
ENTRY;
/* policies are not executed on the client or during replay */
if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
+#ifdef HAVE_SERVER_SUPPORT
+ reconstruct = !local && res->lr_type == LDLM_FLOCK &&
+ !(*flags & LDLM_FL_TEST_LOCK);
+ if (reconstruct) {
+ rc = req_can_reconstruct(cookie, NULL);
+ if (rc != 0) {
+ if (rc == 1)
+ rc = 0;
+ RETURN(rc);
+ }
+ }
+#endif
+
lock_res_and_lock(lock);
if (local && ldlm_is_granted(lock)) {
/* The server returned a blocked lock, but it was granted
out:
unlock_res_and_lock(lock);
+
+#ifdef HAVE_SERVER_SUPPORT
+ if (reconstruct) {
+ struct ptlrpc_request *req = cookie;
+
+ tgt_mk_reply_data(NULL, NULL,
+ &req->rq_export->exp_target_data,
+ req, 0, NULL, false, 0);
+ }
+#endif
if (node)
OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
return rc;
*/
int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
struct list_head *work_list,
- enum ldlm_process_intention intention)
+ enum ldlm_process_intention intention,
+ struct ldlm_lock *hint)
{
struct list_head *tmp, *pos;
ldlm_processing_policy policy;
__u64 flags;
int rc = LDLM_ITER_CONTINUE;
enum ldlm_error err;
- struct list_head bl_ast_list = LIST_HEAD_INIT(bl_ast_list);
+ LIST_HEAD(bl_ast_list);
+
ENTRY;
check_res_locked(res);
- policy = ldlm_processing_policy_table[res->lr_type];
+ policy = ldlm_get_processing_policy(res);
LASSERT(policy);
LASSERT(intention == LDLM_PROCESS_RESCAN ||
intention == LDLM_PROCESS_RECOVERY);
restart:
list_for_each_safe(tmp, pos, queue) {
struct ldlm_lock *pending;
- struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+ LIST_HEAD(rpc_list);
pending = list_entry(tmp, struct ldlm_lock, l_res_link);
* if anything could be granted as a result of the cancellation.
*/
static void __ldlm_reprocess_all(struct ldlm_resource *res,
- enum ldlm_process_intention intention)
+ enum ldlm_process_intention intention,
+ struct ldlm_lock *hint)
{
- struct list_head rpc_list;
+ LIST_HEAD(rpc_list);
#ifdef HAVE_SERVER_SUPPORT
+ ldlm_reprocessing_policy reprocess;
struct obd_device *obd;
- int rc;
- ENTRY;
+ int rc;
- INIT_LIST_HEAD(&rpc_list);
- /* Local lock trees don't get reprocessed. */
- if (ns_is_client(ldlm_res_to_ns(res))) {
- EXIT;
- return;
- }
+ ENTRY;
+
+ /* Local lock trees don't get reprocessed. */
+ if (ns_is_client(ldlm_res_to_ns(res))) {
+ EXIT;
+ return;
+ }
/* Disable reprocess during lock replay stage but allow during
* request replay stage.
RETURN_EXIT;
restart:
lock_res(res);
- ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list, intention);
+ reprocess = ldlm_get_reprocessing_policy(res);
+ reprocess(res, &res->lr_waiting, &rpc_list, intention, hint);
unlock_res(res);
rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
goto restart;
}
#else
- ENTRY;
+ ENTRY;
- INIT_LIST_HEAD(&rpc_list);
- if (!ns_is_client(ldlm_res_to_ns(res))) {
- CERROR("This is client-side-only module, cannot handle "
- "LDLM_NAMESPACE_SERVER resource type lock.\n");
- LBUG();
- }
+ if (!ns_is_client(ldlm_res_to_ns(res))) {
+ CERROR("This is client-side-only module, cannot handle "
+ "LDLM_NAMESPACE_SERVER resource type lock.\n");
+ LBUG();
+ }
#endif
- EXIT;
+ EXIT;
}
-void ldlm_reprocess_all(struct ldlm_resource *res)
+void ldlm_reprocess_all(struct ldlm_resource *res, struct ldlm_lock *hint)
{
- __ldlm_reprocess_all(res, LDLM_PROCESS_RESCAN);
+ __ldlm_reprocess_all(res, LDLM_PROCESS_RESCAN, hint);
}
EXPORT_SYMBOL(ldlm_reprocess_all);
struct ldlm_resource *res = cfs_hash_object(hs, hnode);
/* This is only called once after recovery done. LU-8306. */
- __ldlm_reprocess_all(res, LDLM_PROCESS_RECOVERY);
+ __ldlm_reprocess_all(res, LDLM_PROCESS_RECOVERY, NULL);
return 0;
}
if (!ldlm_is_cancel(lock)) {
ldlm_set_cancel(lock);
if (lock->l_blocking_ast) {
- unlock_res_and_lock(lock);
- lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
- LDLM_CB_CANCELING);
- lock_res_and_lock(lock);
- } else {
- LDLM_DEBUG(lock, "no blocking ast");
- }
+ unlock_res_and_lock(lock);
+ lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
+ LDLM_CB_CANCELING);
+ lock_res_and_lock(lock);
+ } else {
+ LDLM_DEBUG(lock, "no blocking ast");
+ }
/* only canceller can set bl_done bit */
ldlm_set_bl_done(lock);
wake_up_all(&lock->l_waitq);
} else if (!ldlm_is_bl_done(lock)) {
- struct l_wait_info lwi = { 0 };
-
/* The lock is guaranteed to have been canceled once
* returning from this function. */
unlock_res_and_lock(lock);
- l_wait_event(lock->l_waitq, is_bl_done(lock), &lwi);
+ wait_event_idle(lock->l_waitq, is_bl_done(lock));
lock_res_and_lock(lock);
}
}
ldlm_lvbo_update(res, lock, NULL, 1);
ldlm_lock_cancel(lock);
if (!exp->exp_obd->obd_stopping)
- ldlm_reprocess_all(res);
+ ldlm_reprocess_all(res, lock);
ldlm_resource_putref(res);
ecl->ecl_loop++;
}
/**
- * Downgrade an PW/EX lock to COS mode.
+ * Downgrade an PW/EX lock to COS | CR mode.
*
* A lock mode convertion from PW/EX mode to less conflict mode. The
* convertion may fail if lock was canceled before downgrade, but it doesn't
* things are cleared, so any pending or new blocked lock on that lock will
* cause new call to blocking_ast and force resource object commit.
*
+ * Also used by layout_change to replace EX lock to CR lock.
+ *
* \param lock A lock to convert
* \param new_mode new lock mode
*/
#ifdef HAVE_SERVER_SUPPORT
ENTRY;
- LASSERT(new_mode == LCK_COS);
+ LASSERT(new_mode == LCK_COS || new_mode == LCK_CR);
lock_res_and_lock(lock);
ldlm_grant_lock(lock, NULL);
unlock_res_and_lock(lock);
- ldlm_reprocess_all(lock->l_resource);
+ ldlm_reprocess_all(lock->l_resource, lock);
EXIT;
#endif
&vaf,
lock,
lock->l_handle.h_cookie,
- atomic_read(&lock->l_refc),
+ refcount_read(&lock->l_handle.h_ref),
lock->l_readers, lock->l_writers,
ldlm_lockname[lock->l_granted_mode],
ldlm_lockname[lock->l_req_mode],
lock->l_flags, nid,
lock->l_remote_handle.cookie,
- exp ? atomic_read(&exp->exp_refcount) : -99,
+ exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
lock->l_pid, lock->l_callback_timeout,
lock->l_lvb_type);
va_end(args);
&vaf,
ldlm_lock_to_ns_name(lock), lock,
lock->l_handle.h_cookie,
- atomic_read(&lock->l_refc),
+ refcount_read(&lock->l_handle.h_ref),
lock->l_readers, lock->l_writers,
ldlm_lockname[lock->l_granted_mode],
ldlm_lockname[lock->l_req_mode],
lock->l_req_extent.start, lock->l_req_extent.end,
lock->l_flags, nid,
lock->l_remote_handle.cookie,
- exp ? atomic_read(&exp->exp_refcount) : -99,
+ exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
lock->l_pid, lock->l_callback_timeout,
lock->l_lvb_type);
break;
&vaf,
ldlm_lock_to_ns_name(lock), lock,
lock->l_handle.h_cookie,
- atomic_read(&lock->l_refc),
+ refcount_read(&lock->l_handle.h_ref),
lock->l_readers, lock->l_writers,
ldlm_lockname[lock->l_granted_mode],
ldlm_lockname[lock->l_req_mode],
lock->l_policy_data.l_flock.end,
lock->l_flags, nid,
lock->l_remote_handle.cookie,
- exp ? atomic_read(&exp->exp_refcount) : -99,
+ exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
lock->l_pid, lock->l_callback_timeout);
break;
&vaf,
ldlm_lock_to_ns_name(lock),
lock, lock->l_handle.h_cookie,
- atomic_read(&lock->l_refc),
+ refcount_read(&lock->l_handle.h_ref),
lock->l_readers, lock->l_writers,
ldlm_lockname[lock->l_granted_mode],
ldlm_lockname[lock->l_req_mode],
ldlm_typename[resource->lr_type],
lock->l_flags, nid,
lock->l_remote_handle.cookie,
- exp ? atomic_read(&exp->exp_refcount) : -99,
+ exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
lock->l_pid, lock->l_callback_timeout,
lock->l_lvb_type);
break;
&vaf,
ldlm_lock_to_ns_name(lock),
lock, lock->l_handle.h_cookie,
- atomic_read(&lock->l_refc),
+ refcount_read(&lock->l_handle.h_ref),
lock->l_readers, lock->l_writers,
ldlm_lockname[lock->l_granted_mode],
ldlm_lockname[lock->l_req_mode],
ldlm_typename[resource->lr_type],
lock->l_flags, nid,
lock->l_remote_handle.cookie,
- exp ? atomic_read(&exp->exp_refcount) : -99,
+ exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
lock->l_pid, lock->l_callback_timeout,
lock->l_lvb_type);
break;