io->ci_type = iot;
INIT_LIST_HEAD(&io->ci_lockset.cls_todo);
- INIT_LIST_HEAD(&io->ci_lockset.cls_curr);
INIT_LIST_HEAD(&io->ci_lockset.cls_done);
INIT_LIST_HEAD(&io->ci_layers);
}
EXPORT_SYMBOL(cl_io_rw_init);
-static inline const struct lu_fid *
-cl_lock_descr_fid(const struct cl_lock_descr *descr)
-{
- return lu_object_fid(&descr->cld_obj->co_lu);
-}
-
static int cl_lock_descr_sort(const struct cl_lock_descr *d0,
const struct cl_lock_descr *d1)
{
- return lu_fid_cmp(cl_lock_descr_fid(d0), cl_lock_descr_fid(d1)) ?:
- __diff_normalize(d0->cld_start, d1->cld_start);
-}
-
-static int cl_lock_descr_cmp(const struct cl_lock_descr *d0,
- const struct cl_lock_descr *d1)
-{
- int ret;
-
- ret = lu_fid_cmp(cl_lock_descr_fid(d0), cl_lock_descr_fid(d1));
- if (ret)
- return ret;
- if (d0->cld_end < d1->cld_start)
- return -1;
- if (d0->cld_start > d0->cld_end)
- return 1;
- return 0;
-}
-
-static void cl_lock_descr_merge(struct cl_lock_descr *d0,
- const struct cl_lock_descr *d1)
-{
- d0->cld_start = min(d0->cld_start, d1->cld_start);
- d0->cld_end = max(d0->cld_end, d1->cld_end);
-
- if (d1->cld_mode == CLM_WRITE && d0->cld_mode != CLM_WRITE)
- d0->cld_mode = CLM_WRITE;
-
- if (d1->cld_mode == CLM_GROUP && d0->cld_mode != CLM_GROUP)
- d0->cld_mode = CLM_GROUP;
+ return lu_fid_cmp(lu_object_fid(&d0->cld_obj->co_lu),
+ lu_object_fid(&d1->cld_obj->co_lu));
}
/*
EXIT;
}
-/**
- * Check whether \a queue contains locks matching \a need.
- *
- * \retval +ve there is a matching lock in the \a queue
- * \retval 0 there are no matching locks in the \a queue
- */
-int cl_queue_match(const struct list_head *queue,
- const struct cl_lock_descr *need)
+static void cl_lock_descr_merge(struct cl_lock_descr *d0,
+ const struct cl_lock_descr *d1)
{
- struct cl_io_lock_link *scan;
- ENTRY;
+ d0->cld_start = min(d0->cld_start, d1->cld_start);
+ d0->cld_end = max(d0->cld_end, d1->cld_end);
- list_for_each_entry(scan, queue, cill_linkage) {
- if (cl_lock_descr_match(&scan->cill_descr, need))
- RETURN(+1);
- }
- RETURN(0);
+ if (d1->cld_mode == CLM_WRITE && d0->cld_mode != CLM_WRITE)
+ d0->cld_mode = CLM_WRITE;
+
+ if (d1->cld_mode == CLM_GROUP && d0->cld_mode != CLM_GROUP)
+ d0->cld_mode = CLM_GROUP;
}
-EXPORT_SYMBOL(cl_queue_match);
-static int cl_queue_merge(const struct list_head *queue,
- const struct cl_lock_descr *need)
+static int cl_lockset_merge(const struct cl_lockset *set,
+ const struct cl_lock_descr *need)
{
struct cl_io_lock_link *scan;
- ENTRY;
- list_for_each_entry(scan, queue, cill_linkage) {
- if (cl_lock_descr_cmp(&scan->cill_descr, need))
+ ENTRY;
+ list_for_each_entry(scan, &set->cls_todo, cill_linkage) {
+ if (!cl_object_same(scan->cill_descr.cld_obj, need->cld_obj))
continue;
+
+ /* Merge locks for the same object, because the ldlm lock server
+ * may expand a lock's extent; otherwise there is a deadlock if two
+ * conflicting locks are queued for the same object and the server
+ * expands one of them to overlap the other. The side effect is
+ * that this can generate a multi-stripe lock, which may cause a
+ * cascading problem. */
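+ /* For example, merging a CLM_READ lock on [0, 100] with a CLM_WRITE
+ * lock on [50, 200] on the same object yields a single CLM_WRITE
+ * lock covering [0, 200]. */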
cl_lock_descr_merge(&scan->cill_descr, need);
CDEBUG(D_VFSTRACE, "lock: %d: [%lu, %lu]\n",
scan->cill_descr.cld_mode, scan->cill_descr.cld_start,
scan->cill_descr.cld_end);
RETURN(+1);
}
RETURN(0);
}
-static int cl_lockset_match(const struct cl_lockset *set,
- const struct cl_lock_descr *need)
-{
- return cl_queue_match(&set->cls_curr, need) ||
- cl_queue_match(&set->cls_done, need);
-}
-
-static int cl_lockset_merge(const struct cl_lockset *set,
- const struct cl_lock_descr *need)
-{
- return cl_queue_merge(&set->cls_todo, need) ||
- cl_lockset_match(set, need);
-}
-
-static int cl_lockset_lock_one(const struct lu_env *env,
- struct cl_io *io, struct cl_lockset *set,
- struct cl_io_lock_link *link)
-{
- struct cl_lock *lock;
- int result;
-
- ENTRY;
-
- lock = cl_lock_request(env, io, &link->cill_descr, "io", io);
-
- if (!IS_ERR(lock)) {
- link->cill_lock = lock;
- list_move(&link->cill_linkage, &set->cls_curr);
- if (!(link->cill_descr.cld_enq_flags & CEF_ASYNC)) {
- result = cl_wait(env, lock);
- if (result == 0)
- list_move(&link->cill_linkage, &set->cls_done);
- } else
- result = 0;
- } else
- result = PTR_ERR(lock);
- RETURN(result);
-}
-
-static void cl_lock_link_fini(const struct lu_env *env, struct cl_io *io,
- struct cl_io_lock_link *link)
-{
- struct cl_lock *lock = link->cill_lock;
-
- ENTRY;
- list_del_init(&link->cill_linkage);
- if (lock != NULL) {
- cl_lock_release(env, lock, "io", io);
- link->cill_lock = NULL;
- }
- if (link->cill_fini != NULL)
- link->cill_fini(env, link);
- EXIT;
-}
-
static int cl_lockset_lock(const struct lu_env *env, struct cl_io *io,
- struct cl_lockset *set)
+ struct cl_lockset *set)
{
- struct cl_io_lock_link *link;
- struct cl_io_lock_link *temp;
- struct cl_lock *lock;
+ struct cl_io_lock_link *link;
+ struct cl_io_lock_link *temp;
int result;
ENTRY;
result = 0;
list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
- if (!cl_lockset_match(set, &link->cill_descr)) {
- /* XXX some locking to guarantee that locks aren't
- * expanded in between. */
- result = cl_lockset_lock_one(env, io, set, link);
- if (result != 0)
- break;
- } else
- cl_lock_link_fini(env, io, link);
- }
- if (result == 0) {
- list_for_each_entry_safe(link, temp, &set->cls_curr,
- cill_linkage) {
- lock = link->cill_lock;
- result = cl_wait(env, lock);
- if (result == 0)
- list_move(&link->cill_linkage, &set->cls_done);
- else
- break;
- }
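+ /* Request each todo lock in turn; once obtained, move it to
+ * cls_done so that cl_io_unlock() will release it. */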
+ result = cl_lock_request(env, io, &link->cill_lock);
+ if (result < 0)
+ break;
+
+ list_move(&link->cill_linkage, &set->cls_done);
}
RETURN(result);
}
ENTRY;
set = &io->ci_lockset;
- list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage)
- cl_lock_link_fini(env, io, link);
-
- list_for_each_entry_safe(link, temp, &set->cls_curr, cill_linkage)
- cl_lock_link_fini(env, io, link);
+ list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
+ list_del_init(&link->cill_linkage);
+ if (link->cill_fini != NULL)
+ link->cill_fini(env, link);
+ }
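+
+ /* Granted locks sit on cls_done; release each lock before running
+ * its cill_fini() callback. */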
list_for_each_entry_safe(link, temp, &set->cls_done, cill_linkage) {
- cl_unuse(env, link->cill_lock);
- cl_lock_link_fini(env, io, link);
+ list_del_init(&link->cill_linkage);
+ cl_lock_release(env, &link->cill_lock);
+ if (link->cill_fini != NULL)
+ link->cill_fini(env, link);
}
+
cl_io_for_each_reverse(scan, io) {
if (scan->cis_iop->op[io->ci_type].cio_unlock != NULL)
scan->cis_iop->op[io->ci_type].cio_unlock(env, scan);
int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
struct cl_lock_descr *descr)
{
- struct cl_io_lock_link *link;
- int result;
+ struct cl_io_lock_link *link;
+ int result;
- ENTRY;
- OBD_ALLOC_PTR(link);
- if (link != NULL) {
- link->cill_descr = *descr;
- link->cill_fini = cl_free_io_lock_link;
- result = cl_io_lock_add(env, io, link);
- if (result) /* lock match */
- link->cill_fini(env, link);
- } else
- result = -ENOMEM;
+ ENTRY;
+ OBD_ALLOC_PTR(link);
+ if (link != NULL) {
+ link->cill_descr = *descr;
+ link->cill_fini = cl_free_io_lock_link;
+ result = cl_io_lock_add(env, io, link);
+ if (result) /* lock match */
+ link->cill_fini(env, link);
+ } else
+ result = -ENOMEM;
- RETURN(result);
+ RETURN(result);
}
EXPORT_SYMBOL(cl_io_lock_alloc_add);
void (*end)(const struct lu_env *, struct cl_sync_io *))
{
ENTRY;
+ memset(anchor, 0, sizeof(*anchor));
init_waitqueue_head(&anchor->csi_waitq);
atomic_set(&anchor->csi_sync_nr, nr);
atomic_set(&anchor->csi_barrier, nr > 0);