Whamcloud - gitweb
LU-3321 clio: revert LU-2622 for removing global env list
[fs/lustre-release.git] / lustre / liblustre / llite_cl.c
index e3c340e..5c7ce91 100644 (file)
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  *   Copyright (c) 2007 Cluster File Systems, Inc.
  *   Author: Nikita Danilov <nikita@clusterfs.com>
  *
@@ -18,6 +16,8 @@
  *   You should have received a copy of the GNU General Public License
  *   along with Lustre; if not, write to the Free Software
  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ *   Copyright (c) 2011, 2013, Intel Corporation.
  */
 
 #define DEBUG_SUBSYSTEM S_LLITE
 # include <sys/statfs.h>
 #endif
 
-#include <sysio.h>
-#ifdef HAVE_XTIO_H
-#include <xtio.h>
-#endif
-#include <fs.h>
-#include <mount.h>
-#include <inode.h>
-#ifdef HAVE_FILE_H
-#include <file.h>
-#endif
 #include <liblustre.h>
 
 #include <obd.h>
@@ -67,9 +57,8 @@
 static int   slp_type_init     (struct lu_device_type *t);
 static void  slp_type_fini     (struct lu_device_type *t);
 
-static struct cl_page * slp_page_init(const struct lu_env *env,
-                                     struct cl_object *obj,
-                                     struct cl_page *page, cfs_page_t *vmpage);
+static int slp_page_init(const struct lu_env *env, struct cl_object *obj,
+                        struct cl_page *page, struct page *vmpage);
 static int   slp_attr_get     (const struct lu_env *env, struct cl_object *obj,
                                struct cl_attr *attr);
 
@@ -234,32 +223,26 @@ void slp_global_fini(void)
  *
  */
 
-static struct cl_page *slp_page_init(const struct lu_env *env,
-                                     struct cl_object *obj,
-                                     struct cl_page *page, cfs_page_t *vmpage)
+static int slp_page_init(const struct lu_env *env, struct cl_object *obj,
+                       struct cl_page *page, struct page *vmpage)
 {
-        struct ccc_page *cpg;
-        int result;
+        struct ccc_page *cpg = cl_object_page_slice(obj, page);
 
         CLOBINVRNT(env, obj, ccc_object_invariant(obj));
 
-        OBD_ALLOC_PTR(cpg);
-        if (cpg != NULL) {
-                cpg->cpg_page = vmpage;
+       cpg->cpg_page = vmpage;
 
-                if (page->cp_type == CPT_CACHEABLE) {
-                        LBUG();
-                } else {
-                        struct ccc_object *clobj = cl2ccc(obj);
+       if (page->cp_type == CPT_CACHEABLE) {
+               LBUG();
+       } else {
+               struct ccc_object *clobj = cl2ccc(obj);
 
-                        cl_page_slice_add(page, &cpg->cpg_cl, obj,
-                                          &slp_transient_page_ops);
-                        clobj->cob_transient_pages++;
-                }
-                result = 0;
-        } else
-                result = -ENOMEM;
-        return ERR_PTR(result);
+               cl_page_slice_add(page, &cpg->cpg_cl, obj,
+                               &slp_transient_page_ops);
+               clobj->cob_transient_pages++;
+       }
+
+        return 0;
 }
 
 static int slp_io_init(const struct lu_env *env, struct cl_object *obj,
@@ -311,7 +294,7 @@ static int slp_attr_get(const struct lu_env *env, struct cl_object *obj,
 
 static void slp_page_fini_common(struct ccc_page *cp)
 {
-        cfs_page_t *vmpage = cp->cpg_page;
+       struct page *vmpage = cp->cpg_page;
 
         LASSERT(vmpage != NULL);
         llu_free_user_page(vmpage);
@@ -321,14 +304,7 @@ static void slp_page_fini_common(struct ccc_page *cp)
 static void slp_page_completion_common(const struct lu_env *env,
                                        struct ccc_page *cp, int ioret)
 {
-        struct cl_sync_io *anchor = cp->cpg_sync_io;
-
-        if (anchor) {
-                cp->cpg_sync_io  = NULL;
-                cl_sync_io_note(anchor, ioret);
-        } else {
-                LBUG();
-        }
+        LASSERT(cp->cpg_cl.cpl_page->cp_sync_io != NULL);
 }
 
 static void slp_page_completion_read(const struct lu_env *env,
@@ -405,7 +381,7 @@ static const struct cl_page_operations slp_transient_page_ops = {
 
 static int slp_lock_enqueue(const struct lu_env *env,
                            const struct cl_lock_slice *slice,
-                           struct cl_io *_, __u32 enqflags)
+                           struct cl_io *unused, __u32 enqflags)
 {
         CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
 
@@ -414,6 +390,7 @@ static int slp_lock_enqueue(const struct lu_env *env,
 }
 
 static const struct cl_lock_operations slp_lock_ops = {
+        .clo_delete    = ccc_lock_delete,
         .clo_fini      = ccc_lock_fini,
         .clo_enqueue   = slp_lock_enqueue,
         .clo_wait      = ccc_lock_wait,
@@ -428,9 +405,10 @@ static const struct cl_lock_operations slp_lock_ops = {
  */
 
 static int slp_io_rw_lock(const struct lu_env *env,
-                             const struct cl_io_slice *ios)
+                          const struct cl_io_slice *ios)
 {
-        struct cl_io *io = ios->cis_io;
+        struct ccc_io *cio = ccc_env_io(env);
+        struct cl_io *io   = ios->cis_io;
         loff_t start;
         loff_t end;
 
@@ -441,6 +419,9 @@ static int slp_io_rw_lock(const struct lu_env *env,
                 start = io->u.ci_wr.wr.crw_pos;
                 end   = start + io->u.ci_wr.wr.crw_count - 1;
         }
+
+        ccc_io_update_iov(env, cio, io);
+
         /*
          * This acquires real DLM lock only in O_APPEND case, because of
          * the io->ci_lockreq setting in llu_io_init().
@@ -453,14 +434,14 @@ static int slp_io_rw_lock(const struct lu_env *env,
 
 }
 
-static int slp_io_trunc_iter_init(const struct lu_env *env,
-                                  const struct cl_io_slice *ios)
+static int slp_io_setattr_iter_init(const struct lu_env *env,
+                                    const struct cl_io_slice *ios)
 {
         return 0;
 }
 
-static int slp_io_trunc_start(const struct lu_env *env,
-                              const struct cl_io_slice *ios)
+static int slp_io_setattr_start(const struct lu_env *env,
+                                const struct cl_io_slice *ios)
 {
         return 0;
 }
@@ -489,6 +470,7 @@ static void llu_free_user_page(struct page *page)
         OBD_FREE_PTR(page);
 }
 
+
 static int llu_queue_pio(const struct lu_env *env, struct cl_io *io,
                          struct llu_io_group *group,
                          char *buf, size_t count, loff_t pos)
@@ -498,19 +480,14 @@ static int llu_queue_pio(const struct lu_env *env, struct cl_io *io,
         struct intnl_stat *st = llu_i2stat(inode);
         struct obd_export *exp = llu_i2obdexp(inode);
         struct page *page;
-        int  rc = 0, npages = 0, ret_bytes = 0;
-        int local_lock;
+        int  rc = 0, ret_bytes = 0;
         struct cl_page *clp;
-        struct ccc_page *clup;
         struct cl_2queue *queue;
-        struct cl_sync_io *anchor = &ccc_env_info(env)->cti_sync_io;
         ENTRY;
 
         if (!exp)
                 RETURN(-EINVAL);
 
-        local_lock = group->lig_params->lrp_lock_mode != LCK_NL;
-
         queue = &io->ci_queue;
         cl_2queue_init(queue);
 
@@ -520,8 +497,8 @@ static int llu_queue_pio(const struct lu_env *env, struct cl_io *io,
                 unsigned long index, offset, bytes;
 
                 offset = (pos & ~CFS_PAGE_MASK);
-                index = pos >> CFS_PAGE_SHIFT;
-                bytes = CFS_PAGE_SIZE - offset;
+               index = pos >> PAGE_CACHE_SHIFT;
+               bytes = PAGE_CACHE_SIZE - offset;
                 if (bytes > count)
                         bytes = count;
 
@@ -556,8 +533,6 @@ static int llu_queue_pio(const struct lu_env *env, struct cl_io *io,
                         break;
                 }
 
-                clup = cl2ccc_page(cl_page_at(clp, &slp_device_type));
-                clup->cpg_sync_io = anchor;
                 cl_2queue_add(queue, clp);
 
                 /* drop the reference count for cl_page_find, so that the page
@@ -566,7 +541,6 @@ static int llu_queue_pio(const struct lu_env *env, struct cl_io *io,
 
                 cl_page_clip(env, clp, offset, offset+bytes);
 
-                npages++;
                 count -= bytes;
                 pos += bytes;
                 buf += bytes;
@@ -576,25 +550,10 @@ static int llu_queue_pio(const struct lu_env *env, struct cl_io *io,
                 page++;
         } while (count);
 
-        cl_sync_io_init(anchor, npages);
-        /* printk("Inited anchor with %d pages\n", npages); */
-
         if (rc == 0) {
-                rc = cl_io_submit_rw(env, io,
-                                     io->ci_type == CIT_READ ? CRT_READ :
-                                                               CRT_WRITE,
-                                     queue);
-                if (rc == 0) {
-                        /* If some pages weren't sent for any reason, count
-                         * then as completed, to avoid infinite wait. */
-                        cl_page_list_for_each(clp, &queue->c2_qin) {
-                                CL_PAGE_DEBUG(D_ERROR, env, clp,
-                                              "not completed\n");
-                                cl_sync_io_note(anchor, +1);
-                        }
-                        /* wait for the IO to be finished. */
-                        rc = cl_sync_io_wait(env, io, &queue->c2_qout, anchor);
-                }
+                enum cl_req_type iot;
+                iot = io->ci_type == CIT_READ ? CRT_READ : CRT_WRITE;
+               rc = cl_io_submit_sync(env, io, iot, queue, 0);
         }
 
         group->lig_rc = rc;
@@ -623,7 +582,8 @@ struct llu_io_group * get_io_group(struct inode *inode, int maxpages,
 
 static int max_io_pages(ssize_t len, int iovlen)
 {
-        return (((len + CFS_PAGE_SIZE -1) / CFS_PAGE_SIZE) + 2 + iovlen - 1);
+       return ((len + PAGE_CACHE_SIZE - 1) / PAGE_CACHE_SIZE) +
+               2 + iovlen - 1;
 }
 
 void put_io_group(struct llu_io_group *group)
@@ -631,15 +591,23 @@ void put_io_group(struct llu_io_group *group)
         OBD_FREE_PTR(group);
 }
 
+/**
+ * True, if \a io is a normal io, False for sendfile() / splice_{read|write}
+ */
+int cl_is_normalio(const struct lu_env *env, const struct cl_io *io)
+{
+        return 1;
+}
+
 static int slp_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
 {
         struct ccc_io     *cio   = cl2ccc_io(env, ios);
         struct cl_io      *io    = ios->cis_io;
         struct cl_object  *obj   = io->ci_obj;
         struct inode      *inode = ccc_object_inode(obj);
-        int     err, ret;
-        loff_t  pos;
-        size_t  cnt;
+        int    err, ret;
+        loff_t pos;
+        long   cnt;
         struct llu_io_group *iogroup;
         struct lustre_rw_params p = {0};
         int iovidx;
@@ -647,6 +615,7 @@ static int slp_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
         struct llu_inode_info *lli = llu_i2info(inode);
         struct llu_io_session *session = cl2slp_io(env, ios)->sio_session;
         int write = io->ci_type == CIT_WRITE;
+        int exceed = 0;
 
         CLOBINVRNT(env, obj, ccc_object_invariant(obj));
 
@@ -668,13 +637,13 @@ static int slp_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
         if (IS_ERR(iogroup))
                 RETURN(PTR_ERR(iogroup));
 
-        err = ccc_prep_size(env, obj, io, pos + cnt - 1, 0);
-        if (err != 0)
+        err = ccc_prep_size(env, obj, io, pos, cnt, &exceed);
+        if (err != 0 || (write == 0 && exceed != 0))
                 GOTO(out, err);
 
         CDEBUG(D_INODE,
-               "%s ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
-               write?"Write":"Read", (unsigned long)st->st_ino,
+               "%s ino %lu, %lu bytes, offset "LPU64", i_size "LPU64"\n",
+               write ? "Write" : "Read", (unsigned long)st->st_ino,
                cnt, (__u64)pos, (__u64)st->st_size);
 
         if (write && io->u.ci_wr.wr_append)
@@ -683,7 +652,7 @@ static int slp_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
 
         for (iovidx = 0; iovidx < cio->cui_nrsegs; iovidx++) {
                 char *buf = (char *) cio->cui_iov[iovidx].iov_base;
-                size_t count = cio->cui_iov[iovidx].iov_len;
+                long count = cio->cui_iov[iovidx].iov_len;
 
                 if (!count)
                         continue;
@@ -724,9 +693,12 @@ static int slp_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
         }
         LASSERT(cnt == 0 || io->ci_type == CIT_READ); /* libsysio should guarantee this */
 
-        session->lis_groups[session->lis_ngroups++] = iogroup;
+        if (!iogroup->lig_rc)
+                session->lis_rwcount += iogroup->lig_rwcount;
+        else if (!session->lis_rc)
+                session->lis_rc = iogroup->lig_rc;
+        err = 0;
 
-        return 0;
 out:
         put_io_group(iogroup);
         return err;
@@ -738,18 +710,20 @@ static const struct cl_io_operations ccc_io_ops = {
                         .cio_fini      = ccc_io_fini,
                         .cio_lock      = slp_io_rw_lock,
                         .cio_start     = slp_io_start,
-                        .cio_end       = ccc_io_end
+                        .cio_end       = ccc_io_end,
+                        .cio_advance   = ccc_io_advance
                 },
                 [CIT_WRITE] = {
                         .cio_fini      = ccc_io_fini,
                         .cio_lock      = slp_io_rw_lock,
                         .cio_start     = slp_io_start,
-                        .cio_end       = ccc_io_end
+                        .cio_end       = ccc_io_end,
+                        .cio_advance   = ccc_io_advance
                 },
-                [CIT_TRUNC] = {
+                [CIT_SETATTR] = {
                         .cio_fini       = ccc_io_fini,
-                        .cio_iter_init  = slp_io_trunc_iter_init,
-                        .cio_start      = slp_io_trunc_start
+                        .cio_iter_init  = slp_io_setattr_iter_init,
+                        .cio_start      = slp_io_setattr_start
                 },
                 [CIT_MISC] = {
                         .cio_fini   = ccc_io_fini