Whamcloud - gitweb
LU-3259 clio: cl_lock simplification
[fs/lustre-release.git] / lustre / obdclass / cl_io.c
index a9945a9..94eae55 100644 (file)
@@ -163,7 +163,6 @@ static int cl_io_init0(const struct lu_env *env, struct cl_io *io,
 
         io->ci_type = iot;
        INIT_LIST_HEAD(&io->ci_lockset.cls_todo);
-       INIT_LIST_HEAD(&io->ci_lockset.cls_curr);
        INIT_LIST_HEAD(&io->ci_lockset.cls_done);
        INIT_LIST_HEAD(&io->ci_layers);
 
@@ -242,45 +241,11 @@ int cl_io_rw_init(const struct lu_env *env, struct cl_io *io,
 }
 EXPORT_SYMBOL(cl_io_rw_init);
 
-static inline const struct lu_fid *
-cl_lock_descr_fid(const struct cl_lock_descr *descr)
-{
-        return lu_object_fid(&descr->cld_obj->co_lu);
-}
-
 static int cl_lock_descr_sort(const struct cl_lock_descr *d0,
                               const struct cl_lock_descr *d1)
 {
-        return lu_fid_cmp(cl_lock_descr_fid(d0), cl_lock_descr_fid(d1)) ?:
-                __diff_normalize(d0->cld_start, d1->cld_start);
-}
-
-static int cl_lock_descr_cmp(const struct cl_lock_descr *d0,
-                             const struct cl_lock_descr *d1)
-{
-        int ret;
-
-        ret = lu_fid_cmp(cl_lock_descr_fid(d0), cl_lock_descr_fid(d1));
-        if (ret)
-                return ret;
-        if (d0->cld_end < d1->cld_start)
-                return -1;
-        if (d0->cld_start > d0->cld_end)
-                return 1;
-        return 0;
-}
-
-static void cl_lock_descr_merge(struct cl_lock_descr *d0,
-                                const struct cl_lock_descr *d1)
-{
-        d0->cld_start = min(d0->cld_start, d1->cld_start);
-        d0->cld_end = max(d0->cld_end, d1->cld_end);
-
-        if (d1->cld_mode == CLM_WRITE && d0->cld_mode != CLM_WRITE)
-                d0->cld_mode = CLM_WRITE;
-
-        if (d1->cld_mode == CLM_GROUP && d0->cld_mode != CLM_GROUP)
-                d0->cld_mode = CLM_GROUP;
+       return lu_fid_cmp(lu_object_fid(&d0->cld_obj->co_lu),
+                         lu_object_fid(&d1->cld_obj->co_lu));
 }
 
 /*
@@ -329,35 +294,35 @@ static void cl_io_locks_sort(struct cl_io *io)
        EXIT;
 }
 
-/**
- * Check whether \a queue contains locks matching \a need.
- *
- * \retval +ve there is a matching lock in the \a queue
- * \retval   0 there are no matching locks in the \a queue
- */
-int cl_queue_match(const struct list_head *queue,
-                   const struct cl_lock_descr *need)
+static void cl_lock_descr_merge(struct cl_lock_descr *d0,
+                               const struct cl_lock_descr *d1)
 {
-       struct cl_io_lock_link *scan;
-       ENTRY;
+       d0->cld_start = min(d0->cld_start, d1->cld_start);
+       d0->cld_end = max(d0->cld_end, d1->cld_end);
 
-       list_for_each_entry(scan, queue, cill_linkage) {
-               if (cl_lock_descr_match(&scan->cill_descr, need))
-                       RETURN(+1);
-       }
-       RETURN(0);
+       if (d1->cld_mode == CLM_WRITE && d0->cld_mode != CLM_WRITE)
+               d0->cld_mode = CLM_WRITE;
+
+       if (d1->cld_mode == CLM_GROUP && d0->cld_mode != CLM_GROUP)
+               d0->cld_mode = CLM_GROUP;
 }
-EXPORT_SYMBOL(cl_queue_match);
 
-static int cl_queue_merge(const struct list_head *queue,
-                          const struct cl_lock_descr *need)
+static int cl_lockset_merge(const struct cl_lockset *set,
+                           const struct cl_lock_descr *need)
 {
        struct cl_io_lock_link *scan;
-       ENTRY;
 
-       list_for_each_entry(scan, queue, cill_linkage) {
-               if (cl_lock_descr_cmp(&scan->cill_descr, need))
+       ENTRY;
+       list_for_each_entry(scan, &set->cls_todo, cill_linkage) {
+               if (!cl_object_same(scan->cill_descr.cld_obj, need->cld_obj))
                        continue;
+
+               /* Merge locks for the same object because ldlm lock server
+                * may expand the lock extent, otherwise there is a deadlock
+                * case if two conflicted locks are queueud for the same object
+                * and lock server expands one lock to overlap the another.
+                * The side effect is that it can generate a multi-stripe lock
+                * that may cause casacading problem */
                cl_lock_descr_merge(&scan->cill_descr, need);
                CDEBUG(D_VFSTRACE, "lock: %d: [%lu, %lu]\n",
                       scan->cill_descr.cld_mode, scan->cill_descr.cld_start,
@@ -367,91 +332,21 @@ static int cl_queue_merge(const struct list_head *queue,
        RETURN(0);
 }
 
-static int cl_lockset_match(const struct cl_lockset *set,
-                            const struct cl_lock_descr *need)
-{
-        return cl_queue_match(&set->cls_curr, need) ||
-               cl_queue_match(&set->cls_done, need);
-}
-
-static int cl_lockset_merge(const struct cl_lockset *set,
-                            const struct cl_lock_descr *need)
-{
-        return cl_queue_merge(&set->cls_todo, need) ||
-               cl_lockset_match(set, need);
-}
-
-static int cl_lockset_lock_one(const struct lu_env *env,
-                               struct cl_io *io, struct cl_lockset *set,
-                               struct cl_io_lock_link *link)
-{
-        struct cl_lock *lock;
-        int             result;
-
-        ENTRY;
-
-       lock = cl_lock_request(env, io, &link->cill_descr, "io", io);
-
-        if (!IS_ERR(lock)) {
-                link->cill_lock = lock;
-               list_move(&link->cill_linkage, &set->cls_curr);
-                if (!(link->cill_descr.cld_enq_flags & CEF_ASYNC)) {
-                        result = cl_wait(env, lock);
-                        if (result == 0)
-                               list_move(&link->cill_linkage, &set->cls_done);
-                } else
-                        result = 0;
-        } else
-                result = PTR_ERR(lock);
-        RETURN(result);
-}
-
-static void cl_lock_link_fini(const struct lu_env *env, struct cl_io *io,
-                              struct cl_io_lock_link *link)
-{
-        struct cl_lock *lock = link->cill_lock;
-
-        ENTRY;
-       list_del_init(&link->cill_linkage);
-        if (lock != NULL) {
-                cl_lock_release(env, lock, "io", io);
-                link->cill_lock = NULL;
-        }
-        if (link->cill_fini != NULL)
-                link->cill_fini(env, link);
-        EXIT;
-}
-
 static int cl_lockset_lock(const struct lu_env *env, struct cl_io *io,
-                           struct cl_lockset *set)
+                          struct cl_lockset *set)
 {
-       struct cl_io_lock_link  *link;
-       struct cl_io_lock_link  *temp;
-       struct cl_lock          *lock;
+       struct cl_io_lock_link *link;
+       struct cl_io_lock_link *temp;
        int result;
 
        ENTRY;
        result = 0;
        list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
-               if (!cl_lockset_match(set, &link->cill_descr)) {
-                       /* XXX some locking to guarantee that locks aren't
-                        * expanded in between. */
-                       result = cl_lockset_lock_one(env, io, set, link);
-                       if (result != 0)
-                               break;
-               } else
-                       cl_lock_link_fini(env, io, link);
-       }
-       if (result == 0) {
-               list_for_each_entry_safe(link, temp, &set->cls_curr,
-                                        cill_linkage) {
-                       lock = link->cill_lock;
-                       result = cl_wait(env, lock);
-                       if (result == 0)
-                               list_move(&link->cill_linkage, &set->cls_done);
-                       else
-                               break;
-               }
+               result = cl_lock_request(env, io, &link->cill_lock);
+               if (result < 0)
+                       break;
+
+               list_move(&link->cill_linkage, &set->cls_done);
        }
        RETURN(result);
 }
@@ -509,16 +404,19 @@ void cl_io_unlock(const struct lu_env *env, struct cl_io *io)
         ENTRY;
         set = &io->ci_lockset;
 
-       list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage)
-               cl_lock_link_fini(env, io, link);
-
-       list_for_each_entry_safe(link, temp, &set->cls_curr, cill_linkage)
-               cl_lock_link_fini(env, io, link);
+       list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
+               list_del_init(&link->cill_linkage);
+               if (link->cill_fini != NULL)
+                       link->cill_fini(env, link);
+       }
 
        list_for_each_entry_safe(link, temp, &set->cls_done, cill_linkage) {
-               cl_unuse(env, link->cill_lock);
-               cl_lock_link_fini(env, io, link);
+               list_del_init(&link->cill_linkage);
+               cl_lock_release(env, &link->cill_lock);
+               if (link->cill_fini != NULL)
+                       link->cill_fini(env, link);
        }
+
        cl_io_for_each_reverse(scan, io) {
                if (scan->cis_iop->op[io->ci_type].cio_unlock != NULL)
                        scan->cis_iop->op[io->ci_type].cio_unlock(env, scan);
@@ -642,21 +540,21 @@ static void cl_free_io_lock_link(const struct lu_env *env,
 int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
                          struct cl_lock_descr *descr)
 {
-        struct cl_io_lock_link *link;
-        int result;
+       struct cl_io_lock_link *link;
+       int result;
 
-        ENTRY;
-        OBD_ALLOC_PTR(link);
-        if (link != NULL) {
-                link->cill_descr     = *descr;
-                link->cill_fini      = cl_free_io_lock_link;
-                result = cl_io_lock_add(env, io, link);
-                if (result) /* lock match */
-                        link->cill_fini(env, link);
-        } else
-                result = -ENOMEM;
+       ENTRY;
+       OBD_ALLOC_PTR(link);
+       if (link != NULL) {
+               link->cill_descr = *descr;
+               link->cill_fini  = cl_free_io_lock_link;
+               result = cl_io_lock_add(env, io, link);
+               if (result) /* lock match */
+                       link->cill_fini(env, link);
+       } else
+               result = -ENOMEM;
 
-        RETURN(result);
+       RETURN(result);
 }
 EXPORT_SYMBOL(cl_io_lock_alloc_add);
 
@@ -1577,6 +1475,7 @@ void cl_sync_io_init(struct cl_sync_io *anchor, int nr,
                     void (*end)(const struct lu_env *, struct cl_sync_io *))
 {
        ENTRY;
+       memset(anchor, 0, sizeof(*anchor));
        init_waitqueue_head(&anchor->csi_waitq);
        atomic_set(&anchor->csi_sync_nr, nr);
        atomic_set(&anchor->csi_barrier, nr > 0);