Whamcloud - gitweb
LU-12296 llite: improve ll_dom_lock_cancel 96/40296/3
authorVladimir Saveliev <c17830@cray.com>
Wed, 5 Jun 2019 01:46:42 +0000 (04:46 +0300)
committerOleg Drokin <green@whamcloud.com>
Tue, 3 Nov 2020 03:17:05 +0000 (03:17 +0000)
ll_dom_lock_cancel() should zero kms attribute similar to
mdc_ldlm_blocking_ast0().

In order to avoid code duplication between mdc_ldlm_blocking_ast0()
and ll_dom_lock_cancel() - add new cl_object_operations method -
coo_object_flush() to reach mdc's blocking ast from llite level.

Tests illustrating the issue are added.

Lustre-change: https://review.whamcloud.com/34858
Lustre-commit: 707bab62f5d6c704b30e4ee9e769b5c9f026e1e7

LU-12704 lov: check all entries in lov_flush_composite

Check all layout entries for DOM layout and exit with
-ENODATA if no one exists. Caller consider that as valid
case due to layout change.

Define llo_flush methods for all layouts as required
by lov_dispatch().

Patch cleans up also cl_dom_size field in cl_layout which
was used in previous ll_dom_lock_cancel() implementation

Run lov_flush_composite under down_read lov->lo_type_guard to avoid
race with layout change.

Lustre-change: https://review.whamcloud.com/36368
Lustre-commit: 44460570fd21a91002190c8a0620923125135b52

Signed-off-by: Mikhail Pershin <mpershin@whamcloud.com>
Change-Id: I2b100ead6d420dbf561bc61be973d64dad317214
Reviewed-on: https://review.whamcloud.com/40296
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/cl_object.h
lustre/llite/namei.c
lustre/lov/lov_object.c
lustre/mdc/mdc_dev.c
lustre/obdclass/cl_object.c
lustre/tests/sanity-dom.sh

index 4192161..f0c8a5b 100644 (file)
@@ -290,8 +290,6 @@ struct cl_layout {
        struct lu_buf   cl_buf;
        /** size of layout in lov_mds_md format. */
        size_t          cl_size;
-       /** size of DoM component if exists or zero otherwise */
-       u64             cl_dom_comp_size;
        /** Layout generation. */
        u32             cl_layout_gen;
        /** whether layout is a composite one */
@@ -418,6 +416,13 @@ struct cl_object_operations {
        void (*coo_req_attr_set)(const struct lu_env *env,
                                 struct cl_object *obj,
                                 struct cl_req_attr *attr);
+       /**
+        * Flush \a obj data corresponding to \a lock. Used for DoM
+        * locks in llite's cancelling blocking ast callback.
+        */
+       int (*coo_object_flush)(const struct lu_env *env,
+                               struct cl_object *obj,
+                               struct ldlm_lock *lock);
 };
 
 /**
@@ -2108,6 +2113,9 @@ int cl_object_fiemap(const struct lu_env *env, struct cl_object *obj,
 int cl_object_layout_get(const struct lu_env *env, struct cl_object *obj,
                         struct cl_layout *cl);
 loff_t cl_object_maxbytes(struct cl_object *obj);
+int cl_object_flush(const struct lu_env *env, struct cl_object *obj,
+                   struct ldlm_lock *lock);
+
 
 /**
  * Returns true, iff \a o0 and \a o1 are slices of the same object.
index a076c65..ce14962 100644 (file)
@@ -184,15 +184,12 @@ int ll_test_inode_by_fid(struct inode *inode, void *opaque)
        return lu_fid_eq(&ll_i2info(inode)->lli_fid, opaque);
 }
 
-int ll_dom_lock_cancel(struct inode *inode, struct ldlm_lock *lock)
+static int ll_dom_lock_cancel(struct inode *inode, struct ldlm_lock *lock)
 {
        struct lu_env *env;
        struct ll_inode_info *lli = ll_i2info(inode);
-       struct cl_layout clt = { .cl_layout_gen = 0, };
-       int rc;
        __u16 refcheck;
-
-
+       int rc;
        ENTRY;
 
        if (!lli->lli_clob) {
@@ -206,28 +203,20 @@ int ll_dom_lock_cancel(struct inode *inode, struct ldlm_lock *lock)
        if (IS_ERR(env))
                RETURN(PTR_ERR(env));
 
-       rc = cl_object_layout_get(env, lli->lli_clob, &clt);
-       if (rc) {
-               CDEBUG(D_INODE, "Cannot get layout for "DFID"\n",
+       /* reach MDC layer to flush data under  the DoM ldlm lock */
+       rc = cl_object_flush(env, lli->lli_clob, lock);
+       if (rc == -ENODATA) {
+               CDEBUG(D_INODE, "inode "DFID" layout has no DoM stripe\n",
                       PFID(ll_inode2fid(inode)));
-               rc = -ENODATA;
-       } else if (clt.cl_size == 0 || clt.cl_dom_comp_size == 0) {
-               CDEBUG(D_INODE, "DOM lock without DOM layout for "DFID"\n",
-                      PFID(ll_inode2fid(inode)));
-       } else {
-               enum cl_fsync_mode mode;
-               loff_t end = clt.cl_dom_comp_size - 1;
-
-               mode = ldlm_is_discard_data(lock) ?
-                                       CL_FSYNC_DISCARD : CL_FSYNC_LOCAL;
-               rc = cl_sync_file_range(inode, 0, end, mode, 1);
-               truncate_inode_pages_range(inode->i_mapping, 0, end);
+               /* most likely result of layout change, do nothing */
+               rc = 0;
        }
+
        cl_env_put(env, &refcheck);
        RETURN(rc);
 }
 
-void ll_lock_cancel_bits(struct ldlm_lock *lock, __u64 to_cancel)
+static void ll_lock_cancel_bits(struct ldlm_lock *lock, __u64 to_cancel)
 {
        struct inode *inode = ll_inode_from_resource_lock(lock);
        __u64 bits = to_cancel;
index b0568d7..f19f57b 100644 (file)
@@ -76,6 +76,8 @@ struct lov_layout_operations {
                             struct cl_object *obj, struct cl_io *io);
         int  (*llo_getattr)(const struct lu_env *env, struct cl_object *obj,
                             struct cl_attr *attr);
+       int  (*llo_flush)(const struct lu_env *env, struct cl_object *obj,
+                         struct ldlm_lock *lock);
 };
 
 static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov);
@@ -1003,6 +1005,32 @@ static int lov_attr_get_composite(const struct lu_env *env,
        RETURN(0);
 }
 
+static int lov_flush_composite(const struct lu_env *env,
+                              struct cl_object *obj,
+                              struct ldlm_lock *lock)
+{
+       struct lov_object *lov = cl2lov(obj);
+       struct lov_layout_entry *lle;
+       int rc = -ENODATA;
+
+       ENTRY;
+
+       lov_foreach_layout_entry(lov, lle) {
+               if (!lsme_is_dom(lle->lle_lsme))
+                       continue;
+               rc = cl_object_flush(env, lovsub2cl(lle->lle_dom.lo_dom), lock);
+               break;
+       }
+
+       RETURN(rc);
+}
+
+static int lov_flush_empty(const struct lu_env *env, struct cl_object *obj,
+                          struct ldlm_lock *lock)
+{
+       return 0;
+}
+
 const static struct lov_layout_operations lov_dispatch[] = {
        [LLT_EMPTY] = {
                .llo_init      = lov_init_empty,
@@ -1013,6 +1041,7 @@ const static struct lov_layout_operations lov_dispatch[] = {
                .llo_lock_init = lov_lock_init_empty,
                .llo_io_init   = lov_io_init_empty,
                .llo_getattr   = lov_attr_get_empty,
+               .llo_flush     = lov_flush_empty,
        },
        [LLT_RELEASED] = {
                .llo_init      = lov_init_released,
@@ -1023,6 +1052,7 @@ const static struct lov_layout_operations lov_dispatch[] = {
                .llo_lock_init = lov_lock_init_empty,
                .llo_io_init   = lov_io_init_released,
                .llo_getattr   = lov_attr_get_empty,
+               .llo_flush     = lov_flush_empty,
        },
        [LLT_COMP] = {
                .llo_init      = lov_init_composite,
@@ -1033,6 +1063,7 @@ const static struct lov_layout_operations lov_dispatch[] = {
                .llo_lock_init = lov_lock_init_composite,
                .llo_io_init   = lov_io_init_composite,
                .llo_getattr   = lov_attr_get_composite,
+               .llo_flush     = lov_flush_composite,
        },
 };
 
@@ -1998,17 +2029,7 @@ static int lov_object_layout_get(const struct lu_env *env,
 
        cl->cl_size = lov_comp_md_size(lsm);
        cl->cl_layout_gen = lsm->lsm_layout_gen;
-       cl->cl_dom_comp_size = 0;
-       if (lsm_is_composite(lsm->lsm_magic)) {
-               struct lov_stripe_md_entry *lsme = lsm->lsm_entries[0];
-
-               cl->cl_is_composite = true;
-
-               if (lsme_is_dom(lsme))
-                       cl->cl_dom_comp_size = lsme->lsme_extent.e_end;
-       } else {
-               cl->cl_is_composite = false;
-       }
+       cl->cl_is_composite = lsm_is_composite(lsm->lsm_magic);
 
        rc = lov_lsm_pack(lsm, buf->lb_buf, buf->lb_len);
        lov_lsm_put(lsm);
@@ -2032,6 +2053,13 @@ static loff_t lov_object_maxbytes(struct cl_object *obj)
        return maxbytes;
 }
 
+static int lov_object_flush(const struct lu_env *env, struct cl_object *obj,
+                           struct ldlm_lock *lock)
+{
+       return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_flush, true, env, obj,
+                                    lock);
+}
+
 static const struct cl_object_operations lov_ops = {
        .coo_page_init    = lov_page_init,
        .coo_lock_init    = lov_lock_init,
@@ -2043,6 +2071,7 @@ static const struct cl_object_operations lov_ops = {
        .coo_layout_get   = lov_object_layout_get,
        .coo_maxbytes     = lov_object_maxbytes,
        .coo_fiemap       = lov_object_fiemap,
+       .coo_object_flush = lov_object_flush
 };
 
 static const struct lu_object_operations lov_lu_obj_ops = {
index 19e850d..726165c 100644 (file)
@@ -291,7 +291,7 @@ void mdc_lock_lockless_cancel(const struct lu_env *env,
  */
 static int mdc_dlm_blocking_ast0(const struct lu_env *env,
                                 struct ldlm_lock *dlmlock,
-                                void *data, int flag)
+                                int flag)
 {
        struct cl_object *obj = NULL;
        int result = 0;
@@ -383,7 +383,7 @@ int mdc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
                        break;
                }
 
-               rc = mdc_dlm_blocking_ast0(env, dlmlock, data, flag);
+               rc = mdc_dlm_blocking_ast0(env, dlmlock, flag);
                cl_env_put(env, &refcheck);
                break;
        }
@@ -1402,6 +1402,12 @@ int mdc_object_prune(const struct lu_env *env, struct cl_object *obj)
        return 0;
 }
 
+static int mdc_object_flush(const struct lu_env *env, struct cl_object *obj,
+                           struct ldlm_lock *lock)
+{
+       RETURN(mdc_dlm_blocking_ast0(env, lock, LDLM_CB_CANCELING));
+}
+
 static const struct cl_object_operations mdc_ops = {
        .coo_page_init = osc_page_init,
        .coo_lock_init = mdc_lock_init,
@@ -1411,6 +1417,7 @@ static const struct cl_object_operations mdc_ops = {
        .coo_glimpse = osc_object_glimpse,
        .coo_req_attr_set = mdc_req_attr_set,
        .coo_prune = mdc_object_prune,
+       .coo_object_flush = mdc_object_flush
 };
 
 static const struct osc_object_operations mdc_object_ops = {
index 2914017..5aa59de 100644 (file)
@@ -422,6 +422,24 @@ loff_t cl_object_maxbytes(struct cl_object *obj)
 }
 EXPORT_SYMBOL(cl_object_maxbytes);
 
+int cl_object_flush(const struct lu_env *env, struct cl_object *obj,
+                        struct ldlm_lock *lock)
+{
+       struct lu_object_header *top = obj->co_lu.lo_header;
+       int rc = 0;
+       ENTRY;
+
+       list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) {
+               if (obj->co_ops->coo_object_flush) {
+                       rc = obj->co_ops->coo_object_flush(env, obj, lock);
+                       if (rc)
+                               break;
+               }
+       }
+       RETURN(rc);
+}
+EXPORT_SYMBOL(cl_object_flush);
+
 /**
  * Helper function removing all object locks, and marking object for
  * deletion. All object pages must have been deleted at this point.
index b1c895a..3df0133 100644 (file)
@@ -155,6 +155,33 @@ test_5() {
 }
 run_test 5 "DoM truncate deadlock"
 
+test_6() {
+       $MULTIOP $DIR1/$tfile Oz40960w100_z200w100c &
+       MULTIPID=$!
+
+       # let MULTIPID to create the file
+       sleep 1
+       $MULTIOP $DIR2/$tfile oO_RDWR:Tw100c
+       kill -USR1 $MULTIPID
+       wait
+       $MULTIOP $DIR2/$tfile oO_RDWR:z400w100c
+       $CHECKSTAT -s 500 $DIR2/$tfile || error "wrong size"
+}
+run_test 6 "Race two writes, check file size"
+
+test_7() {
+       dd if=/dev/zero of=$DIR1/$tfile bs=1k count=1 || error "write 10k"
+       $TRUNCATE $DIR1/$tfile 123456 || error "truncate up to 123456"
+       $TRUNCATE $DIR2/$tfile 55555 || error "truncate down to 55555"
+       $CHECKSTAT -t file -s 55555 $DIR1/$tfile || error "stat"
+       $CHECKSTAT -t file -s 55555 $DIR2/$tfile || error "stat"
+       dd if=/dev/zero of=$DIR1/$tfile bs=10k count=1 seek=10 conv=notrunc || error "write 10k@1M"
+       $CHECKSTAT -t file -s 112640 $DIR1/$tfile || error "stat"
+       $CHECKSTAT -t file -s 112640 $DIR2/$tfile || error "stat"
+       rm $DIR1/$tfile
+}
+run_test 7 "truncate up, truncate down on another mount, write in between"
+
 test_fsx() {
        local file1=$DIR1/$tfile
        local file2=$DIR2/$tfile