From: Jinshan Xiong Date: Mon, 21 May 2012 19:12:59 +0000 (-0700) Subject: LU-1030 clio: reimplement ll_fsync in clio way X-Git-Tag: 2.2.55~38 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=2d337a3da6bee76d66450fedda8386f9ede035fa;hp=364c4cdb58ed75996017d35dcbbc7e7acc6d076e LU-1030 clio: reimplement ll_fsync in clio way A new clio operation CIT_FSYNC is added to forcibly synchorize data on local client and OST. Signed-off-by: Jinshan Xiong Change-Id: I047471f819c00901907d4312a353095b44dd96a1 Reviewed-on: http://review.whamcloud.com/2460 Tested-by: Hudson Tested-by: Maloo Reviewed-by: Johann Lombardi Reviewed-by: Niu Yawei Reviewed-by: Oleg Drokin --- diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h index b88655b..c0f014d 100644 --- a/lustre/include/cl_object.h +++ b/lustre/include/cl_object.h @@ -1901,6 +1901,11 @@ enum cl_io_type { */ CIT_FAULT, /** + * fsync system call handling + * To write out a range of file + */ + CIT_FSYNC, + /** * Miscellaneous io. This is used for occasional io activity that * doesn't fit into other types. Currently this is used for: * @@ -2299,6 +2304,13 @@ struct cl_io { /** resulting page */ struct cl_page *ft_page; } ci_fault; + struct cl_fsync_io { + loff_t fi_start; + loff_t fi_end; + struct obd_capa *fi_capa; + /** file system level fid */ + struct lu_fid *fi_fid; + } ci_fsync; } u; struct cl_2queue ci_queue; size_t ci_nob; diff --git a/lustre/include/obd_ost.h b/lustre/include/obd_ost.h index e96a1cb..17d9c03 100644 --- a/lustre/include/obd_ost.h +++ b/lustre/include/obd_ost.h @@ -69,6 +69,12 @@ struct osc_setattr_args { void *sa_cookie; }; +struct osc_fsync_args { + struct obd_info *fa_oi; + obd_enqueue_update_f fa_upcall; + void *fa_cookie; +}; + struct osc_enqueue_args { struct obd_export *oa_exp; int *oa_flags; diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 35b1e1a..5498217 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -1982,6 +1982,48 @@ int ll_flush(struct file *file) return rc ? -EIO : 0; } +/** + * Called to make sure a portion of file has been written out. + * if @local_only is not true, it will send OST_SYNC RPCs to ost. + */ +int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end) +{ + struct cl_env_nest nest; + struct lu_env *env; + struct cl_io *io; + struct obd_capa *capa = NULL; + struct cl_fsync_io *fio; + int result; + ENTRY; + + env = cl_env_nested_get(&nest); + if (IS_ERR(env)) + RETURN(PTR_ERR(env)); + + capa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE); + + io = ccc_env_thread_io(env); + io->ci_obj = cl_i2info(inode)->lli_clob; + + /* initialize parameters for sync */ + fio = &io->u.ci_fsync; + fio->fi_capa = capa; + fio->fi_start = start; + fio->fi_end = end; + fio->fi_fid = ll_inode2fid(inode); + + if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0) + result = cl_io_loop(env, io); + else + result = io->ci_result; + cl_io_fini(env, io); + cl_env_nested_put(&nest, env); + + capa_put(capa); + + RETURN(result); +} + #ifdef HAVE_FILE_FSYNC_4ARGS int ll_fsync(struct file *file, loff_t start, loff_t end, int data) #elif defined(HAVE_FILE_FSYNC_2ARGS) @@ -2029,33 +2071,9 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data) ptlrpc_req_finished(req); if (data && lsm) { - struct obd_info *oinfo; - - OBD_ALLOC_PTR(oinfo); - if (!oinfo) - RETURN(rc ? rc : -ENOMEM); - OBDO_ALLOC(oinfo->oi_oa); - if (!oinfo->oi_oa) { - OBD_FREE_PTR(oinfo); - RETURN(rc ? rc : -ENOMEM); - } - oinfo->oi_oa->o_id = lsm->lsm_object_id; - oinfo->oi_oa->o_seq = lsm->lsm_object_seq; - oinfo->oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; - obdo_from_inode(oinfo->oi_oa, inode, - OBD_MD_FLTYPE | OBD_MD_FLATIME | - OBD_MD_FLMTIME | OBD_MD_FLCTIME | - OBD_MD_FLGROUP); - obdo_set_parent_fid(oinfo->oi_oa, &ll_i2info(inode)->lli_fid); - oinfo->oi_md = lsm; - oinfo->oi_capa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE); - err = obd_sync_rqset(ll_i2sbi(inode)->ll_dt_exp, oinfo, 0, - OBD_OBJECT_EOF); - capa_put(oinfo->oi_capa); + err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF); if (!rc) rc = err; - OBDO_FREE(oinfo->oi_oa); - OBD_FREE_PTR(oinfo); lli->lli_write_rc = rc < 0 ? rc : 0; } diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 8d0cf47..b03ce4b 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -1402,6 +1402,8 @@ static inline int cl_merge_lvb(struct inode *inode) struct obd_capa *cl_capa_lookup(struct inode *inode, enum cl_req_type crt); +int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end); + /** direct write pages */ struct ll_dio_pages { /** page array to be written. we don't support diff --git a/lustre/llite/vvp_io.c b/lustre/llite/vvp_io.c index d45cc26..4a65b9e 100644 --- a/lustre/llite/vvp_io.c +++ b/lustre/llite/vvp_io.c @@ -802,6 +802,13 @@ out: return result; } +static void vvp_io_fsync_end(const struct lu_env *env, + const struct cl_io_slice *ios) +{ + /* never try to verify there is no dirty pages in sync range + * because page_mkwrite() can generate new dirty pages any time. */ +} + static int vvp_io_read_page(const struct lu_env *env, const struct cl_io_slice *ios, const struct cl_page_slice *slice) @@ -1100,6 +1107,10 @@ static const struct cl_io_operations vvp_io_ops = { .cio_start = vvp_io_fault_start, .cio_end = ccc_io_end }, + [CIT_FSYNC] = { + .cio_end = vvp_io_fsync_end, + .cio_fini = vvp_io_fini + }, [CIT_MISC] = { .cio_fini = vvp_io_fini } diff --git a/lustre/lov/lov_io.c b/lustre/lov/lov_io.c index 907c81b..3cf97dd 100644 --- a/lustre/lov/lov_io.c +++ b/lustre/lov/lov_io.c @@ -109,6 +109,13 @@ static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio, io->u.ci_fault.ft_index = cl_index(obj, off); break; } + case CIT_FSYNC: { + io->u.ci_fsync.fi_start = start; + io->u.ci_fsync.fi_end = end; + io->u.ci_fsync.fi_capa = parent->u.ci_fsync.fi_capa; + io->u.ci_fsync.fi_fid = parent->u.ci_fsync.fi_fid; + break; + } case CIT_READ: case CIT_WRITE: { if (cl_io_is_append(parent)) { @@ -331,6 +338,12 @@ static void lov_io_slice_init(struct lov_io *lio, break; } + case CIT_FSYNC: { + lio->lis_pos = io->u.ci_fsync.fi_start; + lio->lis_endpos = io->u.ci_fsync.fi_end; + break; + } + case CIT_MISC: lio->lis_pos = 0; lio->lis_endpos = OBD_OBJECT_EOF; @@ -445,6 +458,7 @@ static int lov_io_rw_iter_init(const struct lu_env *env, static int lov_io_call(const struct lu_env *env, struct lov_io *lio, int (*iofunc)(const struct lu_env *, struct cl_io *)) { + struct cl_io *parent = lio->lis_cl.cis_io; struct lov_io_sub *sub; int rc = 0; @@ -455,6 +469,9 @@ static int lov_io_call(const struct lu_env *env, struct lov_io *lio, lov_sub_exit(sub); if (rc) break; + + if (parent->ci_result == 0) + parent->ci_result = sub->sub_io->ci_result; } RETURN(rc); } @@ -764,6 +781,15 @@ static const struct cl_io_operations lov_io_ops = { .cio_start = lov_io_fault_start, .cio_end = lov_io_end }, + [CIT_FSYNC] = { + .cio_fini = lov_io_fini, + .cio_iter_init = lov_io_iter_init, + .cio_iter_fini = lov_io_iter_fini, + .cio_lock = lov_io_lock, + .cio_unlock = lov_io_unlock, + .cio_start = lov_io_start, + .cio_end = lov_io_end + }, [CIT_MISC] = { .cio_fini = lov_io_fini } @@ -836,6 +862,9 @@ static const struct cl_io_operations lov_empty_io_ops = { .cio_start = LOV_EMPTY_IMPOSSIBLE, .cio_end = LOV_EMPTY_IMPOSSIBLE }, + [CIT_FSYNC] = { + .cio_fini = lov_empty_io_fini + }, [CIT_MISC] = { .cio_fini = lov_empty_io_fini } @@ -879,6 +908,7 @@ int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj, switch (io->ci_type) { default: LBUG(); + case CIT_FSYNC: case CIT_MISC: case CIT_READ: result = 0; diff --git a/lustre/obdclass/cl_io.c b/lustre/obdclass/cl_io.c index d90be9c..4ac5087 100644 --- a/lustre/obdclass/cl_io.c +++ b/lustre/obdclass/cl_io.c @@ -1033,7 +1033,9 @@ int cl_io_loop(const struct lu_env *env, struct cl_io *io) } cl_io_iter_fini(env, io); } while (result == 0 && io->ci_continue); - RETURN(result < 0 ? result : 0); + if (result == 0) + result = io->ci_result; + RETURN(result < 0 ? result : 0); } EXPORT_SYMBOL(cl_io_loop); diff --git a/lustre/osc/osc_cl_internal.h b/lustre/osc/osc_cl_internal.h index 57350ea..3094ca2 100644 --- a/lustre/osc/osc_cl_internal.h +++ b/lustre/osc/osc_cl_internal.h @@ -70,11 +70,12 @@ struct osc_io { /** true if this io is lockless. */ int oi_lockless; - struct obdo oi_oa; - struct osc_setattr_cbargs { - int opc_rc; - cfs_completion_t opc_sync; - } oi_setattr_cbarg; + struct obd_info oi_info; + struct obdo oi_oa; + struct osc_async_cbargs { + int opc_rc; + cfs_completion_t opc_sync; + } oi_cbarg; }; /** diff --git a/lustre/osc/osc_internal.h b/lustre/osc/osc_internal.h index 7293627..3f81c7b 100644 --- a/lustre/osc/osc_internal.h +++ b/lustre/osc/osc_internal.h @@ -136,6 +136,9 @@ int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo, int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo, obd_enqueue_update_f upcall, void *cookie, struct ptlrpc_request_set *rqset); +int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, + obd_enqueue_update_f upcall, void *cookie, + struct ptlrpc_request_set *rqset); int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *cfg); int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, diff --git a/lustre/osc/osc_io.c b/lustre/osc/osc_io.c index 32044fa..5818b8d 100644 --- a/lustre/osc/osc_io.c +++ b/lustre/osc/osc_io.c @@ -363,9 +363,9 @@ static int osc_io_fault_start(const struct lu_env *env, RETURN(0); } -static int osc_setattr_upcall(void *a, int rc) +static int osc_async_upcall(void *a, int rc) { - struct osc_setattr_cbargs *args = a; + struct osc_async_cbargs *args = a; args->opc_rc = rc; cfs_complete(&args->opc_sync); @@ -441,7 +441,7 @@ static int osc_io_setattr_start(const struct lu_env *env, struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo; struct cl_attr *attr = &osc_env_info(env)->oti_attr; struct obdo *oa = &oio->oi_oa; - struct osc_setattr_cbargs *cbargs = &oio->oi_setattr_cbarg; + struct osc_async_cbargs *cbargs = &oio->oi_cbarg; loff_t size = io->u.ci_setattr.sa_attr.lvb_size; unsigned int ia_valid = io->u.ci_setattr.sa_valid; int result = 0; @@ -504,12 +504,12 @@ static int osc_io_setattr_start(const struct lu_env *env, if (ia_valid & ATTR_SIZE) result = osc_punch_base(osc_export(cl2osc(obj)), - &oinfo, osc_setattr_upcall, + &oinfo, osc_async_upcall, cbargs, PTLRPCD_SET); else result = osc_setattr_async_base(osc_export(cl2osc(obj)), &oinfo, NULL, - osc_setattr_upcall, + osc_async_upcall, cbargs, PTLRPCD_SET); } return result; @@ -520,7 +520,7 @@ static void osc_io_setattr_end(const struct lu_env *env, { struct cl_io *io = slice->cis_io; struct osc_io *oio = cl2osc_io(env, slice); - struct osc_setattr_cbargs *cbargs = &oio->oi_setattr_cbarg; + struct osc_async_cbargs *cbargs = &oio->oi_cbarg; int result; cfs_wait_for_completion(&cbargs->opc_sync); @@ -584,6 +584,52 @@ static int osc_io_write_start(const struct lu_env *env, RETURN(result); } +static int osc_io_fsync_start(const struct lu_env *env, + const struct cl_io_slice *slice) +{ + struct cl_io *io = slice->cis_io; + struct osc_io *oio = cl2osc_io(env, slice); + struct obdo *oa = &oio->oi_oa; + struct obd_info *oinfo = &oio->oi_info; + struct osc_async_cbargs *cbargs = &oio->oi_cbarg; + struct cl_object *obj = slice->cis_obj; + struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo; + int result = 0; + ENTRY; + + memset(oa, 0, sizeof(*oa)); + oa->o_id = loi->loi_id; + oa->o_seq = loi->loi_seq; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; + + /* reload size and blocks for start and end of sync range */ + oa->o_size = io->u.ci_fsync.fi_start; + oa->o_blocks = io->u.ci_fsync.fi_end; + oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; + + obdo_set_parent_fid(oa, io->u.ci_fsync.fi_fid); + + memset(oinfo, 0, sizeof(*oinfo)); + oinfo->oi_oa = oa; + oinfo->oi_capa = io->u.ci_fsync.fi_capa; + cfs_init_completion(&cbargs->opc_sync); + + result = osc_sync_base(osc_export(cl2osc(obj)), oinfo, + osc_async_upcall, cbargs, PTLRPCD_SET); + RETURN(result); +} + +static void osc_io_fsync_end(const struct lu_env *env, + const struct cl_io_slice *slice) +{ + struct cl_io *io = slice->cis_io; + struct osc_io *oio = cl2osc_io(env, slice); + struct osc_async_cbargs *cbargs = &oio->oi_cbarg; + + cfs_wait_for_completion(&cbargs->opc_sync); + io->ci_result = cbargs->opc_rc; +} + static const struct cl_io_operations osc_io_ops = { .op = { [CIT_READ] = { @@ -602,6 +648,11 @@ static const struct cl_io_operations osc_io_ops = { .cio_fini = osc_io_fini, .cio_start = osc_io_fault_start }, + [CIT_FSYNC] = { + .cio_start = osc_io_fsync_start, + .cio_end = osc_io_fsync_end, + .cio_fini = osc_io_fini + }, [CIT_MISC] = { .cio_fini = osc_io_fini } diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 0de44b7..3aaf175 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -568,7 +568,7 @@ static int osc_sync_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *arg, int rc) { - struct osc_async_args *aa = arg; + struct osc_fsync_args *fa = arg; struct ost_body *body; ENTRY; @@ -581,27 +581,22 @@ static int osc_sync_interpret(const struct lu_env *env, GOTO(out, rc = -EPROTO); } - *aa->aa_oi->oi_oa = body->oa; + *fa->fa_oi->oi_oa = body->oa; out: - rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc); - RETURN(rc); + rc = fa->fa_upcall(fa->fa_cookie, rc); + RETURN(rc); } -static int osc_sync(const struct lu_env *env, struct obd_export *exp, - struct obd_info *oinfo, obd_size start, obd_size end, - struct ptlrpc_request_set *set) +int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, + obd_enqueue_update_f upcall, void *cookie, + struct ptlrpc_request_set *rqset) { - struct ptlrpc_request *req; - struct ost_body *body; - struct osc_async_args *aa; + struct ptlrpc_request *req; + struct ost_body *body; + struct osc_fsync_args *fa; int rc; ENTRY; - if (!oinfo->oi_oa) { - CDEBUG(D_INFO, "oa NULL\n"); - RETURN(-EINVAL); - } - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC); if (req == NULL) RETURN(-ENOMEM); @@ -617,20 +612,41 @@ static int osc_sync(const struct lu_env *env, struct obd_export *exp, body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); LASSERT(body); lustre_set_wire_obdo(&body->oa, oinfo->oi_oa); - body->oa.o_size = start; - body->oa.o_blocks = end; - body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS); osc_pack_capa(req, body, oinfo->oi_capa); ptlrpc_request_set_replen(req); req->rq_interpret_reply = osc_sync_interpret; - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); - aa->aa_oi = oinfo; + CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args)); + fa = ptlrpc_req_async_args(req); + fa->fa_oi = oinfo; + fa->fa_upcall = upcall; + fa->fa_cookie = cookie; - ptlrpc_set_add_req(set, req); - RETURN (0); + if (rqset == PTLRPCD_SET) + ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); + else + ptlrpc_set_add_req(rqset, req); + + RETURN (0); +} + +static int osc_sync(const struct lu_env *env, struct obd_export *exp, + struct obd_info *oinfo, obd_size start, obd_size end, + struct ptlrpc_request_set *set) +{ + ENTRY; + + if (!oinfo->oi_oa) { + CDEBUG(D_INFO, "oa NULL\n"); + RETURN(-EINVAL); + } + + oinfo->oi_oa->o_size = start; + oinfo->oi_oa->o_blocks = end; + oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS); + + RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set)); } /* Find and cancel locally locks matched by @mode in the resource found by