Whamcloud - gitweb
LU-10810 clio: SEEK_HOLE/SEEK_DATA on client side
[fs/lustre-release.git] / lustre / osc / osc_io.c
index b4a196f..3bddf18 100644 (file)
@@ -40,6 +40,7 @@
 #include <lustre_obdo.h>
 #include <lustre_osc.h>
 #include <linux/pagevec.h>
+#include <linux/falloc.h>
 
 #include "osc_internal.h"
 
@@ -87,12 +88,12 @@ static int osc_io_read_ahead(const struct lu_env *env,
                        ldlm_lock_decref(&lockh, dlmlock->l_req_mode);
                }
 
-               ra->cra_rpc_size = osc_cli(osc)->cl_max_pages_per_rpc;
-               ra->cra_end = cl_index(osc2cl(osc),
-                                      dlmlock->l_policy_data.l_extent.end);
+               ra->cra_rpc_pages = osc_cli(osc)->cl_max_pages_per_rpc;
+               ra->cra_end_idx = cl_index(osc2cl(osc),
+                                          dlmlock->l_policy_data.l_extent.end);
                ra->cra_release = osc_read_ahead_release;
                ra->cra_cbdata = dlmlock;
-               if (ra->cra_end != CL_PAGE_EOF)
+               if (ra->cra_end_idx != CL_PAGE_EOF)
                        ra->cra_contention = true;
                result = 0;
        }
@@ -116,7 +117,7 @@ int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios,
        struct osc_object *osc  = NULL; /* to keep gcc happy */
        struct osc_page   *opg;
        struct cl_io      *io;
-       struct list_head  list = LIST_HEAD_INIT(list);
+       LIST_HEAD(list);
 
        struct cl_page_list *qin      = &queue->c2_qin;
        struct cl_page_list *qout     = &queue->c2_qout;
@@ -143,6 +144,10 @@ int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios,
        if (crt == CRT_READ && ios->cis_io->ci_ndelay)
                brw_flags |= OBD_BRW_NDELAY;
 
+       page = cl_page_list_first(qin);
+       if (page->cp_type == CPT_TRANSIENT)
+               brw_flags |= OBD_BRW_NOCACHE;
+
         /*
          * NOTE: here @page is a top-level page. This is done to avoid
          *       creation of sub-page-list.
@@ -368,15 +373,30 @@ int osc_io_commit_async(const struct lu_env *env,
 }
 EXPORT_SYMBOL(osc_io_commit_async);
 
+/**
+ * Health check applied to an OSC import.
+ *
+ * osc_io_iter_init() calls this after taking imp->imp_lock, to decide
+ * whether a non-delay read should give up on this device.
+ *
+ * \param[in] imp      import to examine
+ *
+ * \retval true        the import is flagged invalid or deactivated, or its
+ *                     state is neither LUSTRE_IMP_FULL nor LUSTRE_IMP_IDLE
+ * \retval false       none of the above holds
+ */
+static bool osc_import_not_healthy(struct obd_import *imp)
+{
+       /* either flag alone is enough, whatever the state is */
+       if (imp->imp_invalid || imp->imp_deactive)
+               return true;
+
+       return imp->imp_state != LUSTRE_IMP_FULL &&
+              imp->imp_state != LUSTRE_IMP_IDLE;
+}
+
 int osc_io_iter_init(const struct lu_env *env, const struct cl_io_slice *ios)
 {
        struct osc_object *osc = cl2osc(ios->cis_obj);
        struct obd_import *imp = osc_cli(osc)->cl_import;
        struct osc_io *oio = osc_env_io(env);
        int rc = -EIO;
+       ENTRY;
 
        spin_lock(&imp->imp_lock);
-       if (likely(!imp->imp_invalid)) {
+       /**
+        * check whether this OSC device is available for non-delay read,
+        * fast switching mirror if we haven't tried all mirrors.
+        */
+       if (ios->cis_io->ci_type == CIT_READ && ios->cis_io->ci_ndelay &&
+           !ios->cis_io->ci_tried_all_mirrors && osc_import_not_healthy(imp)) {
+               rc = -EWOULDBLOCK;
+       } else if (likely(!imp->imp_invalid)) {
                atomic_inc(&osc->oo_nr_ios);
                oio->oi_is_active = 1;
                rc = 0;
@@ -386,7 +406,7 @@ int osc_io_iter_init(const struct lu_env *env, const struct cl_io_slice *ios)
        if (cfs_capable(CFS_CAP_SYS_RESOURCE))
                oio->oi_cap_sys_resource = 1;
 
-       return rc;
+       RETURN(rc);
 }
 EXPORT_SYMBOL(osc_io_iter_init);
 
@@ -479,7 +499,7 @@ static int osc_async_upcall(void *a, int rc)
 /**
  * Checks that there are no pages being written in the extent being truncated.
  */
-static int trunc_check_cb(const struct lu_env *env, struct cl_io *io,
+static bool trunc_check_cb(const struct lu_env *env, struct cl_io *io,
                          struct osc_page *ops , void *cbdata)
 {
        struct cl_page *page = ops->ops_cl.cpl_page;
@@ -496,7 +516,7 @@ static int trunc_check_cb(const struct lu_env *env, struct cl_io *io,
                CDEBUG(D_CACHE, "page %p index %lu locked for %d.\n",
                       ops, osc_index(ops), oap->oap_cmd & OBD_BRW_RWMASK);
 
-       return CLP_GANG_OKAY;
+       return true;
 }
 
 static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
@@ -521,23 +541,30 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
 static int osc_io_setattr_start(const struct lu_env *env,
                                 const struct cl_io_slice *slice)
 {
-        struct cl_io            *io     = slice->cis_io;
-        struct osc_io           *oio    = cl2osc_io(env, slice);
-        struct cl_object        *obj    = slice->cis_obj;
-        struct lov_oinfo        *loi    = cl2osc(obj)->oo_oinfo;
-        struct cl_attr          *attr   = &osc_env_info(env)->oti_attr;
-        struct obdo             *oa     = &oio->oi_oa;
+       struct cl_io *io = slice->cis_io;
+       struct osc_io *oio = cl2osc_io(env, slice);
+       struct cl_object *obj = slice->cis_obj;
+       struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
+       struct cl_attr *attr = &osc_env_info(env)->oti_attr;
+       struct obdo *oa = &oio->oi_oa;
        struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
-       __u64                    size   = io->u.ci_setattr.sa_attr.lvb_size;
        unsigned int ia_avalid = io->u.ci_setattr.sa_avalid;
        enum op_xvalid ia_xvalid = io->u.ci_setattr.sa_xvalid;
        int result = 0;
+       __u64 size = io->u.ci_setattr.sa_attr.lvb_size;
+       __u64 end = OBD_OBJECT_EOF;
+       bool io_is_falloc = false;
 
        ENTRY;
        /* truncate cache dirty pages first */
-       if (cl_io_is_trunc(io))
+       if (cl_io_is_trunc(io)) {
                result = osc_cache_truncate_start(env, cl2osc(obj), size,
                                                  &oio->oi_trunc);
+       } else if (cl_io_is_fallocate(io)) {
+               io_is_falloc = true;
+               size = io->u.ci_setattr.sa_falloc_offset;
+               end = io->u.ci_setattr.sa_falloc_end;
+       }
 
        if (result == 0 && oio->oi_lockless == 0) {
                cl_object_attr_lock(obj);
@@ -589,9 +616,15 @@ static int osc_io_setattr_start(const struct lu_env *env,
                        oa->o_mtime = attr->cat_mtime;
                }
                if (ia_avalid & ATTR_SIZE) {
-                       oa->o_size = size;
-                       oa->o_blocks = OBD_OBJECT_EOF;
-                       oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+                       if (io_is_falloc) {
+                               oa->o_size = size;
+                               oa->o_blocks = end;
+                               oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+                       } else {
+                               oa->o_size = size;
+                               oa->o_blocks = OBD_OBJECT_EOF;
+                               oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+                       }
 
                        if (oio->oi_lockless) {
                                oa->o_flags = OBD_FL_SRVLOCK;
@@ -614,14 +647,20 @@ static int osc_io_setattr_start(const struct lu_env *env,
 
                init_completion(&cbargs->opc_sync);
 
-               if (ia_avalid & ATTR_SIZE)
+               if (io_is_falloc) {
+                       int falloc_mode = io->u.ci_setattr.sa_falloc_mode;
+
+                       result = osc_fallocate_base(osc_export(cl2osc(obj)),
+                                                   oa, osc_async_upcall,
+                                                   cbargs, falloc_mode);
+               } else if (ia_avalid & ATTR_SIZE) {
                        result = osc_punch_send(osc_export(cl2osc(obj)),
                                                oa, osc_async_upcall, cbargs);
-               else
+               } else {
                        result = osc_setattr_async(osc_export(cl2osc(obj)),
                                                   oa, osc_async_upcall,
                                                   cbargs, PTLRPCD_SET);
-
+               }
                cbargs->opc_rpc_sent = result == 0;
        }
 
@@ -651,6 +690,7 @@ void osc_io_setattr_end(const struct lu_env *env,
                        struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
 
                        LASSERT(cl_io_is_trunc(io));
+                       LASSERT(cl_io_is_trunc(io) || cl_io_is_fallocate(io));
                        /* XXX: Need a lock. */
                        osd->od_stats.os_lockless_truncates++;
                }
@@ -658,17 +698,39 @@ void osc_io_setattr_end(const struct lu_env *env,
 
        if (cl_io_is_trunc(io)) {
                __u64 size = io->u.ci_setattr.sa_attr.lvb_size;
+
+               if (result == 0) {
+                       cl_object_attr_lock(obj);
+                       if (oa->o_valid & OBD_MD_FLBLOCKS) {
+                               attr->cat_blocks = oa->o_blocks;
+                               cl_valid |= CAT_BLOCKS;
+                       }
+
+                       cl_object_attr_update(env, obj, attr, cl_valid);
+                       cl_object_attr_unlock(obj);
+               }
+               osc_trunc_check(env, io, oio, size);
+               osc_cache_truncate_end(env, oio->oi_trunc);
+               oio->oi_trunc = NULL;
+       }
+
+       if (cl_io_is_fallocate(io)) {
                cl_object_attr_lock(obj);
+
+               /* update blocks */
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        cl_valid |= CAT_BLOCKS;
                }
 
+               /* update size */
+               if (oa->o_valid & OBD_MD_FLSIZE) {
+                       attr->cat_size = oa->o_size;
+                       cl_valid |= CAT_SIZE;
+               }
+
                cl_object_attr_update(env, obj, attr, cl_valid);
                cl_object_attr_unlock(obj);
-               osc_trunc_check(env, io, oio, size);
-               osc_cache_truncate_end(env, oio->oi_trunc);
-               oio->oi_trunc = NULL;
        }
 }
 EXPORT_SYMBOL(osc_io_setattr_end);
@@ -991,6 +1053,130 @@ void osc_io_end(const struct lu_env *env, const struct cl_io_slice *slice)
 }
 EXPORT_SYMBOL(osc_io_end);
 
+struct osc_lseek_args { /* obtained via ptlrpc_req_async_args() on a seek req */
+       struct osc_io *lsa_oio; /* set in osc_io_lseek_start(), read on reply */
+};
+
+/**
+ * Reply handler installed on the OST_SEEK request by osc_io_lseek_start().
+ *
+ * On success the offset found by the server (o_size of the reply body) is
+ * stored in the lseek IO's ls_result. In every case the final status is
+ * handed to osc_async_upcall() together with the IO's callback arguments;
+ * osc_io_lseek_end() later waits on those arguments and reads the status
+ * back (presumably the upcall is what records it and signals completion).
+ *
+ * \param[in] env      execution environment (not used here)
+ * \param[in] req      the completed request
+ * \param[in] arg      async args of \a req, a struct osc_lseek_args
+ * \param[in] rc       status the request completed with
+ *
+ * \retval 0           reply decoded, ls_result updated
+ * \retval -EPROTO     the reply carries no OST body
+ * \retval other       \a rc passed through unchanged when it is non-zero
+ */
+static int osc_lseek_interpret(const struct lu_env *env,
+                              struct ptlrpc_request *req,
+                              void *arg, int rc)
+{
+       struct osc_lseek_args *args = arg;
+       struct osc_io *oio = args->lsa_oio;
+       struct ost_body *body;
+
+       ENTRY;
+
+       /* request failed: nothing to decode, only report the error */
+       if (rc != 0)
+               GOTO(out, rc);
+
+       body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+       if (!body)
+               GOTO(out, rc = -EPROTO);
+
+       /* the reply returns the resulting offset in o_size */
+       oio->oi_cl.cis_io->u.ci_lseek.ls_result = body->oa.o_size;
+out:
+       osc_async_upcall(&oio->oi_cbarg, rc);
+       RETURN(rc);
+}
+
+/**
+ * Start a SEEK_HOLE/SEEK_DATA operation on one OSC object.
+ *
+ * Two cases are answered locally, without any RPC, by filling ls_result:
+ *  - the start offset is at or beyond the object size kept in the LVB:
+ *    SEEK_HOLE yields the start offset, SEEK_DATA yields -ENXIO;
+ *  - the export does not advertise LSEEK support: the whole object is
+ *    treated as data, so SEEK_HOLE yields the object size and SEEK_DATA
+ *    yields the start offset.
+ * Otherwise an OST_SEEK request is built and queued with ptlrpcd_add_req();
+ * its reply is processed by osc_lseek_interpret() and osc_io_lseek_end()
+ * waits for it.
+ *
+ * \param[in] env      execution environment
+ * \param[in] slice    OSC slice of the lseek IO
+ *
+ * \retval 0           answered locally, or request queued
+ * \retval -ENOMEM     the request could not be allocated
+ * \retval negative    error returned by ptlrpc_request_pack()
+ */
+int osc_io_lseek_start(const struct lu_env *env,
+                      const struct cl_io_slice *slice)
+{
+       struct osc_io *oio = cl2osc_io(env, slice);
+       struct osc_object *osc = cl2osc(slice->cis_obj);
+       struct cl_lseek_io *lsio = &slice->cis_io->u.ci_lseek;
+       struct lov_oinfo *loi = osc->oo_oinfo;
+       struct obd_export *exp = osc_export(osc);
+       struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
+       struct obdo *oa = &oio->oi_oa;
+       struct osc_lseek_args *args;
+       struct ptlrpc_request *req;
+       struct ost_body *body;
+       bool seek_hole;
+       int rc;
+
+       ENTRY;
+
+       /* callers must have filtered out negative offsets and other whences */
+       LASSERT(lsio->ls_start >= 0);
+       LASSERT(lsio->ls_whence == SEEK_HOLE || lsio->ls_whence == SEEK_DATA);
+       seek_hole = lsio->ls_whence == SEEK_HOLE;
+
+       /* The IO lock is held at this point, so the LVB carries the object
+        * size and the boundary case is resolved before sending an LSEEK RPC:
+        * everything past the end of the object counts as a hole.
+        */
+       if (lsio->ls_start >= loi->loi_lvb.lvb_size) {
+               if (seek_hole)
+                       lsio->ls_result = lsio->ls_start;
+               else
+                       lsio->ls_result = -ENXIO;
+               RETURN(0);
+       }
+
+       /* A server without LSEEK support cannot report holes: treat the
+        * whole stripe object as data followed by a hole at its end.
+        */
+       if (!exp_connect_lseek(exp)) {
+               if (seek_hole)
+                       lsio->ls_result = loi->loi_lvb.lvb_size;
+               else
+                       lsio->ls_result = lsio->ls_start;
+               RETURN(0);
+       }
+
+       init_completion(&cbargs->opc_sync);
+
+       /* o_size carries the start offset and o_mode the whence */
+       memset(oa, 0, sizeof(*oa));
+       oa->o_oi = loi->loi_oi;
+       oa->o_size = lsio->ls_start;
+       oa->o_mode = lsio->ls_whence;
+       oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+       if (oio->oi_lockless) {
+               oa->o_valid |= OBD_MD_FLFLAGS;
+               oa->o_flags = OBD_FL_SRVLOCK;
+       }
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SEEK);
+       if (!req)
+               RETURN(-ENOMEM);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SEEK);
+       if (rc < 0) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+
+       body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+       lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
+       ptlrpc_request_set_replen(req);
+
+       /* let the reply handler find its way back to this IO */
+       args = ptlrpc_req_async_args(args, req);
+       args->lsa_oio = oio;
+       req->rq_interpret_reply = osc_lseek_interpret;
+
+       ptlrpcd_add_req(req);
+       /* tells osc_io_lseek_end() that there is a reply to wait for */
+       cbargs->opc_rpc_sent = 1;
+
+       RETURN(0);
+}
+EXPORT_SYMBOL(osc_io_lseek_start);
+
+/**
+ * Finish the lseek IO started by osc_io_lseek_start().
+ *
+ * When a request was sent (opc_rpc_sent is set), block until opc_sync is
+ * completed and publish the recorded opc_rc as the IO result. When no
+ * request was sent the IO result is set to 0.
+ *
+ * \param[in] env      execution environment
+ * \param[in] slice    OSC slice of the lseek IO
+ */
+void osc_io_lseek_end(const struct lu_env *env,
+                     const struct cl_io_slice *slice)
+{
+       struct osc_async_cbargs *cbargs = &cl2osc_io(env, slice)->oi_cbarg;
+       int rc;
+
+       /* nothing in flight: the start method answered locally or failed */
+       if (!cbargs->opc_rpc_sent) {
+               slice->cis_io->ci_result = 0;
+               return;
+       }
+
+       wait_for_completion(&cbargs->opc_sync);
+       rc = cbargs->opc_rc;
+       slice->cis_io->ci_result = rc;
+}
+EXPORT_SYMBOL(osc_io_lseek_end);
+
 static const struct cl_io_operations osc_io_ops = {
        .op = {
                [CIT_READ] = {
@@ -1033,6 +1219,11 @@ static const struct cl_io_operations osc_io_ops = {
                        .cio_end    = osc_io_ladvise_end,
                        .cio_fini   = osc_io_fini
                },
+               [CIT_LSEEK] = {
+                       .cio_start  = osc_io_lseek_start,
+                       .cio_end    = osc_io_lseek_end,
+                       .cio_fini   = osc_io_fini
+               },
                [CIT_MISC] = {
                        .cio_fini   = osc_io_fini
                }