ll_dom_lock_cancel() should zero kms attribute similar to
mdc_ldlm_blocking_ast0().
In order to avoid code duplication between mdc_ldlm_blocking_ast0()
and ll_dom_lock_cancel() - add new cl_object_operations method -
coo_object_flush() to reach mdc's blocking ast from llite level.
Tests illustrating the issue are added.
Lustre-change: https://review.whamcloud.com/34858
Lustre-commit:
707bab62f5d6c704b30e4ee9e769b5c9f026e1e7
LU-12704 lov: check all entries in lov_flush_composite
Check all layout entries for DOM layout and exit with
-ENODATA if no one exists. Caller consider that as valid
case due to layout change.
Define llo_flush methods for all layouts as required
by lov_dispatch().
Patch cleans up also cl_dom_size field in cl_layout which
was used in previous ll_dom_lock_cancel() implementation
Run lov_flush_composite under down_read lov->lo_type_guard to avoid
race with layout change.
Lustre-change: https://review.whamcloud.com/36368
Lustre-commit:
44460570fd21a91002190c8a0620923125135b52
Signed-off-by: Mikhail Pershin <mpershin@whamcloud.com>
Change-Id: I2b100ead6d420dbf561bc61be973d64dad317214
Reviewed-on: https://review.whamcloud.com/40296
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
struct lu_buf cl_buf;
/** size of layout in lov_mds_md format. */
size_t cl_size;
- /** size of DoM component if exists or zero otherwise */
- u64 cl_dom_comp_size;
/** Layout generation. */
u32 cl_layout_gen;
/** whether layout is a composite one */
void (*coo_req_attr_set)(const struct lu_env *env,
struct cl_object *obj,
struct cl_req_attr *attr);
+ /**
+ * Flush \a obj data corresponding to \a lock. Used for DoM
+ * locks in llite's cancelling blocking ast callback.
+ */
+ int (*coo_object_flush)(const struct lu_env *env,
+ struct cl_object *obj,
+ struct ldlm_lock *lock);
};
/**
int cl_object_layout_get(const struct lu_env *env, struct cl_object *obj,
struct cl_layout *cl);
loff_t cl_object_maxbytes(struct cl_object *obj);
+int cl_object_flush(const struct lu_env *env, struct cl_object *obj,
+ struct ldlm_lock *lock);
+
/**
* Returns true, iff \a o0 and \a o1 are slices of the same object.
return lu_fid_eq(&ll_i2info(inode)->lli_fid, opaque);
}
-int ll_dom_lock_cancel(struct inode *inode, struct ldlm_lock *lock)
+static int ll_dom_lock_cancel(struct inode *inode, struct ldlm_lock *lock)
{
struct lu_env *env;
struct ll_inode_info *lli = ll_i2info(inode);
- struct cl_layout clt = { .cl_layout_gen = 0, };
- int rc;
__u16 refcheck;
-
-
+ int rc;
ENTRY;
if (!lli->lli_clob) {
if (IS_ERR(env))
RETURN(PTR_ERR(env));
- rc = cl_object_layout_get(env, lli->lli_clob, &clt);
- if (rc) {
- CDEBUG(D_INODE, "Cannot get layout for "DFID"\n",
+ /* reach MDC layer to flush data under the DoM ldlm lock */
+ rc = cl_object_flush(env, lli->lli_clob, lock);
+ if (rc == -ENODATA) {
+ CDEBUG(D_INODE, "inode "DFID" layout has no DoM stripe\n",
PFID(ll_inode2fid(inode)));
- rc = -ENODATA;
- } else if (clt.cl_size == 0 || clt.cl_dom_comp_size == 0) {
- CDEBUG(D_INODE, "DOM lock without DOM layout for "DFID"\n",
- PFID(ll_inode2fid(inode)));
- } else {
- enum cl_fsync_mode mode;
- loff_t end = clt.cl_dom_comp_size - 1;
-
- mode = ldlm_is_discard_data(lock) ?
- CL_FSYNC_DISCARD : CL_FSYNC_LOCAL;
- rc = cl_sync_file_range(inode, 0, end, mode, 1);
- truncate_inode_pages_range(inode->i_mapping, 0, end);
+ /* most likely result of layout change, do nothing */
+ rc = 0;
}
+
cl_env_put(env, &refcheck);
RETURN(rc);
}
-void ll_lock_cancel_bits(struct ldlm_lock *lock, __u64 to_cancel)
+static void ll_lock_cancel_bits(struct ldlm_lock *lock, __u64 to_cancel)
{
struct inode *inode = ll_inode_from_resource_lock(lock);
__u64 bits = to_cancel;
struct cl_object *obj, struct cl_io *io);
int (*llo_getattr)(const struct lu_env *env, struct cl_object *obj,
struct cl_attr *attr);
+ int (*llo_flush)(const struct lu_env *env, struct cl_object *obj,
+ struct ldlm_lock *lock);
};
static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov);
RETURN(0);
}
+static int lov_flush_composite(const struct lu_env *env,
+ struct cl_object *obj,
+ struct ldlm_lock *lock)
+{
+ struct lov_object *lov = cl2lov(obj);
+ struct lov_layout_entry *lle;
+ int rc = -ENODATA;
+
+ ENTRY;
+
+ lov_foreach_layout_entry(lov, lle) {
+ if (!lsme_is_dom(lle->lle_lsme))
+ continue;
+ rc = cl_object_flush(env, lovsub2cl(lle->lle_dom.lo_dom), lock);
+ break;
+ }
+
+ RETURN(rc);
+}
+
+static int lov_flush_empty(const struct lu_env *env, struct cl_object *obj,
+ struct ldlm_lock *lock)
+{
+ return 0;
+}
+
const static struct lov_layout_operations lov_dispatch[] = {
[LLT_EMPTY] = {
.llo_init = lov_init_empty,
.llo_lock_init = lov_lock_init_empty,
.llo_io_init = lov_io_init_empty,
.llo_getattr = lov_attr_get_empty,
+ .llo_flush = lov_flush_empty,
},
[LLT_RELEASED] = {
.llo_init = lov_init_released,
.llo_lock_init = lov_lock_init_empty,
.llo_io_init = lov_io_init_released,
.llo_getattr = lov_attr_get_empty,
+ .llo_flush = lov_flush_empty,
},
[LLT_COMP] = {
.llo_init = lov_init_composite,
.llo_lock_init = lov_lock_init_composite,
.llo_io_init = lov_io_init_composite,
.llo_getattr = lov_attr_get_composite,
+ .llo_flush = lov_flush_composite,
},
};
cl->cl_size = lov_comp_md_size(lsm);
cl->cl_layout_gen = lsm->lsm_layout_gen;
- cl->cl_dom_comp_size = 0;
- if (lsm_is_composite(lsm->lsm_magic)) {
- struct lov_stripe_md_entry *lsme = lsm->lsm_entries[0];
-
- cl->cl_is_composite = true;
-
- if (lsme_is_dom(lsme))
- cl->cl_dom_comp_size = lsme->lsme_extent.e_end;
- } else {
- cl->cl_is_composite = false;
- }
+ cl->cl_is_composite = lsm_is_composite(lsm->lsm_magic);
rc = lov_lsm_pack(lsm, buf->lb_buf, buf->lb_len);
lov_lsm_put(lsm);
return maxbytes;
}
+static int lov_object_flush(const struct lu_env *env, struct cl_object *obj,
+ struct ldlm_lock *lock)
+{
+ return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_flush, true, env, obj,
+ lock);
+}
+
static const struct cl_object_operations lov_ops = {
.coo_page_init = lov_page_init,
.coo_lock_init = lov_lock_init,
.coo_layout_get = lov_object_layout_get,
.coo_maxbytes = lov_object_maxbytes,
.coo_fiemap = lov_object_fiemap,
+ .coo_object_flush = lov_object_flush
};
static const struct lu_object_operations lov_lu_obj_ops = {
*/
static int mdc_dlm_blocking_ast0(const struct lu_env *env,
struct ldlm_lock *dlmlock,
- void *data, int flag)
+ int flag)
{
struct cl_object *obj = NULL;
int result = 0;
break;
}
- rc = mdc_dlm_blocking_ast0(env, dlmlock, data, flag);
+ rc = mdc_dlm_blocking_ast0(env, dlmlock, flag);
cl_env_put(env, &refcheck);
break;
}
return 0;
}
+static int mdc_object_flush(const struct lu_env *env, struct cl_object *obj,
+ struct ldlm_lock *lock)
+{
+ RETURN(mdc_dlm_blocking_ast0(env, lock, LDLM_CB_CANCELING));
+}
+
static const struct cl_object_operations mdc_ops = {
.coo_page_init = osc_page_init,
.coo_lock_init = mdc_lock_init,
.coo_glimpse = osc_object_glimpse,
.coo_req_attr_set = mdc_req_attr_set,
.coo_prune = mdc_object_prune,
+ .coo_object_flush = mdc_object_flush
};
static const struct osc_object_operations mdc_object_ops = {
}
EXPORT_SYMBOL(cl_object_maxbytes);
+int cl_object_flush(const struct lu_env *env, struct cl_object *obj,
+ struct ldlm_lock *lock)
+{
+ struct lu_object_header *top = obj->co_lu.lo_header;
+ int rc = 0;
+ ENTRY;
+
+ list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) {
+ if (obj->co_ops->coo_object_flush) {
+ rc = obj->co_ops->coo_object_flush(env, obj, lock);
+ if (rc)
+ break;
+ }
+ }
+ RETURN(rc);
+}
+EXPORT_SYMBOL(cl_object_flush);
+
/**
* Helper function removing all object locks, and marking object for
* deletion. All object pages must have been deleted at this point.
}
run_test 5 "DoM truncate deadlock"
+test_6() {
+ $MULTIOP $DIR1/$tfile Oz40960w100_z200w100c &
+ MULTIPID=$!
+
+ # let MULTIPID to create the file
+ sleep 1
+ $MULTIOP $DIR2/$tfile oO_RDWR:Tw100c
+ kill -USR1 $MULTIPID
+ wait
+ $MULTIOP $DIR2/$tfile oO_RDWR:z400w100c
+ $CHECKSTAT -s 500 $DIR2/$tfile || error "wrong size"
+}
+run_test 6 "Race two writes, check file size"
+
+test_7() {
+ dd if=/dev/zero of=$DIR1/$tfile bs=1k count=1 || error "write 10k"
+ $TRUNCATE $DIR1/$tfile 123456 || error "truncate up to 123456"
+ $TRUNCATE $DIR2/$tfile 55555 || error "truncate down to 55555"
+ $CHECKSTAT -t file -s 55555 $DIR1/$tfile || error "stat"
+ $CHECKSTAT -t file -s 55555 $DIR2/$tfile || error "stat"
+ dd if=/dev/zero of=$DIR1/$tfile bs=10k count=1 seek=10 conv=notrunc || error "write 10k@1M"
+ $CHECKSTAT -t file -s 112640 $DIR1/$tfile || error "stat"
+ $CHECKSTAT -t file -s 112640 $DIR2/$tfile || error "stat"
+ rm $DIR1/$tfile
+}
+run_test 7 "truncate up, truncate down on another mount, write in between"
+
test_fsx() {
local file1=$DIR1/$tfile
local file2=$DIR2/$tfile