Whamcloud - gitweb
LU-1030 clio: reimplement ll_fsync in clio way
authorJinshan Xiong <jinshan.xiong@whamcloud.com>
Mon, 21 May 2012 19:12:59 +0000 (12:12 -0700)
committerOleg Drokin <green@whamcloud.com>
Fri, 1 Jun 2012 21:11:17 +0000 (17:11 -0400)
A new clio operation CIT_FSYNC is added to forcibly synchorize data
on local client and OST.

Signed-off-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Change-Id: I047471f819c00901907d4312a353095b44dd96a1
Reviewed-on: http://review.whamcloud.com/2460
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Johann Lombardi <johann@whamcloud.com>
Reviewed-by: Niu Yawei <niu@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/cl_object.h
lustre/include/obd_ost.h
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/vvp_io.c
lustre/lov/lov_io.c
lustre/obdclass/cl_io.c
lustre/osc/osc_cl_internal.h
lustre/osc/osc_internal.h
lustre/osc/osc_io.c
lustre/osc/osc_request.c

index b88655b..c0f014d 100644 (file)
@@ -1901,6 +1901,11 @@ enum cl_io_type {
          */
         CIT_FAULT,
         /**
+        * fsync system call handling
+        * To write out a range of file
+        */
+       CIT_FSYNC,
+       /**
          * Miscellaneous io. This is used for occasional io activity that
          * doesn't fit into other types. Currently this is used for:
          *
@@ -2299,6 +2304,13 @@ struct cl_io {
                         /** resulting page */
                         struct cl_page *ft_page;
                 } ci_fault;
+               struct cl_fsync_io {
+                       loff_t             fi_start;
+                       loff_t             fi_end;
+                       struct obd_capa   *fi_capa;
+                       /** file system level fid */
+                       struct lu_fid     *fi_fid;
+               } ci_fsync;
         } u;
         struct cl_2queue     ci_queue;
         size_t               ci_nob;
index e96a1cb..17d9c03 100644 (file)
@@ -69,6 +69,12 @@ struct osc_setattr_args {
         void                *sa_cookie;
 };
 
+struct osc_fsync_args {
+       struct obd_info     *fa_oi;
+       obd_enqueue_update_f fa_upcall;
+       void                *fa_cookie;
+};
+
 struct osc_enqueue_args {
         struct obd_export        *oa_exp;
         int                      *oa_flags;
index 35b1e1a..5498217 100644 (file)
@@ -1982,6 +1982,48 @@ int ll_flush(struct file *file)
         return rc ? -EIO : 0;
 }
 
+/**
+ * Called to make sure a portion of file has been written out.
+ * if @local_only is not true, it will send OST_SYNC RPCs to ost.
+ */
+int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end)
+{
+       struct cl_env_nest nest;
+       struct lu_env *env;
+       struct cl_io *io;
+       struct obd_capa *capa = NULL;
+       struct cl_fsync_io *fio;
+       int result;
+       ENTRY;
+
+       env = cl_env_nested_get(&nest);
+       if (IS_ERR(env))
+               RETURN(PTR_ERR(env));
+
+       capa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
+
+       io = ccc_env_thread_io(env);
+       io->ci_obj = cl_i2info(inode)->lli_clob;
+
+       /* initialize parameters for sync */
+       fio = &io->u.ci_fsync;
+       fio->fi_capa = capa;
+       fio->fi_start = start;
+       fio->fi_end = end;
+       fio->fi_fid = ll_inode2fid(inode);
+
+       if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0)
+               result = cl_io_loop(env, io);
+       else
+               result = io->ci_result;
+       cl_io_fini(env, io);
+       cl_env_nested_put(&nest, env);
+
+       capa_put(capa);
+
+       RETURN(result);
+}
+
 #ifdef HAVE_FILE_FSYNC_4ARGS
 int ll_fsync(struct file *file, loff_t start, loff_t end, int data)
 #elif defined(HAVE_FILE_FSYNC_2ARGS)
@@ -2029,33 +2071,9 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data)
                 ptlrpc_req_finished(req);
 
         if (data && lsm) {
-                struct obd_info *oinfo;
-
-                OBD_ALLOC_PTR(oinfo);
-                if (!oinfo)
-                        RETURN(rc ? rc : -ENOMEM);
-                OBDO_ALLOC(oinfo->oi_oa);
-                if (!oinfo->oi_oa) {
-                        OBD_FREE_PTR(oinfo);
-                        RETURN(rc ? rc : -ENOMEM);
-                }
-                oinfo->oi_oa->o_id = lsm->lsm_object_id;
-                oinfo->oi_oa->o_seq = lsm->lsm_object_seq;
-                oinfo->oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
-                obdo_from_inode(oinfo->oi_oa, inode,
-                                OBD_MD_FLTYPE | OBD_MD_FLATIME |
-                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
-                                OBD_MD_FLGROUP);
-                obdo_set_parent_fid(oinfo->oi_oa, &ll_i2info(inode)->lli_fid);
-                oinfo->oi_md = lsm;
-                oinfo->oi_capa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
-                err = obd_sync_rqset(ll_i2sbi(inode)->ll_dt_exp, oinfo, 0,
-                                     OBD_OBJECT_EOF);
-                capa_put(oinfo->oi_capa);
+               err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF);
                 if (!rc)
                         rc = err;
-                OBDO_FREE(oinfo->oi_oa);
-                OBD_FREE_PTR(oinfo);
                 lli->lli_write_rc = rc < 0 ? rc : 0;
         }
 
index 8d0cf47..b03ce4b 100644 (file)
@@ -1402,6 +1402,8 @@ static inline int cl_merge_lvb(struct inode *inode)
 
 struct obd_capa *cl_capa_lookup(struct inode *inode, enum cl_req_type crt);
 
+int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end);
+
 /** direct write pages */
 struct ll_dio_pages {
         /** page array to be written. we don't support
index d45cc26..4a65b9e 100644 (file)
@@ -802,6 +802,13 @@ out:
         return result;
 }
 
+static void vvp_io_fsync_end(const struct lu_env *env,
+                            const struct cl_io_slice *ios)
+{
+       /* never try to verify there is no dirty pages in sync range
+        * because page_mkwrite() can generate new dirty pages any time. */
+}
+
 static int vvp_io_read_page(const struct lu_env *env,
                             const struct cl_io_slice *ios,
                             const struct cl_page_slice *slice)
@@ -1100,6 +1107,10 @@ static const struct cl_io_operations vvp_io_ops = {
                         .cio_start     = vvp_io_fault_start,
                         .cio_end       = ccc_io_end
                 },
+               [CIT_FSYNC] = {
+                       .cio_end    = vvp_io_fsync_end,
+                       .cio_fini   = vvp_io_fini
+               },
                 [CIT_MISC] = {
                         .cio_fini   = vvp_io_fini
                 }
index 907c81b..3cf97dd 100644 (file)
@@ -109,6 +109,13 @@ static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
                 io->u.ci_fault.ft_index = cl_index(obj, off);
                 break;
         }
+       case CIT_FSYNC: {
+               io->u.ci_fsync.fi_start = start;
+               io->u.ci_fsync.fi_end = end;
+               io->u.ci_fsync.fi_capa = parent->u.ci_fsync.fi_capa;
+               io->u.ci_fsync.fi_fid = parent->u.ci_fsync.fi_fid;
+               break;
+       }
         case CIT_READ:
         case CIT_WRITE: {
                 if (cl_io_is_append(parent)) {
@@ -331,6 +338,12 @@ static void lov_io_slice_init(struct lov_io *lio,
                 break;
         }
 
+       case CIT_FSYNC: {
+               lio->lis_pos = io->u.ci_fsync.fi_start;
+               lio->lis_endpos = io->u.ci_fsync.fi_end;
+               break;
+       }
+
         case CIT_MISC:
                 lio->lis_pos = 0;
                 lio->lis_endpos = OBD_OBJECT_EOF;
@@ -445,6 +458,7 @@ static int lov_io_rw_iter_init(const struct lu_env *env,
 static int lov_io_call(const struct lu_env *env, struct lov_io *lio,
                        int (*iofunc)(const struct lu_env *, struct cl_io *))
 {
+       struct cl_io *parent = lio->lis_cl.cis_io;
         struct lov_io_sub *sub;
         int rc = 0;
 
@@ -455,6 +469,9 @@ static int lov_io_call(const struct lu_env *env, struct lov_io *lio,
                 lov_sub_exit(sub);
                 if (rc)
                         break;
+
+               if (parent->ci_result == 0)
+                       parent->ci_result = sub->sub_io->ci_result;
         }
         RETURN(rc);
 }
@@ -764,6 +781,15 @@ static const struct cl_io_operations lov_io_ops = {
                         .cio_start     = lov_io_fault_start,
                         .cio_end       = lov_io_end
                 },
+               [CIT_FSYNC] = {
+                       .cio_fini      = lov_io_fini,
+                       .cio_iter_init = lov_io_iter_init,
+                       .cio_iter_fini = lov_io_iter_fini,
+                       .cio_lock      = lov_io_lock,
+                       .cio_unlock    = lov_io_unlock,
+                       .cio_start     = lov_io_start,
+                       .cio_end       = lov_io_end
+               },
                 [CIT_MISC] = {
                         .cio_fini   = lov_io_fini
                 }
@@ -836,6 +862,9 @@ static const struct cl_io_operations lov_empty_io_ops = {
                         .cio_start     = LOV_EMPTY_IMPOSSIBLE,
                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
                 },
+               [CIT_FSYNC] = {
+                       .cio_fini   = lov_empty_io_fini
+               },
                 [CIT_MISC] = {
                         .cio_fini   = lov_empty_io_fini
                 }
@@ -879,6 +908,7 @@ int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
         switch (io->ci_type) {
         default:
                 LBUG();
+       case CIT_FSYNC:
         case CIT_MISC:
         case CIT_READ:
                 result = 0;
index d90be9c..4ac5087 100644 (file)
@@ -1033,7 +1033,9 @@ int cl_io_loop(const struct lu_env *env, struct cl_io *io)
                 }
                 cl_io_iter_fini(env, io);
         } while (result == 0 && io->ci_continue);
-        RETURN(result < 0 ? result : 0);
+       if (result == 0)
+               result = io->ci_result;
+       RETURN(result < 0 ? result : 0);
 }
 EXPORT_SYMBOL(cl_io_loop);
 
index 57350ea..3094ca2 100644 (file)
@@ -70,11 +70,12 @@ struct osc_io {
         /** true if this io is lockless. */
         int                oi_lockless;
 
-        struct obdo        oi_oa;
-        struct osc_setattr_cbargs {
-                int               opc_rc;
-                cfs_completion_t  opc_sync;
-        } oi_setattr_cbarg;
+       struct obd_info    oi_info;
+       struct obdo        oi_oa;
+       struct osc_async_cbargs {
+               int               opc_rc;
+               cfs_completion_t  opc_sync;
+       } oi_cbarg;
 };
 
 /**
index 7293627..3f81c7b 100644 (file)
@@ -136,6 +136,9 @@ int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                    obd_enqueue_update_f upcall, void *cookie,
                    struct ptlrpc_request_set *rqset);
+int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
+                 obd_enqueue_update_f upcall, void *cookie,
+                 struct ptlrpc_request_set *rqset);
 
 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *cfg);
 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
index 32044fa..5818b8d 100644 (file)
@@ -363,9 +363,9 @@ static int osc_io_fault_start(const struct lu_env *env,
         RETURN(0);
 }
 
-static int osc_setattr_upcall(void *a, int rc)
+static int osc_async_upcall(void *a, int rc)
 {
-        struct osc_setattr_cbargs *args = a;
+       struct osc_async_cbargs *args = a;
 
         args->opc_rc = rc;
         cfs_complete(&args->opc_sync);
@@ -441,7 +441,7 @@ static int osc_io_setattr_start(const struct lu_env *env,
         struct lov_oinfo        *loi    = cl2osc(obj)->oo_oinfo;
         struct cl_attr          *attr   = &osc_env_info(env)->oti_attr;
         struct obdo             *oa     = &oio->oi_oa;
-        struct osc_setattr_cbargs *cbargs = &oio->oi_setattr_cbarg;
+       struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
         loff_t                   size   = io->u.ci_setattr.sa_attr.lvb_size;
         unsigned int             ia_valid = io->u.ci_setattr.sa_valid;
         int                      result = 0;
@@ -504,12 +504,12 @@ static int osc_io_setattr_start(const struct lu_env *env,
 
                 if (ia_valid & ATTR_SIZE)
                         result = osc_punch_base(osc_export(cl2osc(obj)),
-                                                &oinfo, osc_setattr_upcall,
+                                               &oinfo, osc_async_upcall,
                                                 cbargs, PTLRPCD_SET);
                 else
                         result = osc_setattr_async_base(osc_export(cl2osc(obj)),
                                                         &oinfo, NULL,
-                                                        osc_setattr_upcall,
+                                                       osc_async_upcall,
                                                         cbargs, PTLRPCD_SET);
         }
         return result;
@@ -520,7 +520,7 @@ static void osc_io_setattr_end(const struct lu_env *env,
 {
         struct cl_io            *io     = slice->cis_io;
         struct osc_io           *oio    = cl2osc_io(env, slice);
-        struct osc_setattr_cbargs *cbargs = &oio->oi_setattr_cbarg;
+       struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
         int result;
 
         cfs_wait_for_completion(&cbargs->opc_sync);
@@ -584,6 +584,52 @@ static int osc_io_write_start(const struct lu_env *env,
         RETURN(result);
 }
 
+static int osc_io_fsync_start(const struct lu_env *env,
+                             const struct cl_io_slice *slice)
+{
+       struct cl_io     *io    = slice->cis_io;
+       struct osc_io    *oio   = cl2osc_io(env, slice);
+       struct obdo      *oa    = &oio->oi_oa;
+       struct obd_info  *oinfo = &oio->oi_info;
+       struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
+       struct cl_object *obj   = slice->cis_obj;
+       struct lov_oinfo *loi   = cl2osc(obj)->oo_oinfo;
+       int result = 0;
+       ENTRY;
+
+       memset(oa, 0, sizeof(*oa));
+       oa->o_id = loi->loi_id;
+       oa->o_seq = loi->loi_seq;
+       oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+
+       /* reload size and blocks for start and end of sync range */
+       oa->o_size = io->u.ci_fsync.fi_start;
+       oa->o_blocks = io->u.ci_fsync.fi_end;
+       oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+
+       obdo_set_parent_fid(oa, io->u.ci_fsync.fi_fid);
+
+       memset(oinfo, 0, sizeof(*oinfo));
+       oinfo->oi_oa = oa;
+       oinfo->oi_capa = io->u.ci_fsync.fi_capa;
+       cfs_init_completion(&cbargs->opc_sync);
+
+       result = osc_sync_base(osc_export(cl2osc(obj)), oinfo,
+                              osc_async_upcall, cbargs, PTLRPCD_SET);
+       RETURN(result);
+}
+
+static void osc_io_fsync_end(const struct lu_env *env,
+                            const struct cl_io_slice *slice)
+{
+       struct cl_io  *io  = slice->cis_io;
+       struct osc_io *oio = cl2osc_io(env, slice);
+       struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
+
+       cfs_wait_for_completion(&cbargs->opc_sync);
+       io->ci_result = cbargs->opc_rc;
+}
+
 static const struct cl_io_operations osc_io_ops = {
         .op = {
                 [CIT_READ] = {
@@ -602,6 +648,11 @@ static const struct cl_io_operations osc_io_ops = {
                         .cio_fini   = osc_io_fini,
                         .cio_start  = osc_io_fault_start
                 },
+               [CIT_FSYNC] = {
+                       .cio_start  = osc_io_fsync_start,
+                       .cio_end    = osc_io_fsync_end,
+                       .cio_fini   = osc_io_fini
+               },
                 [CIT_MISC] = {
                         .cio_fini   = osc_io_fini
                 }
index 0de44b7..3aaf175 100644 (file)
@@ -568,7 +568,7 @@ static int osc_sync_interpret(const struct lu_env *env,
                               struct ptlrpc_request *req,
                               void *arg, int rc)
 {
-        struct osc_async_args *aa = arg;
+       struct osc_fsync_args *fa = arg;
         struct ost_body *body;
         ENTRY;
 
@@ -581,27 +581,22 @@ static int osc_sync_interpret(const struct lu_env *env,
                 GOTO(out, rc = -EPROTO);
         }
 
-        *aa->aa_oi->oi_oa = body->oa;
+       *fa->fa_oi->oi_oa = body->oa;
 out:
-        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
-        RETURN(rc);
+       rc = fa->fa_upcall(fa->fa_cookie, rc);
+       RETURN(rc);
 }
 
-static int osc_sync(const struct lu_env *env, struct obd_export *exp,
-                    struct obd_info *oinfo, obd_size start, obd_size end,
-                    struct ptlrpc_request_set *set)
+int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
+                 obd_enqueue_update_f upcall, void *cookie,
+                  struct ptlrpc_request_set *rqset)
 {
-        struct ptlrpc_request *req;
-        struct ost_body       *body;
-        struct osc_async_args *aa;
+       struct ptlrpc_request *req;
+       struct ost_body       *body;
+       struct osc_fsync_args *fa;
         int                    rc;
         ENTRY;
 
-        if (!oinfo->oi_oa) {
-                CDEBUG(D_INFO, "oa NULL\n");
-                RETURN(-EINVAL);
-        }
-
         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
         if (req == NULL)
                 RETURN(-ENOMEM);
@@ -617,20 +612,41 @@ static int osc_sync(const struct lu_env *env, struct obd_export *exp,
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         LASSERT(body);
         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
-        body->oa.o_size = start;
-        body->oa.o_blocks = end;
-        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
         osc_pack_capa(req, body, oinfo->oi_capa);
 
         ptlrpc_request_set_replen(req);
         req->rq_interpret_reply = osc_sync_interpret;
 
-        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-        aa = ptlrpc_req_async_args(req);
-        aa->aa_oi = oinfo;
+       CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
+       fa = ptlrpc_req_async_args(req);
+       fa->fa_oi = oinfo;
+       fa->fa_upcall = upcall;
+       fa->fa_cookie = cookie;
 
-        ptlrpc_set_add_req(set, req);
-        RETURN (0);
+       if (rqset == PTLRPCD_SET)
+               ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+       else
+               ptlrpc_set_add_req(rqset, req);
+
+       RETURN (0);
+}
+
+static int osc_sync(const struct lu_env *env, struct obd_export *exp,
+                   struct obd_info *oinfo, obd_size start, obd_size end,
+                   struct ptlrpc_request_set *set)
+{
+       ENTRY;
+
+       if (!oinfo->oi_oa) {
+               CDEBUG(D_INFO, "oa NULL\n");
+               RETURN(-EINVAL);
+       }
+
+       oinfo->oi_oa->o_size = start;
+       oinfo->oi_oa->o_blocks = end;
+       oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
+
+       RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
 }
 
 /* Find and cancel locally locks matched by @mode in the resource found by