Whamcloud - gitweb
LU-10810 clio: SEEK_HOLE/SEEK_DATA on client side
[fs/lustre-release.git] / lustre / osc / osc_io.c
index 03b4cf8..3bddf18 100644 (file)
@@ -40,6 +40,7 @@
 #include <lustre_obdo.h>
 #include <lustre_osc.h>
 #include <linux/pagevec.h>
+#include <linux/falloc.h>
 
 #include "osc_internal.h"
 
@@ -498,7 +499,7 @@ static int osc_async_upcall(void *a, int rc)
 /**
  * Checks that there are no pages being written in the extent being truncated.
  */
-static int trunc_check_cb(const struct lu_env *env, struct cl_io *io,
+static bool trunc_check_cb(const struct lu_env *env, struct cl_io *io,
                          struct osc_page *ops , void *cbdata)
 {
        struct cl_page *page = ops->ops_cl.cpl_page;
@@ -515,7 +516,7 @@ static int trunc_check_cb(const struct lu_env *env, struct cl_io *io,
                CDEBUG(D_CACHE, "page %p index %lu locked for %d.\n",
                       ops, osc_index(ops), oap->oap_cmd & OBD_BRW_RWMASK);
 
-       return CLP_GANG_OKAY;
+       return true;
 }
 
 static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
@@ -540,23 +541,30 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
 static int osc_io_setattr_start(const struct lu_env *env,
                                 const struct cl_io_slice *slice)
 {
-        struct cl_io            *io     = slice->cis_io;
-        struct osc_io           *oio    = cl2osc_io(env, slice);
-        struct cl_object        *obj    = slice->cis_obj;
-        struct lov_oinfo        *loi    = cl2osc(obj)->oo_oinfo;
-        struct cl_attr          *attr   = &osc_env_info(env)->oti_attr;
-        struct obdo             *oa     = &oio->oi_oa;
+       struct cl_io *io = slice->cis_io;
+       struct osc_io *oio = cl2osc_io(env, slice);
+       struct cl_object *obj = slice->cis_obj;
+       struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
+       struct cl_attr *attr = &osc_env_info(env)->oti_attr;
+       struct obdo *oa = &oio->oi_oa;
        struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
-       __u64                    size   = io->u.ci_setattr.sa_attr.lvb_size;
        unsigned int ia_avalid = io->u.ci_setattr.sa_avalid;
        enum op_xvalid ia_xvalid = io->u.ci_setattr.sa_xvalid;
        int result = 0;
+       __u64 size = io->u.ci_setattr.sa_attr.lvb_size;
+       __u64 end = OBD_OBJECT_EOF;
+       bool io_is_falloc = false;
 
        ENTRY;
        /* truncate cache dirty pages first */
-       if (cl_io_is_trunc(io))
+       if (cl_io_is_trunc(io)) {
                result = osc_cache_truncate_start(env, cl2osc(obj), size,
                                                  &oio->oi_trunc);
+       } else if (cl_io_is_fallocate(io)) {
+               io_is_falloc = true;
+               size = io->u.ci_setattr.sa_falloc_offset;
+               end = io->u.ci_setattr.sa_falloc_end;
+       }
 
        if (result == 0 && oio->oi_lockless == 0) {
                cl_object_attr_lock(obj);
@@ -608,9 +616,15 @@ static int osc_io_setattr_start(const struct lu_env *env,
                        oa->o_mtime = attr->cat_mtime;
                }
                if (ia_avalid & ATTR_SIZE) {
-                       oa->o_size = size;
-                       oa->o_blocks = OBD_OBJECT_EOF;
-                       oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+                       if (io_is_falloc) {
+                               oa->o_size = size;
+                               oa->o_blocks = end;
+                               oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+                       } else {
+                               oa->o_size = size;
+                               oa->o_blocks = OBD_OBJECT_EOF;
+                               oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+                       }
 
                        if (oio->oi_lockless) {
                                oa->o_flags = OBD_FL_SRVLOCK;
@@ -633,14 +647,20 @@ static int osc_io_setattr_start(const struct lu_env *env,
 
                init_completion(&cbargs->opc_sync);
 
-               if (ia_avalid & ATTR_SIZE)
+               if (io_is_falloc) {
+                       int falloc_mode = io->u.ci_setattr.sa_falloc_mode;
+
+                       result = osc_fallocate_base(osc_export(cl2osc(obj)),
+                                                   oa, osc_async_upcall,
+                                                   cbargs, falloc_mode);
+               } else if (ia_avalid & ATTR_SIZE) {
                        result = osc_punch_send(osc_export(cl2osc(obj)),
                                                oa, osc_async_upcall, cbargs);
-               else
+               } else {
                        result = osc_setattr_async(osc_export(cl2osc(obj)),
                                                   oa, osc_async_upcall,
                                                   cbargs, PTLRPCD_SET);
-
+               }
                cbargs->opc_rpc_sent = result == 0;
        }
 
@@ -670,6 +690,7 @@ void osc_io_setattr_end(const struct lu_env *env,
                        struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
 
                        LASSERT(cl_io_is_trunc(io));
+                       LASSERT(cl_io_is_trunc(io) || cl_io_is_fallocate(io));
                        /* XXX: Need a lock. */
                        osd->od_stats.os_lockless_truncates++;
                }
@@ -677,17 +698,39 @@ void osc_io_setattr_end(const struct lu_env *env,
 
        if (cl_io_is_trunc(io)) {
                __u64 size = io->u.ci_setattr.sa_attr.lvb_size;
+
+               if (result == 0) {
+                       cl_object_attr_lock(obj);
+                       if (oa->o_valid & OBD_MD_FLBLOCKS) {
+                               attr->cat_blocks = oa->o_blocks;
+                               cl_valid |= CAT_BLOCKS;
+                       }
+
+                       cl_object_attr_update(env, obj, attr, cl_valid);
+                       cl_object_attr_unlock(obj);
+               }
+               osc_trunc_check(env, io, oio, size);
+               osc_cache_truncate_end(env, oio->oi_trunc);
+               oio->oi_trunc = NULL;
+       }
+
+       if (cl_io_is_fallocate(io)) {
                cl_object_attr_lock(obj);
+
+               /* update blocks */
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        cl_valid |= CAT_BLOCKS;
                }
 
+               /* update size */
+               if (oa->o_valid & OBD_MD_FLSIZE) {
+                       attr->cat_size = oa->o_size;
+                       cl_valid |= CAT_SIZE;
+               }
+
                cl_object_attr_update(env, obj, attr, cl_valid);
                cl_object_attr_unlock(obj);
-               osc_trunc_check(env, io, oio, size);
-               osc_cache_truncate_end(env, oio->oi_trunc);
-               oio->oi_trunc = NULL;
        }
 }
 EXPORT_SYMBOL(osc_io_setattr_end);
@@ -1010,6 +1053,130 @@ void osc_io_end(const struct lu_env *env, const struct cl_io_slice *slice)
 }
 EXPORT_SYMBOL(osc_io_end);
 
+struct osc_lseek_args {
+       struct osc_io *lsa_oio;
+};
+
+static int osc_lseek_interpret(const struct lu_env *env,
+                              struct ptlrpc_request *req,
+                              void *arg, int rc)
+{
+       struct ost_body *reply;
+       struct osc_lseek_args *lsa = arg;
+       struct osc_io *oio = lsa->lsa_oio;
+       struct cl_io *io = oio->oi_cl.cis_io;
+       struct cl_lseek_io *lsio = &io->u.ci_lseek;
+
+       ENTRY;
+
+       if (rc != 0)
+               GOTO(out, rc);
+
+       reply = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+       if (reply == NULL)
+               GOTO(out, rc = -EPROTO);
+
+       lsio->ls_result = reply->oa.o_size;
+out:
+       osc_async_upcall(&oio->oi_cbarg, rc);
+       RETURN(rc);
+}
+
+int osc_io_lseek_start(const struct lu_env *env,
+                      const struct cl_io_slice *slice)
+{
+       struct cl_io *io = slice->cis_io;
+       struct osc_io *oio = cl2osc_io(env, slice);
+       struct cl_object *obj = slice->cis_obj;
+       struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
+       struct cl_lseek_io *lsio = &io->u.ci_lseek;
+       struct obdo *oa = &oio->oi_oa;
+       struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
+       struct obd_export *exp = osc_export(cl2osc(obj));
+       struct ptlrpc_request *req;
+       struct ost_body *body;
+       struct osc_lseek_args *lsa;
+       int rc = 0;
+
+       ENTRY;
+
+       /* No negative values at this point */
+       LASSERT(lsio->ls_start >= 0);
+       LASSERT(lsio->ls_whence == SEEK_HOLE || lsio->ls_whence == SEEK_DATA);
+
+       /* with IO lock taken we have object size in LVB and can check
+        * boundaries prior sending LSEEK RPC
+        */
+       if (lsio->ls_start >= loi->loi_lvb.lvb_size) {
+               /* consider area beyond end of object as hole */
+               if (lsio->ls_whence == SEEK_HOLE)
+                       lsio->ls_result = lsio->ls_start;
+               else
+                       lsio->ls_result = -ENXIO;
+               RETURN(0);
+       }
+
+       /* if LSEEK RPC is not supported by server, consider whole stripe
+        * object is data with hole after end of object
+        */
+       if (!exp_connect_lseek(exp)) {
+               if (lsio->ls_whence == SEEK_HOLE)
+                       lsio->ls_result = loi->loi_lvb.lvb_size;
+               else
+                       lsio->ls_result = lsio->ls_start;
+               RETURN(0);
+       }
+
+       memset(oa, 0, sizeof(*oa));
+       oa->o_oi = loi->loi_oi;
+       oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+       oa->o_size = lsio->ls_start;
+       oa->o_mode = lsio->ls_whence;
+       if (oio->oi_lockless) {
+               oa->o_flags = OBD_FL_SRVLOCK;
+               oa->o_valid |= OBD_MD_FLFLAGS;
+       }
+
+       init_completion(&cbargs->opc_sync);
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SEEK);
+       if (req == NULL)
+               RETURN(-ENOMEM);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SEEK);
+       if (rc < 0) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+
+       body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+       lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
+       ptlrpc_request_set_replen(req);
+       req->rq_interpret_reply = osc_lseek_interpret;
+       lsa = ptlrpc_req_async_args(lsa, req);
+       lsa->lsa_oio = oio;
+
+       ptlrpcd_add_req(req);
+       cbargs->opc_rpc_sent = 1;
+
+       RETURN(0);
+}
+EXPORT_SYMBOL(osc_io_lseek_start);
+
+void osc_io_lseek_end(const struct lu_env *env,
+                     const struct cl_io_slice *slice)
+{
+       struct osc_io *oio = cl2osc_io(env, slice);
+       struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
+       int rc = 0;
+
+       if (cbargs->opc_rpc_sent) {
+               wait_for_completion(&cbargs->opc_sync);
+               rc = cbargs->opc_rc;
+       }
+       slice->cis_io->ci_result = rc;
+}
+EXPORT_SYMBOL(osc_io_lseek_end);
+
 static const struct cl_io_operations osc_io_ops = {
        .op = {
                [CIT_READ] = {
@@ -1052,6 +1219,11 @@ static const struct cl_io_operations osc_io_ops = {
                        .cio_end    = osc_io_ladvise_end,
                        .cio_fini   = osc_io_fini
                },
+               [CIT_LSEEK] = {
+                       .cio_start  = osc_io_lseek_start,
+                       .cio_end    = osc_io_lseek_end,
+                       .cio_fini   = osc_io_fini
+               },
                [CIT_MISC] = {
                        .cio_fini   = osc_io_fini
                }