struct interval_node *lit_root; /* actual ldlm_interval */
};
+/**
+ * Lists of waiting locks for each inodebit type.
+ * A lock can be in several liq_waiting lists at the same time and it
+ * always remains linked on lr_waiting as well.
+ */
+struct ldlm_ibits_queues {
+ struct list_head liq_waiting[MDS_INODELOCK_NUMBITS];
+};
+
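+/**
+ * Per-lock node for the inodebit queues: lin_link[i] links the lock into
+ * liq_waiting[i] for each bit the lock is waiting on.
+ */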
+struct ldlm_ibits_node {
+ struct list_head lin_link[MDS_INODELOCK_NUMBITS];
+ struct ldlm_lock *lock;
+};
+
/** Whether to track references to exports by LDLM locks. */
#define LUSTRE_TRACKS_LOCK_EXP_REFS (0)
*/
struct list_head l_res_link;
/**
- * Tree node for ldlm_extent.
+ * Internal structures per lock type.
*/
- struct ldlm_interval *l_tree_node;
+ union {
+ struct ldlm_interval *l_tree_node;
+ struct ldlm_ibits_node *l_ibits_node;
+ };
/**
* Per export hash of locks.
* Protected by per-bucket exp->exp_lock_hash locks.
/** Resource name */
struct ldlm_res_id lr_name;
- /**
- * Interval trees (only for extent locks) for all modes of this resource
- */
- struct ldlm_interval_tree *lr_itree;
+ union {
+ /**
+ * Interval trees (only for extent locks) for all modes of
+ * this resource
+ */
+ struct ldlm_interval_tree *lr_itree;
+ struct ldlm_ibits_queues *lr_ibits_queues;
+ };
union {
/**
enum ldlm_error *err,
struct list_head *work_list);
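+/* Reprocess a waiting queue, attempting to grant compatible waiting locks;
+ * a non-NULL hint may narrow the scan to locks related to it. */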
+typedef int (*ldlm_reprocessing_policy)(struct ldlm_resource *res,
+ struct list_head *queue,
+ struct list_head *work_list,
+ enum ldlm_process_intention intention,
+ struct ldlm_lock *hint);
+
/**
* Return values for lock iterators.
* Also used when deciding on lock grants and cancellations.
/* ldlm_lock.c */
#ifdef HAVE_SERVER_SUPPORT
ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res);
+ldlm_reprocessing_policy
+ldlm_get_reprocessing_policy(struct ldlm_resource *res);
#endif
void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg);
void ldlm_lock2handle(const struct ldlm_lock *lock,
__u64 *bits);
void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode);
void ldlm_lock_cancel(struct ldlm_lock *lock);
-void ldlm_reprocess_all(struct ldlm_resource *res);
+void ldlm_reprocess_all(struct ldlm_resource *res, struct ldlm_lock *hint);
void ldlm_reprocess_recovery_done(struct ldlm_namespace *ns);
void ldlm_lock_dump_handle(int level, const struct lustre_handle *lockh);
void ldlm_unlink_lock_skiplist(struct ldlm_lock *req);
EXPORT_SYMBOL(ldlm_extent_shift_kms);
struct kmem_cache *ldlm_interval_slab;
-struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
+static struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
{
struct ldlm_interval *node;
ENTRY;
return index;
}
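+/** Allocate the per-lock ldlm_interval node backing an extent lock. */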
+int ldlm_extent_alloc_lock(struct ldlm_lock *lock)
+{
+ lock->l_tree_node = NULL;
+ if (ldlm_interval_alloc(lock) == NULL)
+ return -ENOMEM;
+ return 0;
+}
+
/** Add newly granted lock into interval tree for the resource. */
void ldlm_extent_add_lock(struct ldlm_resource *res,
struct ldlm_lock *lock)
restart:
ldlm_reprocess_queue(res, &res->lr_waiting,
&rpc_list,
- LDLM_PROCESS_RESCAN);
+ LDLM_PROCESS_RESCAN, NULL);
unlock_res_and_lock(req);
rc = ldlm_run_ast_work(ns, &rpc_list,
#ifdef HAVE_SERVER_SUPPORT
/**
+ * Iterate through all waiting locks on a given resource queue and attempt
+ * to grant them. As an optimization, only the head waiting lock of each
+ * inodebit queue is checked.
+ *
+ * Must be called with resource lock held.
+ */
+int ldlm_reprocess_inodebits_queue(struct ldlm_resource *res,
+ struct list_head *queue,
+ struct list_head *work_list,
+ enum ldlm_process_intention intention,
+ struct ldlm_lock *hint)
+{
+ __u64 flags;
+ int rc = LDLM_ITER_CONTINUE;
+ enum ldlm_error err;
+ struct list_head bl_ast_list = LIST_HEAD_INIT(bl_ast_list);
+ struct ldlm_ibits_queues *queues = res->lr_ibits_queues;
+ int i;
+
+ ENTRY;
+
+ check_res_locked(res);
+
+ LASSERT(res->lr_type == LDLM_IBITS);
+ LASSERT(intention == LDLM_PROCESS_RESCAN ||
+ intention == LDLM_PROCESS_RECOVERY);
+
+ if (intention == LDLM_PROCESS_RECOVERY)
+ return ldlm_reprocess_queue(res, queue, work_list, intention,
+ NULL);
+
+restart:
+ CDEBUG(D_DLMTRACE, "--- Reprocess resource "DLDLMRES" (%p)\n",
+ PLDLMRES(res), res);
+
+ for (i = 0; i < MDS_INODELOCK_NUMBITS; i++) {
+ struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+ struct list_head *head = &queues->liq_waiting[i];
+ struct ldlm_lock *pending;
+ struct ldlm_ibits_node *node;
+
+ if (list_empty(head))
+ continue;
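+ /* a hint limits reprocessing to the bit queues it covers */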
+ if (hint && !(hint->l_policy_data.l_inodebits.bits & (1 << i)))
+ continue;
+
+ node = list_entry(head->next, struct ldlm_ibits_node,
+ lin_link[i]);
+
+ pending = node->lock;
+ LDLM_DEBUG(pending, "Reprocessing lock from queue %d", i);
+
+ flags = 0;
+ rc = ldlm_process_inodebits_lock(pending, &flags, intention,
+ &err, &rpc_list);
+ if (ldlm_is_granted(pending)) {
+ list_splice(&rpc_list, work_list);
+ /* Try to grant more locks from current queue */
+ i--;
+ } else {
+ list_splice(&rpc_list, &bl_ast_list);
+ }
+ }
+
+ if (!list_empty(&bl_ast_list)) {
+ unlock_res(res);
+
+ rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &bl_ast_list,
+ LDLM_WORK_BL_AST);
+
+ lock_res(res);
+ if (rc == -ERESTART)
+ GOTO(restart, rc);
+ }
+
+ if (!list_empty(&bl_ast_list))
+ ldlm_discard_bl_list(&bl_ast_list);
+
+ RETURN(rc);
+}
+
+/**
* Determine if the lock is compatible with all locks on the queue.
*
* If \a work_list is provided, conflicting locks are linked there.
LDLM_DEBUG(lock, "client lock convert END");
return rc;
}
+
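+/**
+ * Allocate the per-lock ibits node; only server-side namespaces keep
+ * per-bit waiting queues, so client locks get NULL here.
+ */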
+int ldlm_inodebits_alloc_lock(struct ldlm_lock *lock)
+{
+ if (ldlm_is_ns_srv(lock)) {
+ int i;
+
+ OBD_SLAB_ALLOC_PTR(lock->l_ibits_node, ldlm_inodebits_slab);
+ if (lock->l_ibits_node == NULL)
+ return -ENOMEM;
+ for (i = 0; i < MDS_INODELOCK_NUMBITS; i++)
+ INIT_LIST_HEAD(&lock->l_ibits_node->lin_link[i]);
+ lock->l_ibits_node->lock = lock;
+ } else {
+ lock->l_ibits_node = NULL;
+ }
+ return 0;
+}
+
+void ldlm_inodebits_add_lock(struct ldlm_resource *res, struct list_head *head,
+ struct ldlm_lock *lock)
+{
+ int i;
+
+ if (!ldlm_is_ns_srv(lock))
+ return;
+
+ if (head == &res->lr_waiting) {
+ for (i = 0; i < MDS_INODELOCK_NUMBITS; i++) {
+ if (lock->l_policy_data.l_inodebits.bits & (1 << i))
+ list_add_tail(&lock->l_ibits_node->lin_link[i],
+ &res->lr_ibits_queues->liq_waiting[i]);
+ }
+ } else if (head == &res->lr_granted && lock->l_ibits_node != NULL) {
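+ /* The node is only needed while the lock waits; once the lock is
+ * granted it can be freed right away. */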
+ for (i = 0; i < MDS_INODELOCK_NUMBITS; i++)
+ LASSERT(list_empty(&lock->l_ibits_node->lin_link[i]));
+ OBD_SLAB_FREE_PTR(lock->l_ibits_node, ldlm_inodebits_slab);
+ lock->l_ibits_node = NULL;
+ }
+}
+
+void ldlm_inodebits_unlink_lock(struct ldlm_lock *lock)
+{
+ int i;
+
+ ldlm_unlink_lock_skiplist(lock);
+ if (!ldlm_is_ns_srv(lock))
+ return;
+
+ for (i = 0; i < MDS_INODELOCK_NUMBITS; i++)
+ list_del_init(&lock->l_ibits_node->lin_link[i]);
+}
/* ldlm_resource.c */
extern struct kmem_cache *ldlm_resource_slab;
extern struct kmem_cache *ldlm_lock_slab;
+extern struct kmem_cache *ldlm_inodebits_slab;
extern struct kmem_cache *ldlm_interval_tree_slab;
void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
#ifdef HAVE_SERVER_SUPPORT
int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
struct list_head *work_list,
- enum ldlm_process_intention intention);
+ enum ldlm_process_intention intention,
+ struct ldlm_lock *hint);
int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
struct list_head *rpc_list);
void ldlm_discard_bl_list(struct list_head *bl_list);
enum ldlm_process_intention intention,
enum ldlm_error *err,
struct list_head *work_list);
+int ldlm_reprocess_inodebits_queue(struct ldlm_resource *res,
+ struct list_head *queue,
+ struct list_head *work_list,
+ enum ldlm_process_intention intention,
+ struct ldlm_lock *hint);
/* ldlm_extent.c */
int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
enum ldlm_process_intention intention,
enum ldlm_error *err, struct list_head *work_list);
#endif
+int ldlm_extent_alloc_lock(struct ldlm_lock *lock);
void ldlm_extent_add_lock(struct ldlm_resource *res, struct ldlm_lock *lock);
void ldlm_extent_unlink_lock(struct ldlm_lock *lock);
+int ldlm_inodebits_alloc_lock(struct ldlm_lock *lock);
+void ldlm_inodebits_add_lock(struct ldlm_resource *res, struct list_head *head,
+ struct ldlm_lock *lock);
+void ldlm_inodebits_unlink_lock(struct ldlm_lock *lock);
+
/* ldlm_flock.c */
int ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
enum ldlm_process_intention intention,
extern struct kmem_cache *ldlm_interval_slab; /* slab cache for ldlm_interval */
extern void ldlm_interval_attach(struct ldlm_interval *n, struct ldlm_lock *l);
extern struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l);
-extern struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock);
extern void ldlm_interval_free(struct ldlm_interval *node);
/* this function must be called with res lock held */
static inline struct ldlm_extent *
return ldlm_processing_policy_table[res->lr_type];
}
EXPORT_SYMBOL(ldlm_get_processing_policy);
+
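+/* Per-type queue reprocessing: IBITS gets a specialized walker over its
+ * per-bit waiting queues, all other types use the generic scan. */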
+static ldlm_reprocessing_policy ldlm_reprocessing_policy_table[] = {
+ [LDLM_PLAIN] = ldlm_reprocess_queue,
+ [LDLM_EXTENT] = ldlm_reprocess_queue,
+ [LDLM_FLOCK] = ldlm_reprocess_queue,
+ [LDLM_IBITS] = ldlm_reprocess_inodebits_queue,
+};
+
+ldlm_reprocessing_policy ldlm_get_reprocessing_policy(struct ldlm_resource *res)
+{
+ return ldlm_reprocessing_policy_table[res->lr_type];
+}
+
#endif /* HAVE_SERVER_SUPPORT */
void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
LDLM_NSS_LOCKS);
lu_ref_del(&res->lr_reference, "lock", lock);
- ldlm_resource_putref(res);
- lock->l_resource = NULL;
if (lock->l_export) {
class_export_lock_put(lock->l_export, lock);
lock->l_export = NULL;
if (lock->l_lvb_data != NULL)
OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len);
- ldlm_interval_free(ldlm_interval_detach(lock));
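+ /* lr_type is used below to free per-type lock data, so the
+ * resource reference is dropped only after that */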
+ if (res->lr_type == LDLM_EXTENT) {
+ ldlm_interval_free(ldlm_interval_detach(lock));
+ } else if (res->lr_type == LDLM_IBITS) {
+ if (lock->l_ibits_node != NULL)
+ OBD_SLAB_FREE_PTR(lock->l_ibits_node,
+ ldlm_inodebits_slab);
+ }
+ ldlm_resource_putref(res);
+ lock->l_resource = NULL;
lu_ref_fini(&lock->l_reference);
OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
}
lock->l_glimpse_ast = cbs->lcs_glimpse;
}
- lock->l_tree_node = NULL;
- /* if this is the extent lock, allocate the interval tree node */
- if (type == LDLM_EXTENT)
- if (ldlm_interval_alloc(lock) == NULL)
- GOTO(out, rc = -ENOMEM);
+ switch (type) {
+ case LDLM_EXTENT:
+ rc = ldlm_extent_alloc_lock(lock);
+ break;
+ case LDLM_IBITS:
+ rc = ldlm_inodebits_alloc_lock(lock);
+ break;
+ default:
+ rc = 0;
+ }
+ if (rc)
+ GOTO(out, rc);
if (lvb_len) {
lock->l_lvb_len = lvb_len;
enum ldlm_error rc = ELDLM_OK;
struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
ldlm_processing_policy policy;
+
ENTRY;
- policy = ldlm_processing_policy_table[res->lr_type];
+ policy = ldlm_get_processing_policy(res);
restart:
policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, &rpc_list);
if (rc == ELDLM_OK && lock->l_granted_mode != lock->l_req_mode &&
*/
int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
struct list_head *work_list,
- enum ldlm_process_intention intention)
+ enum ldlm_process_intention intention,
+ struct ldlm_lock *hint)
{
struct list_head *tmp, *pos;
ldlm_processing_policy policy;
int rc = LDLM_ITER_CONTINUE;
enum ldlm_error err;
struct list_head bl_ast_list = LIST_HEAD_INIT(bl_ast_list);
+
ENTRY;
check_res_locked(res);
- policy = ldlm_processing_policy_table[res->lr_type];
+ policy = ldlm_get_processing_policy(res);
LASSERT(policy);
LASSERT(intention == LDLM_PROCESS_RESCAN ||
intention == LDLM_PROCESS_RECOVERY);
* if anything could be granted as a result of the cancellation.
*/
static void __ldlm_reprocess_all(struct ldlm_resource *res,
- enum ldlm_process_intention intention)
+ enum ldlm_process_intention intention,
+ struct ldlm_lock *hint)
{
struct list_head rpc_list;
#ifdef HAVE_SERVER_SUPPORT
+ ldlm_reprocessing_policy reprocess;
struct obd_device *obd;
- int rc;
- ENTRY;
+ int rc;
+
+ ENTRY;
INIT_LIST_HEAD(&rpc_list);
- /* Local lock trees don't get reprocessed. */
- if (ns_is_client(ldlm_res_to_ns(res))) {
- EXIT;
- return;
- }
+ /* Local lock trees don't get reprocessed. */
+ if (ns_is_client(ldlm_res_to_ns(res))) {
+ EXIT;
+ return;
+ }
/* Disable reprocess during lock replay stage but allow during
* request replay stage.
RETURN_EXIT;
restart:
lock_res(res);
- ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list, intention);
+ reprocess = ldlm_get_reprocessing_policy(res);
+ reprocess(res, &res->lr_waiting, &rpc_list, intention, hint);
unlock_res(res);
rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
goto restart;
}
#else
- ENTRY;
+ ENTRY;
INIT_LIST_HEAD(&rpc_list);
- if (!ns_is_client(ldlm_res_to_ns(res))) {
- CERROR("This is client-side-only module, cannot handle "
- "LDLM_NAMESPACE_SERVER resource type lock.\n");
- LBUG();
- }
+ if (!ns_is_client(ldlm_res_to_ns(res))) {
+ CERROR("This is client-side-only module, cannot handle LDLM_NAMESPACE_SERVER resource type lock.\n");
+ LBUG();
+ }
#endif
- EXIT;
+ EXIT;
}
-void ldlm_reprocess_all(struct ldlm_resource *res)
+void ldlm_reprocess_all(struct ldlm_resource *res, struct ldlm_lock *hint)
{
- __ldlm_reprocess_all(res, LDLM_PROCESS_RESCAN);
+ __ldlm_reprocess_all(res, LDLM_PROCESS_RESCAN, hint);
}
EXPORT_SYMBOL(ldlm_reprocess_all);
struct ldlm_resource *res = cfs_hash_object(hs, hnode);
/* This is only called once after recovery done. LU-8306. */
- __ldlm_reprocess_all(res, LDLM_PROCESS_RECOVERY);
+ __ldlm_reprocess_all(res, LDLM_PROCESS_RECOVERY, NULL);
return 0;
}
ldlm_lvbo_update(res, lock, NULL, 1);
ldlm_lock_cancel(lock);
if (!exp->exp_obd->obd_stopping)
- ldlm_reprocess_all(res);
+ ldlm_reprocess_all(res, lock);
ldlm_resource_putref(res);
ecl->ecl_loop++;
ldlm_grant_lock(lock, NULL);
unlock_res_and_lock(lock);
- ldlm_reprocess_all(lock->l_resource);
+ ldlm_reprocess_all(lock->l_resource, lock);
EXIT;
#endif
rc = ldlm_run_ast_work(ldlm_res_to_ns(res), gl_work_list,
LDLM_WORK_GL_AST);
if (rc == -ERESTART)
- ldlm_reprocess_all(res);
+ ldlm_reprocess_all(res, NULL);
RETURN(rc);
}
if (!err && !ldlm_is_cbpending(lock) &&
dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
- ldlm_reprocess_all(lock->l_resource);
+ ldlm_reprocess_all(lock->l_resource, lock);
LDLM_LOCK_RELEASE(lock);
}
ldlm_clear_blocking_data(lock);
unlock_res_and_lock(lock);
- ldlm_reprocess_all(lock->l_resource);
+ ldlm_reprocess_all(lock->l_resource, NULL);
rc = ELDLM_OK;
}
/* This code is an optimization to only attempt lock
* granting on the resource (that could be CPU-expensive)
- * after we are done cancelling lock in that resource. */
- if (res != pres) {
- if (pres != NULL) {
- ldlm_reprocess_all(pres);
- LDLM_RESOURCE_DELREF(pres);
- ldlm_resource_putref(pres);
- }
+ * after we are done cancelling lock in that resource.
+ */
+ if (res != pres) {
+ if (pres != NULL) {
+ ldlm_reprocess_all(pres, NULL);
+ LDLM_RESOURCE_DELREF(pres);
+ ldlm_resource_putref(pres);
+ }
if (res != NULL) {
ldlm_resource_getref(res);
LDLM_RESOURCE_ADDREF(res);
(s64)delay);
at_measured(&lock->l_export->exp_bl_lock_at, delay);
}
- ldlm_lock_cancel(lock);
- LDLM_LOCK_PUT(lock);
- }
- if (pres != NULL) {
- ldlm_reprocess_all(pres);
- LDLM_RESOURCE_DELREF(pres);
- ldlm_resource_putref(pres);
- }
- LDLM_DEBUG_NOLOCK("server-side cancel handler END");
- RETURN(done);
+ ldlm_lock_cancel(lock);
+ LDLM_LOCK_PUT(lock);
+ }
+ if (pres != NULL) {
+ ldlm_reprocess_all(pres, NULL);
+ LDLM_RESOURCE_DELREF(pres);
+ ldlm_resource_putref(pres);
+ }
+ LDLM_DEBUG_NOLOCK("server-side cancel handler END");
+ RETURN(done);
}
EXPORT_SYMBOL(ldlm_request_cancel);
goto out_interval;
#ifdef HAVE_SERVER_SUPPORT
+ ldlm_inodebits_slab = kmem_cache_create("ldlm_ibits_node",
+ sizeof(struct ldlm_ibits_node),
+ 0, SLAB_HWCACHE_ALIGN, NULL);
+ if (ldlm_inodebits_slab == NULL)
+ goto out_interval_tree;
+
ldlm_glimpse_work_kmem = kmem_cache_create("ldlm_glimpse_work_kmem",
sizeof(struct ldlm_glimpse_work),
0, 0, NULL);
if (ldlm_glimpse_work_kmem == NULL)
- goto out_interval_tree;
+ goto out_inodebits;
#endif
#if LUSTRE_TRACKS_LOCK_EXP_REFS
#endif
return 0;
#ifdef HAVE_SERVER_SUPPORT
+out_inodebits:
+ kmem_cache_destroy(ldlm_inodebits_slab);
out_interval_tree:
kmem_cache_destroy(ldlm_interval_tree_slab);
#endif
kmem_cache_destroy(ldlm_interval_slab);
kmem_cache_destroy(ldlm_interval_tree_slab);
#ifdef HAVE_SERVER_SUPPORT
+ kmem_cache_destroy(ldlm_inodebits_slab);
kmem_cache_destroy(ldlm_glimpse_work_kmem);
#endif
}
RETURN(ldlm_completion_tail(lock, data));
}
- LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
- "going forward");
- ldlm_reprocess_all(lock->l_resource);
+ LDLM_DEBUG(lock,
+ "client-side enqueue returned a blocked lock, going forward");
+ ldlm_reprocess_all(lock->l_resource, NULL);
RETURN(0);
}
EXPORT_SYMBOL(ldlm_completion_ast_async);
LDLM_FL_BL_AST : LDLM_FL_CANCELING;
unlock_res_and_lock(lock);
- if (local_only) {
- CDEBUG(D_DLMTRACE, "not sending request (at caller's "
- "instruction)\n");
- rc = LDLM_FL_LOCAL_ONLY;
- }
- ldlm_lock_cancel(lock);
- } else {
- if (ns_is_client(ldlm_lock_to_ns(lock))) {
- LDLM_ERROR(lock, "Trying to cancel local lock");
- LBUG();
- }
- LDLM_DEBUG(lock, "server-side local cancel");
- ldlm_lock_cancel(lock);
- ldlm_reprocess_all(lock->l_resource);
- }
+ if (local_only) {
+ CDEBUG(D_DLMTRACE,
+ "not sending request (at caller's instruction)\n");
+ rc = LDLM_FL_LOCAL_ONLY;
+ }
+ ldlm_lock_cancel(lock);
+ } else {
+ if (ns_is_client(ldlm_lock_to_ns(lock))) {
+ LDLM_ERROR(lock, "Trying to cancel local lock");
+ LBUG();
+ }
+ LDLM_DEBUG(lock, "server-side local cancel");
+ ldlm_lock_cancel(lock);
+ ldlm_reprocess_all(lock->l_resource, lock);
+ }
- RETURN(rc);
+ RETURN(rc);
}
/**
struct kmem_cache *ldlm_resource_slab, *ldlm_lock_slab;
struct kmem_cache *ldlm_interval_tree_slab;
+struct kmem_cache *ldlm_inodebits_slab;
int ldlm_srv_namespace_nr = 0;
int ldlm_cli_namespace_nr = 0;
struct ldlm_namespace, ns_list_chain);
}
+static bool ldlm_resource_extent_new(struct ldlm_resource *res)
+{
+ int idx;
+
+ OBD_SLAB_ALLOC(res->lr_itree, ldlm_interval_tree_slab,
+ sizeof(*res->lr_itree) * LCK_MODE_NUM);
+ if (res->lr_itree == NULL)
+ return false;
+ /* Initialize interval trees for each lock mode. */
+ for (idx = 0; idx < LCK_MODE_NUM; idx++) {
+ res->lr_itree[idx].lit_size = 0;
+ res->lr_itree[idx].lit_mode = 1 << idx;
+ res->lr_itree[idx].lit_root = NULL;
+ }
+ return true;
+}
+
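+/** Set up the per-inodebit waiting queues for a new IBITS resource. */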
+static bool ldlm_resource_inodebits_new(struct ldlm_resource *res)
+{
+ int i;
+
+ OBD_ALLOC_PTR(res->lr_ibits_queues);
+ if (res->lr_ibits_queues == NULL)
+ return false;
+ for (i = 0; i < MDS_INODELOCK_NUMBITS; i++)
+ INIT_LIST_HEAD(&res->lr_ibits_queues->liq_waiting[i]);
+ return true;
+}
+
/** Create and initialize new resource. */
static struct ldlm_resource *ldlm_resource_new(enum ldlm_type ldlm_type)
{
struct ldlm_resource *res;
- int idx;
+ bool rc;
OBD_SLAB_ALLOC_PTR_GFP(res, ldlm_resource_slab, GFP_NOFS);
if (res == NULL)
return NULL;
- if (ldlm_type == LDLM_EXTENT) {
- OBD_SLAB_ALLOC(res->lr_itree, ldlm_interval_tree_slab,
- sizeof(*res->lr_itree) * LCK_MODE_NUM);
- if (res->lr_itree == NULL) {
- OBD_SLAB_FREE_PTR(res, ldlm_resource_slab);
- return NULL;
- }
- /* Initialize interval trees for each lock mode. */
- for (idx = 0; idx < LCK_MODE_NUM; idx++) {
- res->lr_itree[idx].lit_size = 0;
- res->lr_itree[idx].lit_mode = 1 << idx;
- res->lr_itree[idx].lit_root = NULL;
- }
+ switch (ldlm_type) {
+ case LDLM_EXTENT:
+ rc = ldlm_resource_extent_new(res);
+ break;
+ case LDLM_IBITS:
+ rc = ldlm_resource_inodebits_new(res);
+ break;
+ default:
+ rc = true;
+ break;
+ }
+ if (!rc) {
+ OBD_SLAB_FREE_PTR(res, ldlm_resource_slab);
+ return NULL;
}
INIT_LIST_HEAD(&res->lr_granted);
return res;
}
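+/** Free a resource together with its type-specific data. */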
+static void ldlm_resource_free(struct ldlm_resource *res)
+{
+ if (res->lr_type == LDLM_EXTENT) {
+ if (res->lr_itree != NULL)
+ OBD_SLAB_FREE(res->lr_itree, ldlm_interval_tree_slab,
+ sizeof(*res->lr_itree) * LCK_MODE_NUM);
+ } else if (res->lr_type == LDLM_IBITS) {
+ if (res->lr_ibits_queues != NULL)
+ OBD_FREE_PTR(res->lr_ibits_queues);
+ }
+
+ OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof(*res));
+}
+
/**
* Return a reference to resource with given name, creating it if necessary.
* Args: namespace with ns_lock unlocked
cfs_hash_bd_unlock(ns->ns_rs_hash, &bd, 1);
/* Clean lu_ref for failed resource. */
lu_ref_fini(&res->lr_reference);
- if (res->lr_itree != NULL)
- OBD_SLAB_FREE(res->lr_itree, ldlm_interval_tree_slab,
- sizeof(*res->lr_itree) * LCK_MODE_NUM);
- OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
+ ldlm_resource_free(res);
found:
res = hlist_entry(hnode, struct ldlm_resource, lr_hash);
return res;
cfs_hash_bd_unlock(ns->ns_rs_hash, &bd, 1);
if (ns->ns_lvbo && ns->ns_lvbo->lvbo_free)
ns->ns_lvbo->lvbo_free(res);
- if (res->lr_itree != NULL)
- OBD_SLAB_FREE(res->lr_itree, ldlm_interval_tree_slab,
- sizeof(*res->lr_itree) * LCK_MODE_NUM);
- OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
+ ldlm_resource_free(res);
return 1;
}
return 0;
LASSERT(list_empty(&lock->l_res_link));
list_add_tail(&lock->l_res_link, head);
+
+ if (res->lr_type == LDLM_IBITS)
+ ldlm_inodebits_add_lock(res, head, lock);
}
/**
{
int type = lock->l_resource->lr_type;
- check_res_locked(lock->l_resource);
- if (type == LDLM_IBITS || type == LDLM_PLAIN)
- ldlm_unlink_lock_skiplist(lock);
- else if (type == LDLM_EXTENT)
- ldlm_extent_unlink_lock(lock);
+ check_res_locked(lock->l_resource);
+ switch (type) {
+ case LDLM_PLAIN:
+ ldlm_unlink_lock_skiplist(lock);
+ break;
+ case LDLM_EXTENT:
+ ldlm_extent_unlink_lock(lock);
+ break;
+ case LDLM_IBITS:
+ ldlm_inodebits_unlink_lock(lock);
+ break;
+ }
list_del_init(&lock->l_res_link);
}
EXPORT_SYMBOL(ldlm_resource_unlink_lock);
}
out_reprocess:
- ldlm_reprocess_all(lease->l_resource);
+ ldlm_reprocess_all(lease->l_resource, lease);
LDLM_LOCK_PUT(lease);
ma->ma_valid = 0;
out_obj:
mdt_object_put(info->mti_env, swap_objects ? o1 : o2);
- ldlm_reprocess_all(lease->l_resource);
+ ldlm_reprocess_all(lease->l_resource, lease);
out_lease:
LDLM_LOCK_PUT(lease);
OBD_FREE(resync_ids, resync_count * sizeof(__u32));
out_reprocess:
- ldlm_reprocess_all(lease->l_resource);
+ ldlm_reprocess_all(lease->l_resource, lease);
LDLM_LOCK_PUT(lease);
ma->ma_valid = 0;
* cancelled, it's okay to cancel it now as we've held mot_open_sem.
*/
ldlm_lock_cancel(lease);
- ldlm_reprocess_all(lease->l_resource);
+ ldlm_reprocess_all(lease->l_resource, lease);
LDLM_LOCK_PUT(lease);
close:
}
run_test 4 "DoM: glimpse doesn't produce duplicated locks"
+test_5() {
+ local before=$(date +%s)
+ local evict
+
+ dd if=/dev/zero of=$DIR/$tfile bs=4096 count=1 || return 1
+
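+ # Race a truncate paused by OBD_FAIL_LLITE_TRUNCATE_INODE_PAUSE against
+ # setxattr and writes from a second mount; pass if no client eviction
+ # is observed afterwards.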
+ multiop_bg_pause $DIR/$tfile O_Ac || return 1
+ setxattr=$!
+
+ multiop_bg_pause $DIR/$tfile O_Tc || return 1
+ truncate=$!
+
+ multiop $DIR2/$tfile Ow10 || return 1
+
+ getfattr -d $DIR2/$tfile
+
+#define OBD_FAIL_LLITE_TRUNCATE_INODE_PAUSE 0x1415
+ $LCTL set_param fail_loc=0x80001415 fail_val=5
+ kill -USR1 $truncate
+ sleep 1
+ multiop $DIR2/$tfile Ow10 &
+ sleep 1
+ kill -USR1 $setxattr
+
+ wait
+
+ evict=$(do_facet client $LCTL get_param mdc.$FSNAME-MDT*.state |
+ awk -F"[ [,]" '/EVICTED ]$/ { if (mx<$5) {mx=$5;} } END { print mx }')
+
+ [ -z "$evict" ] || [[ $evict -le $before ]] ||
+ (do_facet client $LCTL get_param mdc.$FSNAME-MDT*.state;
+ error "eviction happened: $evict before:$before")
+}
+run_test 5 "DoM truncate deadlock"
+
test_fsx() {
local file1=$DIR1/$tfile
local file2=$DIR2/$tfile