LU-3321 clio: add pages into writeback cache in batch 93/7893/19
author Jinshan Xiong <jinshan.xiong@intel.com>
Mon, 30 Sep 2013 23:11:11 +0000 (16:11 -0700)
committer Oleg Drokin <oleg.drokin@intel.com>
Mon, 2 Dec 2013 15:48:57 +0000 (15:48 +0000)
In ll_write_end(), instead of adding the page into the writeback
cache directly, hold it in a page list. Once enough pages have been
collected, issue them all with cio_commit_async().
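
In outline, ll_write_end() now only queues the page and defers the
actual commit until the queue holds a full RPC worth of pages (the
PTLRPC_MAX_BRW_PAGES threshold used below). A minimal sketch of that
collect-then-flush pattern, with illustrative stand-in names rather
than the real Lustre API:

    /* Hedged sketch: all names here are illustrative stand-ins. */
    #define BATCH_MAX 256           /* plays the role of PTLRPC_MAX_BRW_PAGES */

    struct page_batch {
            void *pages[BATCH_MAX];
            int   nr;
    };

    /* stand-in for cio_commit_async(): issue every queued page at once */
    static void commit_async(struct page_batch *b)
    {
            /* ... build and send one large transfer from b->pages ... */
            b->nr = 0;
    }

    /* stand-in for ll_write_end(): queue first, commit only when full */
    static void write_end(struct page_batch *b, void *page)
    {
            b->pages[b->nr++] = page;
            if (b->nr == BATCH_MAX)
                    commit_async(b);
    }
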

Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Change-Id: I7393e7ac7e44ab8d53f89cebd61dc9b34922f18c
Reviewed-on: http://review.whamcloud.com/7893
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Bobi Jam <bobijam@gmail.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
20 files changed:
lustre/include/cl_object.h
lustre/include/lclient.h
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/rw.c
lustre/llite/rw26.c
lustre/llite/vvp_internal.h
lustre/llite/vvp_io.c
lustre/lov/lov_cl_internal.h
lustre/lov/lov_io.c
lustre/lov/lov_page.c
lustre/obdclass/cl_io.c
lustre/obdclass/cl_page.c
lustre/obdecho/echo_client.c
lustre/osc/osc_cache.c
lustre/osc/osc_cl_internal.h
lustre/osc/osc_internal.h
lustre/osc/osc_io.c
lustre/osc/osc_page.c
lustre/osc/osc_request.c

index 6c9bdf0..0b9188e 100644 (file)
@@ -1013,26 +1013,6 @@ struct cl_page_operations {
                  */
                 int  (*cpo_make_ready)(const struct lu_env *env,
                                        const struct cl_page_slice *slice);
-                /**
-                 * Announce that this page is to be written out
-                 * opportunistically, that is, page is dirty, it is not
-                 * necessary to start write-out transfer right now, but
-                 * eventually page has to be written out.
-                 *
-                 * Main caller of this is the write path (see
-                 * vvp_io_commit_write()), using this method to build a
-                 * "transfer cache" from which large transfers are then
-                 * constructed by the req-formation engine.
-                 *
-                 * \todo XXX it would make sense to add page-age tracking
-                 * semantics here, and to oblige the req-formation engine to
-                 * send the page out not later than it is too old.
-                 *
-                 * \see cl_page_cache_add()
-                 */
-                int  (*cpo_cache_add)(const struct lu_env *env,
-                                      const struct cl_page_slice *slice,
-                                      struct cl_io *io);
         } io[CRT_NR];
         /**
           * Tell transfer engine that only [from, to] part of a page should be
@@ -2013,6 +1993,8 @@ struct cl_io_slice {
         cfs_list_t                     cis_linkage;
 };
 
+typedef void (*cl_commit_cbt)(const struct lu_env *, struct cl_io *,
+                               struct cl_page *);
 
 /**
  * Per-layer io operations.
@@ -2097,20 +2079,28 @@ struct cl_io_operations {
                 void (*cio_fini) (const struct lu_env *env,
                                   const struct cl_io_slice *slice);
         } op[CIT_OP_NR];
-        struct {
-                /**
-                 * Submit pages from \a queue->c2_qin for IO, and move
-                 * successfully submitted pages into \a queue->c2_qout. Return
-                 * non-zero if failed to submit even the single page. If
-                 * submission failed after some pages were moved into \a
-                 * queue->c2_qout, completion callback with non-zero ioret is
-                 * executed on them.
-                 */
-                int  (*cio_submit)(const struct lu_env *env,
-                                   const struct cl_io_slice *slice,
-                                   enum cl_req_type crt,
-                                  struct cl_2queue *queue);
-        } req_op[CRT_NR];
+
+       /**
+        * Submit pages from \a queue->c2_qin for IO, and move
+        * successfully submitted pages into \a queue->c2_qout. Return
+        * non-zero if it failed to submit even a single page. If
+        * submission failed after some pages were moved into \a
+        * queue->c2_qout, the completion callback is executed on them
+        * with a non-zero ioret.
+        */
+       int  (*cio_submit)(const struct lu_env *env,
+                       const struct cl_io_slice *slice,
+                       enum cl_req_type crt,
+                       struct cl_2queue *queue);
+       /**
+        * Queue a page for asynchronous write.
+        * The difference between cio_submit and cio_commit_async is that
+        * cio_submit is for urgent requests.
+        */
+       int  (*cio_commit_async)(const struct lu_env *env,
+                       const struct cl_io_slice *slice,
+                       struct cl_page_list *queue, int from, int to,
+                       cl_commit_cbt cb);
         /**
          * Read missing page.
          *
@@ -2123,31 +2113,6 @@ struct cl_io_operations {
                              const struct cl_io_slice *slice,
                              const struct cl_page_slice *page);
         /**
-         * Prepare write of a \a page. Called bottom-to-top by a top-level
-         * cl_io_operations::op[CIT_WRITE]::cio_start() to prepare page for
-         * get data from user-level buffer.
-         *
-         * \pre io->ci_type == CIT_WRITE
-         *
-         * \see vvp_io_prepare_write(), lov_io_prepare_write(),
-         * osc_io_prepare_write().
-         */
-        int (*cio_prepare_write)(const struct lu_env *env,
-                                 const struct cl_io_slice *slice,
-                                 const struct cl_page_slice *page,
-                                 unsigned from, unsigned to);
-        /**
-         *
-         * \pre io->ci_type == CIT_WRITE
-         *
-         * \see vvp_io_commit_write(), lov_io_commit_write(),
-         * osc_io_commit_write().
-         */
-        int (*cio_commit_write)(const struct lu_env *env,
-                                const struct cl_io_slice *slice,
-                                const struct cl_page_slice *page,
-                                unsigned from, unsigned to);
-        /**
          * Optional debugging helper. Print given io slice.
          */
         int (*cio_print)(const struct lu_env *env, void *cookie,
@@ -3050,15 +3015,14 @@ int   cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
                            struct cl_lock_descr *descr);
 int   cl_io_read_page    (const struct lu_env *env, struct cl_io *io,
                           struct cl_page *page);
-int   cl_io_prepare_write(const struct lu_env *env, struct cl_io *io,
-                          struct cl_page *page, unsigned from, unsigned to);
-int   cl_io_commit_write (const struct lu_env *env, struct cl_io *io,
-                          struct cl_page *page, unsigned from, unsigned to);
 int   cl_io_submit_rw    (const struct lu_env *env, struct cl_io *io,
                          enum cl_req_type iot, struct cl_2queue *queue);
 int   cl_io_submit_sync  (const struct lu_env *env, struct cl_io *io,
                          enum cl_req_type iot, struct cl_2queue *queue,
                          long timeout);
+int   cl_io_commit_async (const struct lu_env *env, struct cl_io *io,
+                         struct cl_page_list *queue, int from, int to,
+                         cl_commit_cbt cb);
 void  cl_io_rw_advance   (const struct lu_env *env, struct cl_io *io,
                           size_t nob);
 int   cl_io_cancel       (const struct lu_env *env, struct cl_io *io,
@@ -3120,6 +3084,12 @@ static inline struct cl_page *cl_page_list_last(struct cl_page_list *plist)
         return cfs_list_entry(plist->pl_pages.prev, struct cl_page, cp_batch);
 }
 
+static inline struct cl_page *cl_page_list_first(struct cl_page_list *plist)
+{
+       LASSERT(plist->pl_nr > 0);
+       return cfs_list_entry(plist->pl_pages.next, struct cl_page, cp_batch);
+}
+
 /**
  * Iterate over pages in a page list.
  */
@@ -3136,6 +3106,8 @@ void cl_page_list_init   (struct cl_page_list *plist);
 void cl_page_list_add    (struct cl_page_list *plist, struct cl_page *page);
 void cl_page_list_move   (struct cl_page_list *dst, struct cl_page_list *src,
                           struct cl_page *page);
+void cl_page_list_move_head(struct cl_page_list *dst, struct cl_page_list *src,
+                         struct cl_page *page);
 void cl_page_list_splice (struct cl_page_list *list,
                           struct cl_page_list *head);
 void cl_page_list_del    (const struct lu_env *env,
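
The key new entry point above is cio_commit_async()/cl_io_commit_async():
the caller hands over a contiguous page list plus the valid byte bounds of
its first and last pages, and a cl_commit_cbt callback is run on each page
the layer accepts. A hedged, generic sketch of that contract (simplified
types, not the Lustre signatures):

    /* Illustrative shape of a callback-driven batch commit. */
    typedef void (*commit_cb_t)(void *page);

    /* Take the queued pages; run `cb` on each accepted page so the caller
     * can mark it dirty and drop its reference. This toy version accepts
     * everything; the real one may leave pages queued (e.g. no grant) for
     * the caller to retry. Returns 0 or a negative errno. */
    static int commit_async_sketch(void **pages, int *nr, commit_cb_t cb)
    {
            int i;

            for (i = 0; i < *nr; i++)
                    cb(pages[i]);
            *nr = 0;
            return 0;
    }
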
index 5285071..fe59be2 100644 (file)
@@ -101,11 +101,17 @@ struct ccc_io {
                 struct {
                         enum ccc_setattr_lock_type cui_local_lock;
                 } setattr;
-        } u;
-        /**
-         * True iff io is processing glimpse right now.
-         */
-        int                  cui_glimpse;
+               struct {
+                       struct cl_page_list cui_queue;
+                       unsigned long cui_written;
+                       int cui_from;
+                       int cui_to;
+               } write;
+       } u;
+       /**
+        * True iff io is processing glimpse right now.
+        */
+       int                  cui_glimpse;
        /**
         * Layout version when this IO is initialized
         */
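
The write.cui_* fields above carry the queue's invariant: only the first
queued page may start at a non-zero offset (cui_from) and only the last
may end short of a full page (cui_to); every interior page is full. The
byte count of the queue then follows the arithmetic vvp_io_commit_sync()
and vvp_io_write_commit() use below; a hedged restatement with an
illustrative page size:

    #include <assert.h>

    #define PG_SIZE 4096u   /* illustrative; the code uses PAGE_SIZE */

    /* nr pages queued; valid data starts at `from` in the first page and
     * ends at `to` in the last (to == PG_SIZE for a full last page). */
    static unsigned int queued_bytes(unsigned int nr, unsigned int from,
                                     unsigned int to)
    {
            assert(nr > 0 && from < PG_SIZE && to <= PG_SIZE);
            return nr * PG_SIZE - from - (PG_SIZE - to);
    }
    /* e.g. 3 pages, from = 512, to = 1024: 12288 - 512 - 3072 = 8704 */
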
index 8006ad2..c805abf 100644 (file)
@@ -1127,9 +1127,12 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
 {
        struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
        struct ll_file_data  *fd  = LUSTRE_FPRIVATE(file);
-        struct cl_io         *io;
-        ssize_t               result;
-        ENTRY;
+       struct cl_io         *io;
+       ssize_t               result;
+       ENTRY;
+
+       CDEBUG(D_VFSTRACE, "file: %s, type: %d ppos: "LPU64", count: %zd\n",
+               file->f_dentry->d_name.name, iot, *ppos, count);
 
 restart:
         io = ccc_env_thread_io(env);
@@ -1152,12 +1155,11 @@ restart:
                         if ((iot == CIT_WRITE) &&
                             !(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
                                if (mutex_lock_interruptible(&lli->
-                                                               lli_write_mutex))
-                                        GOTO(out, result = -ERESTARTSYS);
-                                write_mutex_locked = 1;
-                        } else if (iot == CIT_READ) {
-                               down_read(&lli->lli_trunc_sem);
-                        }
+                                                       lli_write_mutex))
+                                       GOTO(out, result = -ERESTARTSYS);
+                               write_mutex_locked = 1;
+                       }
+                       down_read(&lli->lli_trunc_sem);
                         break;
                 case IO_SENDFILE:
                         vio->u.sendfile.cui_actor = args->u.sendfile.via_actor;
@@ -1172,10 +1174,10 @@ restart:
                         LBUG();
                 }
                 result = cl_io_loop(env, io);
-                if (write_mutex_locked)
-                       mutex_unlock(&lli->lli_write_mutex);
-                else if (args->via_io_subtype == IO_NORMAL && iot == CIT_READ)
+               if (args->via_io_subtype == IO_NORMAL)
                        up_read(&lli->lli_trunc_sem);
+               if (write_mutex_locked)
+                       mutex_unlock(&lli->lli_write_mutex);
         } else {
                 /* cl_io_rw_init() handled IO */
                 result = io->ci_result;
@@ -1211,6 +1213,7 @@ out:
                        fd->fd_write_failed = true;
                }
        }
+       CDEBUG(D_VFSTRACE, "iot: %d, result: %zd\n", iot, result);
 
        return result;
 }
index adeba43..d9087e1 100644 (file)
@@ -741,8 +741,6 @@ struct dentry *ll_splice_alias(struct inode *inode, struct dentry *de);
 int ll_rmdir_entry(struct inode *dir, char *name, int namelen);
 
 /* llite/rw.c */
-int ll_prepare_write(struct file *, struct page *, unsigned from, unsigned to);
-int ll_commit_write(struct file *, struct page *, unsigned from, unsigned to);
 int ll_writepage(struct page *page, struct writeback_control *wbc);
 int ll_writepages(struct address_space *, struct writeback_control *wbc);
 void ll_removepage(struct page *page);
@@ -755,6 +753,9 @@ int ll_sync_page_range(struct inode *, struct address_space *, loff_t, size_t);
 int ll_readahead(const struct lu_env *env, struct cl_io *io,
                  struct ll_readahead_state *ras, struct address_space *mapping,
                  struct cl_page_list *queue, int flags);
+int vvp_io_write_commit(const struct lu_env *env, struct cl_io *io);
+struct ll_cl_context *ll_cl_init(struct file *file, struct page *vmpage);
+void ll_cl_fini(struct ll_cl_context *lcc);
 
 /* llite/file.c */
 extern struct file_operations ll_file_operations;
@@ -1660,4 +1661,7 @@ int ll_layout_restore(struct inode *inode, loff_t start, __u64 length);
 int ll_xattr_init(void);
 void ll_xattr_fini(void);
 
+int ll_page_sync_io(const struct lu_env *env, struct cl_io *io,
+                   struct cl_page *page, enum cl_req_type crt);
+
 #endif /* LLITE_INTERNAL_H */
index bab5c65..bff9798 100644 (file)
@@ -66,7 +66,7 @@
  * Finalizes cl-data before exiting typical address_space operation. Dual to
  * ll_cl_init().
  */
-static void ll_cl_fini(struct ll_cl_context *lcc)
+void ll_cl_fini(struct ll_cl_context *lcc)
 {
         struct lu_env  *env  = lcc->lcc_env;
         struct cl_io   *io   = lcc->lcc_io;
@@ -93,8 +93,7 @@ static void ll_cl_fini(struct ll_cl_context *lcc)
  * Initializes common cl-data at the typical address_space operation entry
  * point.
  */
-static struct ll_cl_context *ll_cl_init(struct file *file,
-                                        struct page *vmpage, int create)
+struct ll_cl_context *ll_cl_init(struct file *file, struct page *vmpage)
 {
         struct ll_cl_context *lcc;
         struct lu_env    *env;
@@ -105,8 +104,8 @@ static struct ll_cl_context *ll_cl_init(struct file *file,
         int refcheck;
         int result = 0;
 
-        clob = ll_i2info(vmpage->mapping->host)->lli_clob;
-        LASSERT(clob != NULL);
+       clob = ll_i2info(file->f_dentry->d_inode)->lli_clob;
+       LASSERT(clob != NULL);
 
         env = cl_env_get(&refcheck);
         if (IS_ERR(env))
@@ -120,64 +119,17 @@ static struct ll_cl_context *ll_cl_init(struct file *file,
 
         cio = ccc_env_io(env);
         io = cio->cui_cl.cis_io;
-        if (io == NULL && create) {
-               struct inode *inode = vmpage->mapping->host;
-               loff_t pos;
-
-               if (mutex_trylock(&inode->i_mutex)) {
-                       mutex_unlock(&(inode)->i_mutex);
-
-                       /* this is too bad. Someone is trying to write the
-                        * page w/o holding inode mutex. This means we can
-                        * add dirty pages into cache during truncate */
-                       CERROR("Proc %s is dirting page w/o inode lock, this"
-                              "will break truncate.\n", current->comm);
-                       libcfs_debug_dumpstack(NULL);
-                       LBUG();
-                       return ERR_PTR(-EIO);
-               }
-
-                /*
-                 * Loop-back driver calls ->prepare_write() and ->sendfile()
-                 * methods directly, bypassing file system ->write() operation,
-                 * so cl_io has to be created here.
-                 */
-                io = ccc_env_thread_io(env);
-                ll_io_init(io, file, 1);
-
-                /* No lock at all for this kind of IO - we can't do it because
-                 * we have held page lock, it would cause deadlock.
-                 * XXX: This causes poor performance to loop device - One page
-                 *      per RPC.
-                 *      In order to get better performance, users should use
-                 *      lloop driver instead.
-                 */
-                io->ci_lockreq = CILR_NEVER;
-
-               pos = (vmpage->index << PAGE_CACHE_SHIFT);
-
-               /* Create a temp IO to serve write. */
-               result = cl_io_rw_init(env, io, CIT_WRITE,
-                                      pos, PAGE_CACHE_SIZE);
-                if (result == 0) {
-                        cio->cui_fd = LUSTRE_FPRIVATE(file);
-                        cio->cui_iov = NULL;
-                        cio->cui_nrsegs = 0;
-                        result = cl_io_iter_init(env, io);
-                        if (result == 0) {
-                                result = cl_io_lock(env, io);
-                                if (result == 0)
-                                        result = cl_io_start(env, io);
-                        }
-                } else
-                        result = io->ci_result;
-                lcc->lcc_created = 1;
-        }
-
         lcc->lcc_io = io;
-        if (io == NULL)
-                result = -EIO;
-        if (result == 0) {
+       if (io == NULL) {
+               struct inode *inode = file->f_dentry->d_inode;
+
+               CERROR("%s: " DFID " no active IO, please file a ticket.\n",
+                      ll_get_fsname(inode->i_sb, NULL, 0),
+                      PFID(ll_inode2fid(inode)));
+               libcfs_debug_dumpstack(NULL);
+               result = -EIO;
+       }
+       if (result == 0 && vmpage != NULL) {
                 struct cl_page   *page;
 
                 LASSERT(io != NULL);
@@ -197,100 +149,9 @@ static struct ll_cl_context *ll_cl_init(struct file *file,
                 lcc = ERR_PTR(result);
         }
 
-        CDEBUG(D_VFSTRACE, "%lu@"DFID" -> %d %p %p\n",
-               vmpage->index, PFID(lu_object_fid(&clob->co_lu)), result,
-               env, io);
-        return lcc;
-}
-
-static struct ll_cl_context *ll_cl_get(void)
-{
-        struct ll_cl_context *lcc;
-        struct lu_env *env;
-        int refcheck;
-
-        env = cl_env_get(&refcheck);
-        LASSERT(!IS_ERR(env));
-        lcc = &vvp_env_info(env)->vti_io_ctx;
-        LASSERT(env == lcc->lcc_env);
-        LASSERT(current == lcc->lcc_cookie);
-        cl_env_put(env, &refcheck);
-
-        /* env has got in ll_cl_init, so it is still usable. */
         return lcc;
 }
 
-/**
- * ->prepare_write() address space operation called by generic_file_write()
- * for every page during write.
- */
-int ll_prepare_write(struct file *file, struct page *vmpage, unsigned from,
-                    unsigned to)
-{
-       struct ll_cl_context *lcc;
-       int result;
-       ENTRY;
-
-       lcc = ll_cl_init(file, vmpage, 1);
-       if (!IS_ERR(lcc)) {
-               struct lu_env  *env = lcc->lcc_env;
-               struct cl_io   *io  = lcc->lcc_io;
-               struct cl_page *page = lcc->lcc_page;
-
-               cl_page_assume(env, io, page);
-
-               result = cl_io_prepare_write(env, io, page, from, to);
-               if (result == 0) {
-                       /*
-                        * Add a reference, so that page is not evicted from
-                        * the cache until ->commit_write() is called.
-                        */
-                       cl_page_get(page);
-                       lu_ref_add(&page->cp_reference, "prepare_write",
-                                  current);
-               } else {
-                       cl_page_unassume(env, io, page);
-                       ll_cl_fini(lcc);
-               }
-               /* returning 0 in prepare assumes commit must be called
-                * afterwards */
-       } else {
-               result = PTR_ERR(lcc);
-       }
-       RETURN(result);
-}
-
-int ll_commit_write(struct file *file, struct page *vmpage, unsigned from,
-                   unsigned to)
-{
-       struct ll_cl_context *lcc;
-       struct lu_env    *env;
-       struct cl_io     *io;
-       struct cl_page   *page;
-       int result = 0;
-       ENTRY;
-
-       lcc  = ll_cl_get();
-       env  = lcc->lcc_env;
-       page = lcc->lcc_page;
-       io   = lcc->lcc_io;
-
-       LASSERT(cl_page_is_owned(page, io));
-       LASSERT(from <= to);
-       if (from != to) /* handle short write case. */
-               result = cl_io_commit_write(env, io, page, from, to);
-       if (cl_page_is_owned(page, io))
-               cl_page_unassume(env, io, page);
-
-       /*
-        * Release reference acquired by ll_prepare_write().
-        */
-       lu_ref_del(&page->cp_reference, "prepare_write", current);
-       cl_page_put(env, page);
-       ll_cl_fini(lcc);
-       RETURN(result);
-}
-
 struct obd_capa *cl_capa_lookup(struct inode *inode, enum cl_req_type crt)
 {
         __u64 opc;
@@ -1291,7 +1152,7 @@ int ll_readpage(struct file *file, struct page *vmpage)
         int result;
         ENTRY;
 
-        lcc = ll_cl_init(file, vmpage, 0);
+        lcc = ll_cl_init(file, vmpage);
         if (!IS_ERR(lcc)) {
                 struct lu_env  *env  = lcc->lcc_env;
                 struct cl_io   *io   = lcc->lcc_io;
@@ -1314,3 +1175,27 @@ int ll_readpage(struct file *file, struct page *vmpage)
         RETURN(result);
 }
 
+int ll_page_sync_io(const struct lu_env *env, struct cl_io *io,
+                   struct cl_page *page, enum cl_req_type crt)
+{
+       struct cl_2queue  *queue;
+       int result;
+
+       LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
+
+       queue = &io->ci_queue;
+       cl_2queue_init_page(queue, page);
+
+       result = cl_io_submit_sync(env, io, crt, queue, 0);
+       LASSERT(cl_page_is_owned(page, io));
+
+       if (crt == CRT_READ)
+               /*
+                * in CRT_WRITE case page is left locked even in case of
+                * error.
+                */
+               cl_page_list_disown(env, io, &queue->c2_qin);
+       cl_2queue_fini(env, queue);
+
+       return result;
+}
index 33c5996..e68d1c4 100644 (file)
@@ -496,58 +496,207 @@ out:
                mutex_unlock(&inode->i_mutex);
 
         if (tot_bytes > 0) {
-                if (rw == WRITE) {
-                       struct lov_stripe_md *lsm;
-
-                       lsm = ccc_inode_lsm_get(inode);
-                       LASSERT(lsm != NULL);
-                       lov_stripe_lock(lsm);
-                       obd_adjust_kms(ll_i2dtexp(inode), lsm, file_offset, 0);
-                       lov_stripe_unlock(lsm);
-                       ccc_inode_lsm_put(inode, lsm);
-               }
+               struct ccc_io *cio = ccc_env_io(env);
+
+               /* no commit async for direct IO */
+               cio->u.write.cui_written += tot_bytes;
        }
 
        cl_env_put(env, &refcheck);
        RETURN(tot_bytes ? tot_bytes : result);
 }
 
+/**
+ * Prepare partially written-to page for a write.
+ */
+static int ll_prepare_partial_page(const struct lu_env *env, struct cl_io *io,
+                                  struct cl_page *pg)
+{
+       struct cl_object *obj  = io->ci_obj;
+       struct cl_attr *attr   = ccc_env_thread_attr(env);
+       loff_t          offset = cl_offset(obj, pg->cp_index);
+       int             result;
+
+       cl_object_attr_lock(obj);
+       result = cl_object_attr_get(env, obj, attr);
+       cl_object_attr_unlock(obj);
+       if (result == 0) {
+               struct ccc_page *cp;
+
+               cp = cl2ccc_page(cl_page_at(pg, &vvp_device_type));
+
+               /*
+                * If we are writing to a new page, no need to read old data.
+                * The extent locking will have updated the KMS, and for our
+                * purposes here we can treat it like i_size.
+                */
+               if (attr->cat_kms <= offset) {
+                       char *kaddr = ll_kmap_atomic(cp->cpg_page, KM_USER0);
+
+                       memset(kaddr, 0, cl_page_size(obj));
+                       ll_kunmap_atomic(kaddr, KM_USER0);
+               } else if (cp->cpg_defer_uptodate)
+                       cp->cpg_ra_used = 1;
+               else
+                       result = ll_page_sync_io(env, io, pg, CRT_READ);
+       }
+       return result;
+}
+
 static int ll_write_begin(struct file *file, struct address_space *mapping,
-                         loff_t pos, unsigned len, unsigned flags,
-                         struct page **pagep, void **fsdata)
+                         loff_t pos, unsigned len, unsigned flags,
+                         struct page **pagep, void **fsdata)
 {
-        pgoff_t index = pos >> PAGE_CACHE_SHIFT;
-        struct page *page;
-        int rc;
-        unsigned from = pos & (PAGE_CACHE_SIZE - 1);
-        ENTRY;
+       struct ll_cl_context *lcc;
+       struct lu_env  *env;
+       struct cl_io   *io;
+       struct cl_page *page;
+
+       struct cl_object *clob = ll_i2info(mapping->host)->lli_clob;
+       pgoff_t index = pos >> PAGE_CACHE_SHIFT;
+       struct page *vmpage = NULL;
+       unsigned from = pos & (PAGE_CACHE_SIZE - 1);
+       unsigned to = from + len;
+       int result = 0;
+       ENTRY;
+
+       CDEBUG(D_VFSTRACE, "Writing %lu of %d to %d bytes\n", index, from, len);
+
+       lcc = ll_cl_init(file, NULL);
+       if (IS_ERR(lcc))
+               GOTO(out, result = PTR_ERR(lcc));
+
+       env = lcc->lcc_env;
+       io  = lcc->lcc_io;
+
+       /* To avoid deadlock, try to lock page first. */
+       vmpage = grab_cache_page_nowait(mapping, index);
+       if (unlikely(vmpage == NULL || PageDirty(vmpage))) {
+               struct ccc_io *cio = ccc_env_io(env);
+               struct cl_page_list *plist = &cio->u.write.cui_queue;
+
+               /* If the page is already in the dirty cache, we have to
+                * commit the queued pages right now; otherwise we may
+                * deadlock, holding the page lock of a dirty page while
+                * requesting more grant. It's okay for the dirty page to
+                * be the first one in the commit page list, though. */
+               if (vmpage != NULL && PageDirty(vmpage) && plist->pl_nr > 0) {
+                       unlock_page(vmpage);
+                       page_cache_release(vmpage);
+                       vmpage = NULL;
+               }
 
-        page = grab_cache_page_write_begin(mapping, index, flags);
-        if (!page)
-                RETURN(-ENOMEM);
+               /* commit pages and then wait for page lock */
+               result = vvp_io_write_commit(env, io);
+               if (result < 0)
+                       GOTO(out, result);
 
-        *pagep = page;
+               if (vmpage == NULL) {
+                       vmpage = grab_cache_page_write_begin(mapping, index,
+                                                            flags);
+                       if (vmpage == NULL)
+                               GOTO(out, result = -ENOMEM);
+               }
+       }
 
-        rc = ll_prepare_write(file, page, from, from + len);
-        if (rc) {
-                unlock_page(page);
-                page_cache_release(page);
-        }
-        RETURN(rc);
+       page = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE);
+       if (IS_ERR(page))
+               GOTO(out, result = PTR_ERR(page));
+
+       lcc->lcc_page = page;
+       lu_ref_add(&page->cp_reference, "cl_io", io);
+
+       cl_page_assume(env, io, page);
+       if (!PageUptodate(vmpage)) {
+               /*
+                * We're completely overwriting an existing page,
+                * so _don't_ mark it up to date until ll_write_end()
+                */
+               if (from == 0 && to == PAGE_SIZE) {
+                       CL_PAGE_HEADER(D_PAGE, env, page, "full page write\n");
+                       POISON_PAGE(vmpage, 0x11);
+               } else {
+                       /* TODO: can be optimized at OSC layer to check if it
+                        * is a lockless IO. In that case, it's not necessary
+                        * to read the data. */
+                       result = ll_prepare_partial_page(env, io, page);
+                       if (result == 0)
+                               SetPageUptodate(vmpage);
+               }
+       }
+       if (result < 0)
+               cl_page_unassume(env, io, page);
+       EXIT;
+out:
+       if (result < 0) {
+               if (vmpage != NULL) {
+                       unlock_page(vmpage);
+                       page_cache_release(vmpage);
+               }
+               if (!IS_ERR(lcc))
+                       ll_cl_fini(lcc);
+       } else {
+               *pagep = vmpage;
+               *fsdata = lcc;
+       }
+       RETURN(result);
 }
 
 static int ll_write_end(struct file *file, struct address_space *mapping,
-                        loff_t pos, unsigned len, unsigned copied,
-                        struct page *page, void *fsdata)
+                       loff_t pos, unsigned len, unsigned copied,
+                       struct page *vmpage, void *fsdata)
 {
-        unsigned from = pos & (PAGE_CACHE_SIZE - 1);
-        int rc;
+       struct ll_cl_context *lcc = fsdata;
+       struct lu_env *env;
+       struct cl_io *io;
+       struct ccc_io *cio;
+       struct cl_page *page;
+       unsigned from = pos & (PAGE_CACHE_SIZE - 1);
+       bool unplug = false;
+       int result = 0;
+       ENTRY;
+
+       page_cache_release(vmpage);
+
+       LASSERT(lcc != NULL);
+       env  = lcc->lcc_env;
+       page = lcc->lcc_page;
+       io   = lcc->lcc_io;
+       cio  = ccc_env_io(env);
 
-        rc = ll_commit_write(file, page, from, from + copied);
-        unlock_page(page);
-        page_cache_release(page);
+       LASSERT(cl_page_is_owned(page, io));
+       if (copied > 0) {
+               struct cl_page_list *plist = &cio->u.write.cui_queue;
 
-        return rc ?: copied;
+               lcc->lcc_page = NULL; /* page will be queued */
+
+               /* Add it into write queue */
+               cl_page_list_add(plist, page);
+               if (plist->pl_nr == 1) /* first page */
+                       cio->u.write.cui_from = from;
+               else
+                       LASSERT(from == 0);
+               cio->u.write.cui_to = from + copied;
+
+               /* We may have one full RPC, commit it soon */
+               if (plist->pl_nr >= PTLRPC_MAX_BRW_PAGES)
+                       unplug = true;
+
+               CL_PAGE_DEBUG(D_VFSTRACE, env, page,
+                             "queued page: %d.\n", plist->pl_nr);
+       } else {
+               cl_page_disown(env, io, page);
+
+               /* page list is no longer contiguous, commit it now */
+               unplug = true;
+       }
+
+       if (unplug ||
+           file->f_flags & O_SYNC || IS_SYNC(file->f_dentry->d_inode))
+               result = vvp_io_write_commit(env, io);
+
+       ll_cl_fini(lcc);
+       RETURN(result >= 0 ? copied : result);
 }
 
 #ifdef CONFIG_MIGRATION
@@ -582,21 +731,18 @@ struct address_space_operations ll_aops = {
 };
 #else
 struct address_space_operations_ext ll_aops = {
-        .orig_aops.readpage       = ll_readpage,
-//        .orig_aops.readpages      = ll_readpages,
-        .orig_aops.direct_IO      = ll_direct_IO_26,
-        .orig_aops.writepage      = ll_writepage,
-       .orig_aops.writepages     = ll_writepages,
-        .orig_aops.set_page_dirty = ll_set_page_dirty,
-        .orig_aops.prepare_write  = ll_prepare_write,
-        .orig_aops.commit_write   = ll_commit_write,
-        .orig_aops.invalidatepage = ll_invalidatepage,
-        .orig_aops.releasepage    = ll_releasepage,
+       .orig_aops.readpage             = ll_readpage,
+       .orig_aops.direct_IO            = ll_direct_IO_26,
+       .orig_aops.writepage            = ll_writepage,
+       .orig_aops.writepages           = ll_writepages,
+       .orig_aops.set_page_dirty       = ll_set_page_dirty,
+       .orig_aops.invalidatepage       = ll_invalidatepage,
+       .orig_aops.releasepage          = ll_releasepage,
 #ifdef CONFIG_MIGRATION
-        .orig_aops.migratepage    = ll_migratepage,
+       .orig_aops.migratepage          = ll_migratepage,
 #endif
-        .orig_aops.bmap           = NULL,
-        .write_begin    = ll_write_begin,
-        .write_end      = ll_write_end
+       .orig_aops.bmap                 = NULL,
+       .write_begin                    = ll_write_begin,
+       .write_end                      = ll_write_end
 };
 #endif
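
ll_prepare_partial_page() above picks one of three ways to make a
partially overwritten page consistent before the copy-in. A hedged
distillation of that branch (illustrative names; "kms" is the known
minimum size the client tracks for the object):

    enum prep_action { PREP_ZERO, PREP_USE_READAHEAD, PREP_SYNC_READ };

    static enum prep_action prepare_partial(long long kms, long long offset,
                                            int defer_uptodate)
    {
            if (kms <= offset)          /* page lies beyond known data,  */
                    return PREP_ZERO;   /* nothing old to read: zero it  */
            if (defer_uptodate)         /* readahead already filled it   */
                    return PREP_USE_READAHEAD;
            return PREP_SYNC_READ;      /* read-modify-write: sync read  */
    }
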
index 3abf4a7..63172d3 100644 (file)
 #include <cl_object.h>
 #include "llite_internal.h"
 
-int               vvp_io_init     (const struct lu_env *env,
-                                   struct cl_object *obj, struct cl_io *io);
-int               vvp_lock_init   (const struct lu_env *env,
-                                   struct cl_object *obj, struct cl_lock *lock,
-                                   const struct cl_io *io);
-int              vvp_page_init   (const struct lu_env *env,
-                                  struct cl_object *obj,
-                                  struct cl_page *page, struct page *vmpage);
+int vvp_io_init(const struct lu_env *env, struct cl_object *obj,
+               struct cl_io *io);
+int vvp_lock_init(const struct lu_env *env, struct cl_object *obj,
+                 struct cl_lock *lock, const struct cl_io *io);
+int vvp_page_init(const struct lu_env *env, struct cl_object *obj,
+                 struct cl_page *page, struct page *vmpage);
 struct lu_object *vvp_object_alloc(const struct lu_env *env,
-                                   const struct lu_object_header *hdr,
-                                   struct lu_device *dev);
-
+                                  const struct lu_object_header *hdr,
+                                  struct lu_device *dev);
 struct ccc_object *cl_inode2ccc(struct inode *inode);
 
 extern struct kmem_cache *vvp_thread_kmem;
index 82b8b91..380a7e5 100644 (file)
@@ -104,6 +104,27 @@ static bool can_populate_pages(const struct lu_env *env, struct cl_io *io,
  *
  */
 
+static int vvp_io_write_iter_init(const struct lu_env *env,
+                                 const struct cl_io_slice *ios)
+{
+       struct ccc_io *cio = cl2ccc_io(env, ios);
+
+       cl_page_list_init(&cio->u.write.cui_queue);
+       cio->u.write.cui_written = 0;
+       cio->u.write.cui_from = 0;
+       cio->u.write.cui_to = PAGE_SIZE;
+
+       return 0;
+}
+
+static void vvp_io_write_iter_fini(const struct lu_env *env,
+                                  const struct cl_io_slice *ios)
+{
+       struct ccc_io *cio = cl2ccc_io(env, ios);
+
+       LASSERT(cio->u.write.cui_queue.pl_nr == 0);
+}
+
 static int vvp_io_fault_iter_init(const struct lu_env *env,
                                   const struct cl_io_slice *ios)
 {
@@ -589,6 +610,184 @@ static void vvp_io_read_fini(const struct lu_env *env, const struct cl_io_slice
        vvp_io_fini(env, ios);
 }
 
+static int vvp_io_commit_sync(const struct lu_env *env, struct cl_io *io,
+                             struct cl_page_list *plist, int from, int to)
+{
+       struct cl_2queue *queue = &io->ci_queue;
+       struct cl_page *page;
+       unsigned int bytes = 0;
+       int rc = 0;
+       ENTRY;
+
+       if (plist->pl_nr == 0)
+               RETURN(0);
+
+       if (from != 0) {
+               page = cl_page_list_first(plist);
+               cl_page_clip(env, page, from,
+                            plist->pl_nr == 1 ? to : PAGE_SIZE);
+       }
+       if (to != PAGE_SIZE && plist->pl_nr > 1) {
+               page = cl_page_list_last(plist);
+               cl_page_clip(env, page, 0, to);
+       }
+
+       cl_2queue_init(queue);
+       cl_page_list_splice(plist, &queue->c2_qin);
+       rc = cl_io_submit_sync(env, io, CRT_WRITE, queue, 0);
+
+       /* plist is not sorted any more */
+       cl_page_list_splice(&queue->c2_qin, plist);
+       cl_page_list_splice(&queue->c2_qout, plist);
+       cl_2queue_fini(env, queue);
+
+       if (rc == 0) {
+               /* calculate bytes */
+               bytes = plist->pl_nr << PAGE_SHIFT;
+               bytes -= from + PAGE_SIZE - to;
+
+               while (plist->pl_nr > 0) {
+                       page = cl_page_list_first(plist);
+                       cl_page_list_del(env, plist, page);
+
+                       cl_page_clip(env, page, 0, PAGE_SIZE);
+
+                       SetPageUptodate(cl_page_vmpage(env, page));
+                       cl_page_disown(env, io, page);
+
+                       /* held in ll_cl_init() */
+                       lu_ref_del(&page->cp_reference, "cl_io", io);
+                       cl_page_put(env, page);
+               }
+       }
+
+       RETURN(bytes > 0 ? bytes : rc);
+}
+
+static void write_commit_callback(const struct lu_env *env, struct cl_io *io,
+                               struct cl_page *page)
+{
+       const struct cl_page_slice *slice;
+       struct ccc_page *cp;
+       struct page *vmpage;
+
+       slice = cl_page_at(page, &vvp_device_type);
+       cp = cl2ccc_page(slice);
+       vmpage = cp->cpg_page;
+
+       SetPageUptodate(vmpage);
+       set_page_dirty(vmpage);
+       vvp_write_pending(cl2ccc(slice->cpl_obj), cp);
+
+       cl_page_disown(env, io, page);
+
+       /* held in ll_cl_init() */
+       lu_ref_del(&page->cp_reference, "cl_io", io);
+       cl_page_put(env, page);
+}
+
+/* make sure the page list is contiguous */
+static bool page_list_sanity_check(struct cl_page_list *plist)
+{
+       struct cl_page *page;
+       pgoff_t index = CL_PAGE_EOF;
+
+       cl_page_list_for_each(page, plist) {
+               if (index == CL_PAGE_EOF) {
+                       index = page->cp_index;
+                       continue;
+               }
+
+               ++index;
+               if (index == page->cp_index)
+                       continue;
+
+               return false;
+       }
+       return true;
+}
+
+/* Return how many bytes have been queued or written */
+int vvp_io_write_commit(const struct lu_env *env, struct cl_io *io)
+{
+       struct cl_object *obj = io->ci_obj;
+       struct inode *inode = ccc_object_inode(obj);
+       struct ccc_io *cio = ccc_env_io(env);
+       struct cl_page_list *queue = &cio->u.write.cui_queue;
+       struct cl_page *page;
+       int rc = 0;
+       int bytes = 0;
+       unsigned int npages = cio->u.write.cui_queue.pl_nr;
+       ENTRY;
+
+       if (npages == 0)
+               RETURN(0);
+
+       CDEBUG(D_VFSTRACE, "commit async pages: %d, from %d, to %d\n",
+               npages, cio->u.write.cui_from, cio->u.write.cui_to);
+
+       LASSERT(page_list_sanity_check(queue));
+
+       /* submit IO with async write */
+       rc = cl_io_commit_async(env, io, queue,
+                               cio->u.write.cui_from, cio->u.write.cui_to,
+                               write_commit_callback);
+       npages -= queue->pl_nr; /* already committed pages */
+       if (npages > 0) {
+               /* calculate how many bytes were written */
+               bytes = npages << PAGE_SHIFT;
+
+               /* first page */
+               bytes -= cio->u.write.cui_from;
+               if (queue->pl_nr == 0) /* last page */
+                       bytes -= PAGE_SIZE - cio->u.write.cui_to;
+               LASSERTF(bytes > 0, "bytes = %d, pages = %d\n", bytes, npages);
+
+               cio->u.write.cui_written += bytes;
+
+               CDEBUG(D_VFSTRACE, "Committed %d pages %d bytes, tot: %ld\n",
+                       npages, bytes, cio->u.write.cui_written);
+
+               /* the first page must have been written. */
+               cio->u.write.cui_from = 0;
+       }
+       LASSERT(page_list_sanity_check(queue));
+       LASSERT(ergo(rc == 0, queue->pl_nr == 0));
+
+       /* out of quota, try sync write */
+       if (rc == -EDQUOT && !cl_io_is_mkwrite(io)) {
+               rc = vvp_io_commit_sync(env, io, queue,
+                                       cio->u.write.cui_from,
+                                       cio->u.write.cui_to);
+               if (rc > 0) {
+                       cio->u.write.cui_written += rc;
+                       rc = 0;
+               }
+       }
+
+       /* update inode size */
+       ll_merge_lvb(env, inode);
+
+       /* Pages left in the queue failed to commit; discard them
+        * unless they were dirtied before. */
+       while (queue->pl_nr > 0) {
+               page = cl_page_list_first(queue);
+               cl_page_list_del(env, queue, page);
+
+               if (!PageDirty(cl_page_vmpage(env, page)))
+                       cl_page_discard(env, io, page);
+
+               cl_page_disown(env, io, page);
+
+               /* held in ll_cl_init() */
+               lu_ref_del(&page->cp_reference, "cl_io", io);
+               cl_page_put(env, page);
+       }
+       cl_page_list_fini(env, queue);
+
+       RETURN(rc);
+}
+
 static int vvp_io_write_start(const struct lu_env *env,
                               const struct cl_io_slice *ios)
 {
@@ -623,9 +822,24 @@ static int vvp_io_write_start(const struct lu_env *env,
                 result = lustre_generic_file_write(file, cio, &pos);
 
        if (result > 0) {
+               result = vvp_io_write_commit(env, io);
+               if (cio->u.write.cui_written > 0) {
+                       result = cio->u.write.cui_written;
+                       io->ci_nob += result;
+
+                       CDEBUG(D_VFSTRACE, "write: nob %zd, result: %zd\n",
+                               io->ci_nob, result);
+               }
+       }
+       if (result > 0) {
+               struct ll_inode_info *lli = ll_i2info(inode);
+
+               spin_lock(&lli->lli_lock);
+               lli->lli_flags |= LLIF_DATA_MODIFIED;
+               spin_unlock(&lli->lli_lock);
+
                if (result < cnt)
                        io->ci_continue = 0;
-               io->ci_nob += result;
                ll_rw_stats_tally(ll_i2sbi(inode), current->pid,
                                  cio->cui_fd, pos, result, WRITE);
                result = 0;
@@ -669,6 +883,21 @@ static int vvp_io_kernel_fault(struct vvp_fault_io *cfio)
         return -EINVAL;
 }
 
+static void mkwrite_commit_callback(const struct lu_env *env, struct cl_io *io,
+                                   struct cl_page *page)
+{
+       const struct cl_page_slice *slice;
+       struct ccc_page *cp;
+       struct page *vmpage;
+
+       slice = cl_page_at(page, &vvp_device_type);
+       cp = cl2ccc_page(slice);
+       vmpage = cp->cpg_page;
+
+       set_page_dirty(vmpage);
+       vvp_write_pending(cl2ccc(slice->cpl_obj), cp);
+}
+
 static int vvp_io_fault_start(const struct lu_env *env,
                               const struct cl_io_slice *ios)
 {
@@ -683,7 +912,8 @@ static int vvp_io_fault_start(const struct lu_env *env,
        struct page          *vmpage  = NULL;
        struct cl_page      *page;
        loff_t               size;
-       pgoff_t              last; /* last page in a file data region */
+       pgoff_t              last_index;
+       ENTRY;
 
         if (fio->ft_executable &&
             LTIME_S(inode->i_mtime) != vio->u.fault.ft_mtime)
@@ -695,8 +925,8 @@ static int vvp_io_fault_start(const struct lu_env *env,
         offset = cl_offset(obj, fio->ft_index + 1) - 1;
         LASSERT(cl_index(obj, offset) == fio->ft_index);
         result = ccc_prep_size(env, obj, io, 0, offset + 1, NULL);
-        if (result != 0)
-                return result;
+       if (result != 0)
+               RETURN(result);
 
        /* must return locked page */
        if (fio->ft_mkwrite) {
@@ -705,7 +935,7 @@ static int vvp_io_fault_start(const struct lu_env *env,
        } else {
                result = vvp_io_kernel_fault(cfio);
                if (result != 0)
-                       return result;
+                       RETURN(result);
        }
 
        vmpage = cfio->ft_vmpage;
@@ -726,16 +956,15 @@ static int vvp_io_fault_start(const struct lu_env *env,
                 GOTO(out, result = +1);
         }
 
+       last_index = cl_index(obj, size - 1);
 
        if (fio->ft_mkwrite ) {
-               pgoff_t last_index;
                /*
                 * Capture the size while holding the lli_trunc_sem from above
                 * we want to make sure that we complete the mkwrite action
                 * while holding this lock. We need to make sure that we are
                 * not past the end of the file.
                 */
-               last_index = cl_index(obj, size - 1);
                if (last_index < fio->ft_index) {
                        CDEBUG(D_PAGE,
                                "llite: mkwrite and truncate race happened: "
@@ -756,28 +985,35 @@ static int vvp_io_fault_start(const struct lu_env *env,
                }
        }
 
-        page = cl_page_find(env, obj, fio->ft_index, vmpage, CPT_CACHEABLE);
-        if (IS_ERR(page))
-                GOTO(out, result = PTR_ERR(page));
-
-        /* if page is going to be written, we should add this page into cache
-         * earlier. */
-        if (fio->ft_mkwrite) {
-                wait_on_page_writeback(vmpage);
-                if (set_page_dirty(vmpage)) {
-                        struct ccc_page *cp;
-
-                        /* vvp_page_assume() calls wait_on_page_writeback(). */
-                        cl_page_assume(env, io, page);
+       page = cl_page_find(env, obj, fio->ft_index, vmpage, CPT_CACHEABLE);
+       if (IS_ERR(page))
+               GOTO(out, result = PTR_ERR(page));
 
-                        cp = cl2ccc_page(cl_page_at(page, &vvp_device_type));
-                        vvp_write_pending(cl2ccc(obj), cp);
-
-                        /* Do not set Dirty bit here so that in case IO is
-                         * started before the page is really made dirty, we
-                         * still have chance to detect it. */
-                        result = cl_page_cache_add(env, io, page, CRT_WRITE);
+       /* if page is going to be written, we should add this page into cache
+        * earlier. */
+       if (fio->ft_mkwrite) {
+               wait_on_page_writeback(vmpage);
+               if (!PageDirty(vmpage)) {
+                       struct cl_page_list *plist = &io->ci_queue.c2_qin;
+                       int to = PAGE_SIZE;
+
+                       /* vvp_page_assume() calls wait_on_page_writeback(). */
+                       cl_page_assume(env, io, page);
+
+                       cl_page_list_init(plist);
+                       cl_page_list_add(plist, page);
+
+                       /* size fixup */
+                       if (last_index == page->cp_index)
+                               to = size & ~CFS_PAGE_MASK;
+
+                       /* Do not set Dirty bit here so that in case IO is
+                        * started before the page is really made dirty, we
+                        * still have chance to detect it. */
+                       result = cl_io_commit_async(env, io, plist, 0, to,
+                                                   mkwrite_commit_callback);
                        LASSERT(cl_page_is_owned(page, io));
+                       cl_page_list_fini(env, plist);
 
                        vmpage = NULL;
                        if (result < 0) {
@@ -795,15 +1031,14 @@ static int vvp_io_fault_start(const struct lu_env *env,
                }
        }
 
-       last = cl_index(obj, size - 1);
        /*
         * The ft_index is only used in the case of
         * a mkwrite action. We need to check
         * our assertions are correct, since
         * we should have caught this above
         */
-       LASSERT(!fio->ft_mkwrite || fio->ft_index <= last);
-        if (fio->ft_index == last)
+       LASSERT(!fio->ft_mkwrite || fio->ft_index <= last_index);
+       if (fio->ft_index == last_index)
                 /*
                  * Last page is mapped partially.
                  */
@@ -885,237 +1120,6 @@ static int vvp_io_read_page(const struct lu_env *env,
         RETURN(0);
 }
 
-static int vvp_page_sync_io(const struct lu_env *env, struct cl_io *io,
-                            struct cl_page *page, struct ccc_page *cp,
-                            enum cl_req_type crt)
-{
-        struct cl_2queue  *queue;
-        int result;
-
-        LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
-
-        queue = &io->ci_queue;
-        cl_2queue_init_page(queue, page);
-
-       result = cl_io_submit_sync(env, io, crt, queue, 0);
-        LASSERT(cl_page_is_owned(page, io));
-
-        if (crt == CRT_READ)
-                /*
-                 * in CRT_WRITE case page is left locked even in case of
-                 * error.
-                 */
-                cl_page_list_disown(env, io, &queue->c2_qin);
-        cl_2queue_fini(env, queue);
-
-        return result;
-}
-
-/**
- * Prepare partially written-to page for a write.
- */
-static int vvp_io_prepare_partial(const struct lu_env *env, struct cl_io *io,
-                                  struct cl_object *obj, struct cl_page *pg,
-                                  struct ccc_page *cp,
-                                  unsigned from, unsigned to)
-{
-        struct cl_attr *attr   = ccc_env_thread_attr(env);
-        loff_t          offset = cl_offset(obj, pg->cp_index);
-        int             result;
-
-        cl_object_attr_lock(obj);
-        result = cl_object_attr_get(env, obj, attr);
-        cl_object_attr_unlock(obj);
-        if (result == 0) {
-                /*
-                 * If are writing to a new page, no need to read old data.
-                 * The extent locking will have updated the KMS, and for our
-                 * purposes here we can treat it like i_size.
-                 */
-                if (attr->cat_kms <= offset) {
-                        char *kaddr = ll_kmap_atomic(cp->cpg_page, KM_USER0);
-
-                        memset(kaddr, 0, cl_page_size(obj));
-                        ll_kunmap_atomic(kaddr, KM_USER0);
-                } else if (cp->cpg_defer_uptodate)
-                        cp->cpg_ra_used = 1;
-                else
-                        result = vvp_page_sync_io(env, io, pg, cp, CRT_READ);
-                /*
-                 * In older implementations, obdo_refresh_inode is called here
-                 * to update the inode because the write might modify the
-                 * object info at OST. However, this has been proven useless,
-                 * since LVB functions will be called when user space program
-                 * tries to retrieve inode attribute.  Also, see bug 15909 for
-                 * details. -jay
-                 */
-                if (result == 0)
-                        cl_page_export(env, pg, 1);
-        }
-        return result;
-}
-
-static int vvp_io_prepare_write(const struct lu_env *env,
-                                const struct cl_io_slice *ios,
-                                const struct cl_page_slice *slice,
-                                unsigned from, unsigned to)
-{
-        struct cl_object *obj    = slice->cpl_obj;
-        struct ccc_page  *cp     = cl2ccc_page(slice);
-        struct cl_page   *pg     = slice->cpl_page;
-       struct page       *vmpage = cp->cpg_page;
-
-        int result;
-
-        ENTRY;
-
-        LINVRNT(cl_page_is_vmlocked(env, pg));
-        LASSERT(vmpage->mapping->host == ccc_object_inode(obj));
-
-        result = 0;
-
-        CL_PAGE_HEADER(D_PAGE, env, pg, "preparing: [%d, %d]\n", from, to);
-        if (!PageUptodate(vmpage)) {
-                /*
-                 * We're completely overwriting an existing page, so _don't_
-                 * set it up to date until commit_write
-                 */
-               if (from == 0 && to == PAGE_CACHE_SIZE) {
-                        CL_PAGE_HEADER(D_PAGE, env, pg, "full page write\n");
-                        POISON_PAGE(page, 0x11);
-                } else
-                        result = vvp_io_prepare_partial(env, ios->cis_io, obj,
-                                                        pg, cp, from, to);
-        } else
-                CL_PAGE_HEADER(D_PAGE, env, pg, "uptodate\n");
-        RETURN(result);
-}
-
-static int vvp_io_commit_write(const struct lu_env *env,
-                               const struct cl_io_slice *ios,
-                               const struct cl_page_slice *slice,
-                               unsigned from, unsigned to)
-{
-        struct cl_object  *obj    = slice->cpl_obj;
-        struct cl_io      *io     = ios->cis_io;
-        struct ccc_page   *cp     = cl2ccc_page(slice);
-        struct cl_page    *pg     = slice->cpl_page;
-        struct inode      *inode  = ccc_object_inode(obj);
-        struct ll_sb_info *sbi    = ll_i2sbi(inode);
-       struct ll_inode_info *lli = ll_i2info(inode);
-       struct page        *vmpage = cp->cpg_page;
-
-        int    result;
-        int    tallyop;
-        loff_t size;
-
-        ENTRY;
-
-        LINVRNT(cl_page_is_vmlocked(env, pg));
-        LASSERT(vmpage->mapping->host == inode);
-
-        LU_OBJECT_HEADER(D_INODE, env, &obj->co_lu, "commiting page write\n");
-        CL_PAGE_HEADER(D_PAGE, env, pg, "committing: [%d, %d]\n", from, to);
-
-        /*
-         * queue a write for some time in the future the first time we
-         * dirty the page.
-         *
-         * This is different from what other file systems do: they usually
-         * just mark page (and some of its buffers) dirty and rely on
-         * balance_dirty_pages() to start a write-back. Lustre wants write-back
-         * to be started earlier for the following reasons:
-         *
-         *     (1) with a large number of clients we need to limit the amount
-         *     of cached data on the clients a lot;
-         *
-         *     (2) large compute jobs generally want compute-only then io-only
-         *     and the IO should complete as quickly as possible;
-         *
-         *     (3) IO is batched up to the RPC size and is async until the
-         *     client max cache is hit
-         *     (/proc/fs/lustre/osc/OSC.../max_dirty_mb)
-         *
-         */
-        if (!PageDirty(vmpage)) {
-                tallyop = LPROC_LL_DIRTY_MISSES;
-                result = cl_page_cache_add(env, io, pg, CRT_WRITE);
-                if (result == 0) {
-                        /* page was added into cache successfully. */
-                        set_page_dirty(vmpage);
-                        vvp_write_pending(cl2ccc(obj), cp);
-                } else if (result == -EDQUOT) {
-                       pgoff_t last_index = i_size_read(inode) >> PAGE_CACHE_SHIFT;
-                        bool need_clip = true;
-
-                        /*
-                         * Client ran out of disk space grant. Possible
-                         * strategies are:
-                         *
-                         *     (a) do a sync write, renewing grant;
-                         *
-                         *     (b) stop writing on this stripe, switch to the
-                         *     next one.
-                         *
-                         * (b) is a part of "parallel io" design that is the
-                         * ultimate goal. (a) is what "old" client did, and
-                         * what the new code continues to do for the time
-                         * being.
-                         */
-                        if (last_index > pg->cp_index) {
-                               to = PAGE_CACHE_SIZE;
-                                need_clip = false;
-                        } else if (last_index == pg->cp_index) {
-                                int size_to = i_size_read(inode) & ~CFS_PAGE_MASK;
-                                if (to < size_to)
-                                        to = size_to;
-                        }
-                        if (need_clip)
-                                cl_page_clip(env, pg, 0, to);
-                        result = vvp_page_sync_io(env, io, pg, cp, CRT_WRITE);
-                        if (result)
-                                CERROR("Write page %lu of inode %p failed %d\n",
-                                       pg->cp_index, inode, result);
-                }
-        } else {
-                tallyop = LPROC_LL_DIRTY_HITS;
-                result = 0;
-        }
-        ll_stats_ops_tally(sbi, tallyop, 1);
-
-       /* Inode should be marked DIRTY even if no new page was marked DIRTY
-        * because page could have been not flushed between 2 modifications.
-        * It is important the file is marked DIRTY as soon as the I/O is done
-        * Indeed, when cache is flushed, file could be already closed and it
-        * is too late to warn the MDT.
-        * It is acceptable that file is marked DIRTY even if I/O is dropped
-        * for some reasons before being flushed to OST.
-        */
-       if (result == 0) {
-               spin_lock(&lli->lli_lock);
-               lli->lli_flags |= LLIF_DATA_MODIFIED;
-               spin_unlock(&lli->lli_lock);
-       }
-
-        size = cl_offset(obj, pg->cp_index) + to;
-
-       ll_inode_size_lock(inode);
-        if (result == 0) {
-                if (size > i_size_read(inode)) {
-                        cl_isize_write_nolock(inode, size);
-                        CDEBUG(D_VFSTRACE, DFID" updating i_size %lu\n",
-                               PFID(lu_object_fid(&obj->co_lu)),
-                               (unsigned long)size);
-                }
-                cl_page_export(env, pg, 1);
-        } else {
-                if (size > i_size_read(inode))
-                        cl_page_discard(env, io, pg);
-        }
-       ll_inode_size_unlock(inode);
-       RETURN(result);
-}
-
 static const struct cl_io_operations vvp_io_ops = {
         .op = {
                 [CIT_READ] = {
@@ -1125,10 +1129,12 @@ static const struct cl_io_operations vvp_io_ops = {
                         .cio_advance   = ccc_io_advance
                 },
                 [CIT_WRITE] = {
-                        .cio_fini      = vvp_io_fini,
-                        .cio_lock      = vvp_io_write_lock,
-                        .cio_start     = vvp_io_write_start,
-                        .cio_advance   = ccc_io_advance
+                       .cio_fini      = vvp_io_fini,
+                       .cio_iter_init = vvp_io_write_iter_init,
+                       .cio_iter_fini = vvp_io_write_iter_fini,
+                       .cio_lock      = vvp_io_write_lock,
+                       .cio_start     = vvp_io_write_start,
+                       .cio_advance   = ccc_io_advance
                 },
                 [CIT_SETATTR] = {
                         .cio_fini       = vvp_io_setattr_fini,
@@ -1153,8 +1159,6 @@ static const struct cl_io_operations vvp_io_ops = {
                 }
         },
         .cio_read_page     = vvp_io_read_page,
-        .cio_prepare_write = vvp_io_prepare_write,
-        .cio_commit_write  = vvp_io_commit_write
 };
 
 int vvp_io_init(const struct lu_env *env, struct cl_object *obj,
@@ -1191,6 +1195,7 @@ int vvp_io_init(const struct lu_env *env, struct cl_object *obj,
                         cio->cui_tot_count = count;
                         cio->cui_tot_nrsegs = 0;
                 }
+
                /* for read/write, we store the jobid in the inode, and
                 * it'll be fetched by osc when building RPC.
                 *
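
The CIT_WRITE table above drops the per-page cio_prepare_write/cio_commit_write hooks in favor of cio_iter_init/cio_iter_fini, so the write path can gather dirty pages and commit them in one cio_commit_async() call per batch. A minimal, self-contained C sketch of that batching pattern; the list type, threshold, and function names are illustrative stand-ins, not the Lustre API:

    #include <stdio.h>

    #define BATCH_MAX 8     /* stand-in for the real batch limit */

    struct page_batch {
            unsigned long indices[BATCH_MAX]; /* page indices queued for commit */
            int nr;
    };

    /* stand-in for cio_commit_async(): write out everything queued so far */
    static int commit_async(struct page_batch *b)
    {
            for (int i = 0; i < b->nr; i++)
                    printf("committing page %lu\n", b->indices[i]);
            b->nr = 0;
            return 0;
    }

    /* queue a dirty page; flush the batch only when it fills up */
    static int write_end(struct page_batch *b, unsigned long index)
    {
            b->indices[b->nr++] = index;
            return b->nr == BATCH_MAX ? commit_async(b) : 0;
    }

    int main(void)
    {
            struct page_batch b = { .nr = 0 };

            for (unsigned long i = 0; i < 20; i++)
                    write_end(&b, i);
            return commit_async(&b);    /* flush the partial tail batch */
    }
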
index a34bdcf..e37504f 100644 (file)
@@ -450,8 +450,10 @@ struct lov_thread_info {
        struct cl_lock_descr    lti_ldescr;
        struct ost_lvb          lti_lvb;
        struct cl_2queue        lti_cl2q;
+       struct cl_page_list     lti_plist;
        struct cl_lock_closure  lti_closure;
        wait_queue_t            lti_waiter;
+       struct cl_attr          lti_attr;
 };
 
 /**
index c163cfa..b6608d8 100644 (file)
@@ -556,14 +556,6 @@ static void lov_io_unlock(const struct lu_env *env,
         EXIT;
 }
 
-
-static struct cl_page_list *lov_io_submit_qin(struct lov_device *ld,
-                                              struct cl_page_list *qin,
-                                              int idx, int alloc)
-{
-        return alloc ? &qin[idx] : &ld->ld_emrg[idx]->emrg_page_list;
-}
-
 /**
  * lov implementation of cl_operations::cio_submit() method. It takes a list
  * of pages in \a queue, splits it into per-stripe sub-lists, invokes
@@ -580,32 +572,20 @@ static struct cl_page_list *lov_io_submit_qin(struct lov_device *ld,
  * lov_device::ld_mutex mutex.
  */
 static int lov_io_submit(const struct lu_env *env,
-                         const struct cl_io_slice *ios,
+                        const struct cl_io_slice *ios,
                         enum cl_req_type crt, struct cl_2queue *queue)
 {
-        struct lov_io          *lio = cl2lov_io(env, ios);
-        struct lov_object      *obj = lio->lis_object;
-        struct lov_device       *ld = lu2lov_dev(lov2cl(obj)->co_lu.lo_dev);
-        struct cl_page_list    *qin = &queue->c2_qin;
-        struct cl_2queue      *cl2q = &lov_env_info(env)->lti_cl2q;
-        struct cl_page_list *stripes_qin = NULL;
-        struct cl_page *page;
-        struct cl_page *tmp;
-        int stripe;
-
-#define QIN(stripe) lov_io_submit_qin(ld, stripes_qin, stripe, alloc)
+       struct cl_page_list     *qin = &queue->c2_qin;
+       struct lov_io           *lio = cl2lov_io(env, ios);
+       struct lov_io_sub       *sub;
+       struct cl_page_list     *plist = &lov_env_info(env)->lti_plist;
+       struct cl_page          *page;
+       int stripe;
+       int rc = 0;
+       ENTRY;
 
-        int rc = 0;
-        int alloc =
-#if defined(__KERNEL__) && defined(__linux__)
-                !(current->flags & PF_MEMALLOC);
-#else
-                1;
-#endif
-        ENTRY;
         if (lio->lis_active_subios == 1) {
                 int idx = lio->lis_single_subio_index;
-                struct lov_io_sub *sub;
 
                 LASSERT(idx < lio->lis_nr_subios);
                 sub = lov_sub_get(env, lio, idx);
@@ -618,120 +598,121 @@ static int lov_io_submit(const struct lu_env *env,
         }
 
         LASSERT(lio->lis_subs != NULL);
-        if (alloc) {
-                OBD_ALLOC_LARGE(stripes_qin,
-                                sizeof(*stripes_qin) * lio->lis_nr_subios);
-                if (stripes_qin == NULL)
-                        RETURN(-ENOMEM);
-
-                for (stripe = 0; stripe < lio->lis_nr_subios; stripe++)
-                        cl_page_list_init(&stripes_qin[stripe]);
-        } else {
-                /*
-                 * If we get here, it means pageout & swap doesn't help.
-                 * In order to not make things worse, even don't try to
-                 * allocate the memory with __GFP_NOWARN. -jay
-                 */
-               mutex_lock(&ld->ld_mutex);
-                lio->lis_mem_frozen = 1;
-        }
 
-        cl_2queue_init(cl2q);
-        cl_page_list_for_each_safe(page, tmp, qin) {
-                stripe = lov_page_stripe(page);
-                cl_page_list_move(QIN(stripe), qin, page);
-        }
+       cl_page_list_init(plist);
+       while (qin->pl_nr > 0) {
+               struct cl_2queue  *cl2q = &lov_env_info(env)->lti_cl2q;
 
-        for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
-                struct lov_io_sub   *sub;
-                struct cl_page_list *sub_qin = QIN(stripe);
+               cl_2queue_init(cl2q);
 
-                if (cfs_list_empty(&sub_qin->pl_pages))
-                        continue;
+               page = cl_page_list_first(qin);
+               cl_page_list_move(&cl2q->c2_qin, qin, page);
 
-                cl_page_list_splice(sub_qin, &cl2q->c2_qin);
-                sub = lov_sub_get(env, lio, stripe);
-                if (!IS_ERR(sub)) {
+               stripe = lov_page_stripe(page);
+               while (qin->pl_nr > 0) {
+                       page = cl_page_list_first(qin);
+                       if (stripe != lov_page_stripe(page))
+                               break;
+
+                       cl_page_list_move(&cl2q->c2_qin, qin, page);
+               }
+
+               sub = lov_sub_get(env, lio, stripe);
+               if (!IS_ERR(sub)) {
                         rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
                                             crt, cl2q);
-                        lov_sub_put(sub);
-                } else
-                        rc = PTR_ERR(sub);
-                cl_page_list_splice(&cl2q->c2_qin,  &queue->c2_qin);
-                cl_page_list_splice(&cl2q->c2_qout, &queue->c2_qout);
-                if (rc != 0)
-                        break;
-        }
+                       lov_sub_put(sub);
+               } else {
+                       rc = PTR_ERR(sub);
+               }
 
-        for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
-                struct cl_page_list *sub_qin = QIN(stripe);
+               cl_page_list_splice(&cl2q->c2_qin, plist);
+               cl_page_list_splice(&cl2q->c2_qout, &queue->c2_qout);
+               cl_2queue_fini(env, cl2q);
 
-                if (cfs_list_empty(&sub_qin->pl_pages))
-                        continue;
+               if (rc != 0)
+                       break;
+       }
 
-                cl_page_list_splice(sub_qin, qin);
-        }
+       cl_page_list_splice(plist, qin);
+       cl_page_list_fini(env, plist);
 
-        if (alloc) {
-                OBD_FREE_LARGE(stripes_qin,
-                         sizeof(*stripes_qin) * lio->lis_nr_subios);
-        } else {
-                int i;
+       RETURN(rc);
+}
 
-                for (i = 0; i < lio->lis_nr_subios; i++) {
-                        struct cl_io *cio = lio->lis_subs[i].sub_io;
+static int lov_io_commit_async(const struct lu_env *env,
+                              const struct cl_io_slice *ios,
+                              struct cl_page_list *queue, int from, int to,
+                              cl_commit_cbt cb)
+{
+       struct cl_page_list *plist = &lov_env_info(env)->lti_plist;
+       struct lov_io     *lio = cl2lov_io(env, ios);
+       struct lov_io_sub *sub;
+       struct cl_page *page;
+       int rc = 0;
+       ENTRY;
 
-                        if (cio && cio == &ld->ld_emrg[i]->emrg_subio)
-                                lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
-                }
-                lio->lis_mem_frozen = 0;
-               mutex_unlock(&ld->ld_mutex);
-        }
+       if (lio->lis_active_subios == 1) {
+               int idx = lio->lis_single_subio_index;
+
+               LASSERT(idx < lio->lis_nr_subios);
+               sub = lov_sub_get(env, lio, idx);
+               LASSERT(!IS_ERR(sub));
+               LASSERT(sub->sub_io == &lio->lis_single_subio);
+               rc = cl_io_commit_async(sub->sub_env, sub->sub_io, queue,
+                                       from, to, cb);
+               lov_sub_put(sub);
+               RETURN(rc);
+       }
 
-        RETURN(rc);
-#undef QIN
-}
+       LASSERT(lio->lis_subs != NULL);
 
-static int lov_io_prepare_write(const struct lu_env *env,
-                                const struct cl_io_slice *ios,
-                                const struct cl_page_slice *slice,
-                                unsigned from, unsigned to)
-{
-        struct lov_io     *lio      = cl2lov_io(env, ios);
-        struct cl_page    *sub_page = lov_sub_page(slice);
-        struct lov_io_sub *sub;
-        int result;
+       cl_page_list_init(plist);
+       while (queue->pl_nr > 0) {
+               int stripe_to = to;
+               int stripe;
 
-        ENTRY;
-        sub = lov_page_subio(env, lio, slice);
-        if (!IS_ERR(sub)) {
-                result = cl_io_prepare_write(sub->sub_env, sub->sub_io,
-                                             sub_page, from, to);
-                lov_sub_put(sub);
-        } else
-                result = PTR_ERR(sub);
-        RETURN(result);
-}
+               LASSERT(plist->pl_nr == 0);
+               page = cl_page_list_first(queue);
+               cl_page_list_move(plist, queue, page);
 
-static int lov_io_commit_write(const struct lu_env *env,
-                               const struct cl_io_slice *ios,
-                               const struct cl_page_slice *slice,
-                               unsigned from, unsigned to)
-{
-        struct lov_io     *lio      = cl2lov_io(env, ios);
-        struct cl_page    *sub_page = lov_sub_page(slice);
-        struct lov_io_sub *sub;
-        int result;
+               stripe = lov_page_stripe(page);
+               while (queue->pl_nr > 0) {
+                       page = cl_page_list_first(queue);
+                       if (stripe != lov_page_stripe(page))
+                               break;
 
-        ENTRY;
-        sub = lov_page_subio(env, lio, slice);
-        if (!IS_ERR(sub)) {
-                result = cl_io_commit_write(sub->sub_env, sub->sub_io,
-                                            sub_page, from, to);
-                lov_sub_put(sub);
-        } else
-                result = PTR_ERR(sub);
-        RETURN(result);
+                       cl_page_list_move(plist, queue, page);
+               }
+
+               if (queue->pl_nr > 0) /* still has more pages */
+                       stripe_to = PAGE_SIZE;
+
+               sub = lov_sub_get(env, lio, stripe);
+               if (!IS_ERR(sub)) {
+                       rc = cl_io_commit_async(sub->sub_env, sub->sub_io,
+                                               plist, from, stripe_to, cb);
+                       lov_sub_put(sub);
+               } else {
+                       rc = PTR_ERR(sub);
+                       break;
+               }
+
+               if (plist->pl_nr > 0) /* short write */
+                       break;
+
+               from = 0;
+       }
+
+       /* on error, add the uncommitted pages back into the queue */
+       LASSERT(ergo(rc == 0, plist->pl_nr == 0));
+       while (plist->pl_nr > 0) {
+               /* move pages back from the tail so they keep their order */
+               page = cl_page_list_last(plist);
+               cl_page_list_move_head(queue, plist, page);
+       }
+
+       RETURN(rc);
 }
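
Both the rewritten lov_io_submit() and the new lov_io_commit_async() share one loop shape: peel a run of consecutive same-stripe pages off the head of the queue and hand that run to the stripe's sub-io, instead of pre-sorting every page into the old per-stripe stripes_qin arrays. A self-contained sketch of the run-peeling loop, with an int array standing in for the page queue and a modulo standing in for lov_page_stripe():

    #include <stdio.h>

    #define STRIPE_COUNT 3

    static int page_stripe(int page_index)
    {
            return page_index % STRIPE_COUNT; /* stand-in for lov_page_stripe() */
    }

    int main(void)
    {
            /* page indices as they sit in qin; runs of equal stripe stay together */
            int queue[] = { 0, 3, 6, 1, 4, 2, 5, 8 };
            int nr = sizeof(queue) / sizeof(queue[0]);
            int head = 0;

            while (head < nr) {
                    int stripe = page_stripe(queue[head]);
                    int run = head;

                    /* extend the run while the next page maps to the same stripe */
                    while (run < nr && page_stripe(queue[run]) == stripe)
                            run++;

                    printf("submit pages [%d..%d) to stripe %d\n",
                           head, run, stripe);
                    head = run;         /* continue with the next run */
            }
            return 0;
    }
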
 
 static int lov_io_fault_start(const struct lu_env *env,
@@ -819,20 +800,12 @@ static const struct cl_io_operations lov_io_ops = {
                        .cio_start     = lov_io_start,
                        .cio_end       = lov_io_fsync_end
                },
-                [CIT_MISC] = {
-                        .cio_fini   = lov_io_fini
-                }
-        },
-        .req_op = {
-                 [CRT_READ] = {
-                         .cio_submit    = lov_io_submit
-                 },
-                 [CRT_WRITE] = {
-                         .cio_submit    = lov_io_submit
-                 }
-         },
-        .cio_prepare_write = lov_io_prepare_write,
-        .cio_commit_write  = lov_io_commit_write
+               [CIT_MISC] = {
+                       .cio_fini      = lov_io_fini
+               }
+       },
+       .cio_submit                    = lov_io_submit,
+       .cio_commit_async              = lov_io_commit_async,
 };
 
 /*****************************************************************************
@@ -845,11 +818,11 @@ static void lov_empty_io_fini(const struct lu_env *env,
                               const struct cl_io_slice *ios)
 {
        struct lov_object *lov = cl2lov(ios->cis_obj);
-        ENTRY;
+       ENTRY;
 
        if (cfs_atomic_dec_and_test(&lov->lo_active_ios))
                wake_up_all(&lov->lo_waitq);
-        EXIT;
+       EXIT;
 }
 
 static void lov_empty_impossible(const struct lu_env *env,
@@ -896,21 +869,14 @@ static const struct cl_io_operations lov_empty_io_ops = {
                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
                 },
                [CIT_FSYNC] = {
-                       .cio_fini   = lov_empty_io_fini
+                       .cio_fini      = lov_empty_io_fini
                },
-                [CIT_MISC] = {
-                        .cio_fini   = lov_empty_io_fini
-                }
-        },
-        .req_op = {
-                 [CRT_READ] = {
-                         .cio_submit    = LOV_EMPTY_IMPOSSIBLE
-                 },
-                 [CRT_WRITE] = {
-                         .cio_submit    = LOV_EMPTY_IMPOSSIBLE
-                 }
-         },
-        .cio_commit_write = LOV_EMPTY_IMPOSSIBLE
+               [CIT_MISC] = {
+                       .cio_fini      = lov_empty_io_fini
+               }
+       },
+       .cio_submit                    = LOV_EMPTY_IMPOSSIBLE,
+       .cio_commit_async              = LOV_EMPTY_IMPOSSIBLE
 };
 
 int lov_io_init_raid0(const struct lu_env *env, struct cl_object *obj,
index 85edd0e..0ee6b61 100644 (file)
@@ -108,30 +108,6 @@ static void lov_page_assume(const struct lu_env *env,
         lov_page_own(env, slice, io, 0);
 }
 
-static int lov_page_cache_add(const struct lu_env *env,
-                             const struct cl_page_slice *slice,
-                             struct cl_io *io)
-{
-       struct lov_io     *lio = lov_env_io(env);
-       struct lov_io_sub *sub;
-       int rc = 0;
-
-       LINVRNT(lov_page_invariant(slice));
-       LINVRNT(!cl2lov_page(slice)->lps_invalid);
-       ENTRY;
-
-       sub = lov_page_subio(env, lio, slice);
-       if (!IS_ERR(sub)) {
-               rc = cl_page_cache_add(sub->sub_env, sub->sub_io,
-                                      slice->cpl_page->cp_child, CRT_WRITE);
-               lov_sub_put(sub);
-       } else {
-               rc = PTR_ERR(sub);
-               CL_PAGE_DEBUG(D_ERROR, env, slice->cpl_page, "rc = %d\n", rc);
-       }
-       RETURN(rc);
-}
-
 static int lov_page_print(const struct lu_env *env,
                           const struct cl_page_slice *slice,
                           void *cookie, lu_printer_t printer)
@@ -142,15 +118,10 @@ static int lov_page_print(const struct lu_env *env,
 }
 
 static const struct cl_page_operations lov_page_ops = {
-        .cpo_fini   = lov_page_fini,
-        .cpo_own    = lov_page_own,
-        .cpo_assume = lov_page_assume,
-       .io = {
-               [CRT_WRITE] = {
-                       .cpo_cache_add = lov_page_cache_add
-               }
-       },
-        .cpo_print  = lov_page_print
+       .cpo_fini       = lov_page_fini,
+       .cpo_own        = lov_page_own,
+       .cpo_assume     = lov_page_assume,
+       .cpo_print      = lov_page_print
 };
 
 static void lov_empty_page_fini(const struct lu_env *env,
index b7ddaed..751ddbb 100644 (file)
@@ -813,79 +813,30 @@ int cl_io_read_page(const struct lu_env *env, struct cl_io *io,
 EXPORT_SYMBOL(cl_io_read_page);
 
 /**
- * Called by write io to prepare page to receive data from user buffer.
+ * Commit a list of contiguous pages into the writeback cache.
  *
- * \see cl_io_operations::cio_prepare_write()
+ * \returns 0 if all pages were committed, or an error code if one occurred.
+ * \see cl_io_operations::cio_commit_async()
  */
-int cl_io_prepare_write(const struct lu_env *env, struct cl_io *io,
-                        struct cl_page *page, unsigned from, unsigned to)
+int cl_io_commit_async(const struct lu_env *env, struct cl_io *io,
+                       struct cl_page_list *queue, int from, int to,
+                       cl_commit_cbt cb)
 {
-        const struct cl_io_slice *scan;
-        int result = 0;
-
-        LINVRNT(io->ci_type == CIT_WRITE);
-        LINVRNT(cl_page_is_owned(page, io));
-        LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
-        LINVRNT(cl_io_invariant(io));
-        LASSERT(cl_page_in_io(page, io));
-        ENTRY;
-
-        cl_io_for_each_reverse(scan, io) {
-                if (scan->cis_iop->cio_prepare_write != NULL) {
-                        const struct cl_page_slice *slice;
-
-                        slice = cl_io_slice_page(scan, page);
-                        result = scan->cis_iop->cio_prepare_write(env, scan,
-                                                                  slice,
-                                                                  from, to);
-                        if (result != 0)
-                                break;
-                }
-        }
-        RETURN(result);
-}
-EXPORT_SYMBOL(cl_io_prepare_write);
-
-/**
- * Called by write io after user data were copied into a page.
- *
- * \see cl_io_operations::cio_commit_write()
- */
-int cl_io_commit_write(const struct lu_env *env, struct cl_io *io,
-                       struct cl_page *page, unsigned from, unsigned to)
-{
-        const struct cl_io_slice *scan;
-        int result = 0;
-
-        LINVRNT(io->ci_type == CIT_WRITE);
-        LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
-        LINVRNT(cl_io_invariant(io));
-        /*
-         * XXX Uh... not nice. Top level cl_io_commit_write() call (vvp->lov)
-         * already called cl_page_cache_add(), moving page into CPS_CACHED
-         * state. Better (and more general) way of dealing with such situation
-         * is needed.
-         */
-        LASSERT(cl_page_is_owned(page, io) || page->cp_parent != NULL);
-        LASSERT(cl_page_in_io(page, io));
-        ENTRY;
-
-        cl_io_for_each(scan, io) {
-                if (scan->cis_iop->cio_commit_write != NULL) {
-                        const struct cl_page_slice *slice;
+       const struct cl_io_slice *scan;
+       int result = 0;
+       ENTRY;
 
-                        slice = cl_io_slice_page(scan, page);
-                        result = scan->cis_iop->cio_commit_write(env, scan,
-                                                                 slice,
-                                                                 from, to);
-                        if (result != 0)
-                                break;
-                }
-        }
-        LINVRNT(result <= 0);
-        RETURN(result);
+       cl_io_for_each(scan, io) {
+               if (scan->cis_iop->cio_commit_async == NULL)
+                       continue;
+               result = scan->cis_iop->cio_commit_async(env, scan, queue,
+                                                        from, to, cb);
+               if (result != 0)
+                       break;
+       }
+       RETURN(result);
 }
-EXPORT_SYMBOL(cl_io_commit_write);
+EXPORT_SYMBOL(cl_io_commit_async);
 
 /**
  * Submits a list of pages for immediate io.
@@ -900,25 +851,22 @@ EXPORT_SYMBOL(cl_io_commit_write);
 int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io,
                    enum cl_req_type crt, struct cl_2queue *queue)
 {
-        const struct cl_io_slice *scan;
-        int result = 0;
-
-        LINVRNT(crt < ARRAY_SIZE(scan->cis_iop->req_op));
-        ENTRY;
+       const struct cl_io_slice *scan;
+       int result = 0;
+       ENTRY;
 
-        cl_io_for_each(scan, io) {
-                if (scan->cis_iop->req_op[crt].cio_submit == NULL)
-                        continue;
-                result = scan->cis_iop->req_op[crt].cio_submit(env, scan, crt,
-                                                              queue);
-                if (result != 0)
-                        break;
-        }
-        /*
-         * If ->cio_submit() failed, no pages were sent.
-         */
-        LASSERT(ergo(result != 0, cfs_list_empty(&queue->c2_qout.pl_pages)));
-        RETURN(result);
+       cl_io_for_each(scan, io) {
+               if (scan->cis_iop->cio_submit == NULL)
+                       continue;
+               result = scan->cis_iop->cio_submit(env, scan, crt, queue);
+               if (result != 0)
+                       break;
+       }
+       /*
+        * If ->cio_submit() failed, no pages were sent.
+        */
+       LASSERT(ergo(result != 0, cfs_list_empty(&queue->c2_qout.pl_pages)));
+       RETURN(result);
 }
 EXPORT_SYMBOL(cl_io_submit_rw);
 
@@ -1154,6 +1102,26 @@ void cl_page_list_move(struct cl_page_list *dst, struct cl_page_list *src,
 EXPORT_SYMBOL(cl_page_list_move);
 
 /**
+ * Moves a page from one page list to the head of another list.
+ */
+void cl_page_list_move_head(struct cl_page_list *dst, struct cl_page_list *src,
+                           struct cl_page *page)
+{
+       LASSERT(src->pl_nr > 0);
+       LINVRNT(dst->pl_owner == current);
+       LINVRNT(src->pl_owner == current);
+
+       ENTRY;
+       cfs_list_move(&page->cp_batch, &dst->pl_pages);
+       --src->pl_nr;
+       ++dst->pl_nr;
+       lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref, "queue",
+                       src, dst);
+       EXIT;
+}
+EXPORT_SYMBOL(cl_page_list_move_head);
+
+/**
  * splice the cl_page_list, just as list head does
  */
 void cl_page_list_splice(struct cl_page_list *list, struct cl_page_list *head)
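
cl_io_commit_async() above follows the usual slice dispatch contract: walk the io layers with cl_io_for_each(), skip layers that leave the hook NULL, and stop at the first non-zero result. A tiny standalone sketch of that dispatch pattern; the slice table and layer names are invented for illustration:

    #include <stdio.h>

    struct io_slice;

    struct io_ops {
            /* optional per-layer hook; NULL means the layer does not care */
            int (*commit_async)(const struct io_slice *slice);
    };

    struct io_slice {
            const char *name;
            const struct io_ops *ops;
    };

    static int lov_commit(const struct io_slice *s)
    {
            printf("%s: split pages per stripe\n", s->name);
            return 0;
    }

    static int osc_commit(const struct io_slice *s)
    {
            printf("%s: add pages to the dirty cache\n", s->name);
            return 0;
    }

    static const struct io_ops vvp_ops = { .commit_async = NULL };
    static const struct io_ops lov_ops = { .commit_async = lov_commit };
    static const struct io_ops osc_ops = { .commit_async = osc_commit };

    int main(void)
    {
            const struct io_slice stack[] = {
                    { "vvp", &vvp_ops }, { "lov", &lov_ops }, { "osc", &osc_ops },
            };
            int result = 0;

            /* walk the layers top-down; stop at the first failure */
            for (int i = 0; i < 3 && result == 0; i++) {
                    if (stack[i].ops->commit_async == NULL)
                            continue;
                    result = stack[i].ops->commit_async(&stack[i]);
            }
            return result;
    }
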
index 0f02723..dc4037b 100644 (file)
@@ -1106,46 +1106,6 @@ int cl_page_make_ready(const struct lu_env *env, struct cl_page *pg,
 EXPORT_SYMBOL(cl_page_make_ready);
 
 /**
- * Notify layers that high level io decided to place this page into a cache
- * for future transfer.
- *
- * The layer implementing transfer engine (osc) has to register this page in
- * its queues.
- *
- * \pre  cl_page_is_owned(pg, io)
- * \post cl_page_is_owned(pg, io)
- *
- * \see cl_page_operations::cpo_cache_add()
- */
-int cl_page_cache_add(const struct lu_env *env, struct cl_io *io,
-                      struct cl_page *pg, enum cl_req_type crt)
-{
-       const struct cl_page_slice *scan;
-       int result = 0;
-
-       PINVRNT(env, pg, crt < CRT_NR);
-       PINVRNT(env, pg, cl_page_is_owned(pg, io));
-       PINVRNT(env, pg, cl_page_invariant(pg));
-
-       ENTRY;
-
-       if (crt >= CRT_NR)
-               RETURN(-EINVAL);
-
-       cfs_list_for_each_entry(scan, &pg->cp_layers, cpl_linkage) {
-               if (scan->cpl_ops->io[crt].cpo_cache_add == NULL)
-                       continue;
-
-               result = scan->cpl_ops->io[crt].cpo_cache_add(env, scan, io);
-               if (result != 0)
-                       break;
-       }
-       CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, result);
-       RETURN(result);
-}
-EXPORT_SYMBOL(cl_page_cache_add);
-
-/**
  * Called when a page is being written back at the kernel's initiative.
  *
  * \pre  cl_page_is_owned(pg, io)
index f84a516..a4bef14 100644 (file)
@@ -1301,22 +1301,17 @@ static int cl_echo_cancel(struct echo_device *ed, __u64 cookie)
         RETURN(rc);
 }
 
-static int cl_echo_async_brw(const struct lu_env *env, struct cl_io *io,
-                             enum cl_req_type unused, struct cl_2queue *queue)
+static void echo_commit_callback(const struct lu_env *env, struct cl_io *io,
+                               struct cl_page *page)
 {
-        struct cl_page *clp;
-        struct cl_page *temp;
-        int result = 0;
-        ENTRY;
+       struct echo_thread_info *info;
+       struct cl_2queue        *queue;
 
-        cl_page_list_for_each_safe(clp, temp, &queue->c2_qin) {
-                int rc;
-                rc = cl_page_cache_add(env, io, clp, CRT_WRITE);
-                if (rc == 0)
-                        continue;
-                result = result ?: rc;
-        }
-        RETURN(result);
+       info = echo_env_info(env);
+       LASSERT(io == &info->eti_io);
+
+       queue = &info->eti_queue;
+       cl_page_list_add(&queue->c2_qout, page);
 }
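
echo_commit_callback() is the echo client's cl_commit_cbt: cl_io_commit_async() calls it once for each page it consumes, and the echo client just records the page on its qout queue. A stripped-down sketch of that callback contract, with stand-in types and names:

    #include <stdio.h>

    /* shape of cl_commit_cbt, reduced to the essentials for illustration */
    typedef void (*commit_cbt)(void *io, int page_index);

    static int committed[16];
    static int committed_nr;

    /* stand-in for echo_commit_callback(): remember each committed page */
    static void record_page(void *io, int page_index)
    {
            (void)io;
            committed[committed_nr++] = page_index;
    }

    /* stand-in for cl_io_commit_async(): consume the queue page by page,
     * notifying the caller through the callback after each page */
    static int commit_async(void *io, int *queue, int nr, commit_cbt cb)
    {
            for (int i = 0; i < nr; i++)
                    cb(io, queue[i]);
            return 0;
    }

    int main(void)
    {
            int queue[] = { 7, 8, 9 };

            commit_async(NULL, queue, 3, record_page);
            printf("%d pages committed\n", committed_nr);   /* 3 pages */
            return 0;
    }
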
 
 static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset,
@@ -1394,8 +1389,10 @@ static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset,
 
                 async = async && (typ == CRT_WRITE);
                 if (async)
-                        rc = cl_echo_async_brw(env, io, typ, queue);
-                else
+                       rc = cl_io_commit_async(env, io, &queue->c2_qin,
+                                               0, PAGE_SIZE,
+                                               echo_commit_callback);
+               else
                        rc = cl_io_submit_sync(env, io, typ, queue, 0);
                 CDEBUG(D_INFO, "echo_client %s write returns %d\n",
                        async ? "async" : "sync", rc);
index a7697f3..3dbb375 100644 (file)
@@ -834,9 +834,9 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
                /* For short writes we shouldn't count parts of pages that
                 * span a whole chunk on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
-               int offset = oap->oap_page_off & ~CFS_PAGE_MASK;
-               int count = oap->oap_count + (offset & (blocksize - 1));
-               int end = (offset + oap->oap_count) & (blocksize - 1);
+               int offset = last_off & ~CFS_PAGE_MASK;
+               int count = last_count + (offset & (blocksize - 1));
+               int end = (offset + last_count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;
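
The grant-accounting fix above rounds the written byte range out to server block boundaries so partial blocks are charged in full. The same arithmetic as a standalone program, with made-up offsets and a power-of-two blocksize (the mask tricks require one):

    #include <stdio.h>

    /* round [offset, offset + count) out to block boundaries, as in the
     * osc_extent_finish() hunk above; blocksize must be a power of two */
    static int grant_count(int offset, int count, int blocksize)
    {
            int c = count + (offset & (blocksize - 1));
            int end = (offset + count) & (blocksize - 1);

            if (end)
                    c += blocksize - end;
            return c;
    }

    int main(void)
    {
            /* 50 bytes at offset 100 sit inside one 4096-byte block,
             * so a whole block is charged */
            printf("%d\n", grant_count(100, 50, 4096));    /* prints 4096 */

            /* 100 bytes at offset 4000 straddle a block boundary,
             * so two whole blocks are charged */
            printf("%d\n", grant_count(4000, 100, 4096));  /* prints 8192 */
            return 0;
    }
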
 
@@ -3179,14 +3179,13 @@ static int discard_cb(const struct lu_env *env, struct cl_io *io,
        struct cl_page *page = cl_page_top(ops->ops_cl.cpl_page);
 
        LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE);
-       KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
-                     !PageWriteback(cl_page_vmpage(env, page))));
-       KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
-                     !PageDirty(cl_page_vmpage(env, page))));
 
        /* page is top page. */
        info->oti_next_index = osc_index(ops) + 1;
        if (cl_page_own(env, io, page) == 0) {
+               KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
+                             !PageDirty(cl_page_vmpage(env, page))));
+
                /* discard the page */
                cl_page_discard(env, io, page);
                cl_page_disown(env, io, page);
index 55e159b..f23f927 100644 (file)
@@ -455,6 +455,8 @@ int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
                        struct page *page, loff_t offset);
 int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
                       struct osc_page *ops);
+int osc_page_cache_add(const struct lu_env *env,
+                      const struct cl_page_slice *slice, struct cl_io *io);
 int osc_teardown_async_page(const struct lu_env *env, struct osc_object *obj,
                            struct osc_page *ops);
 int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
index e6f8f3f..5c4e785 100644 (file)
@@ -77,6 +77,12 @@ struct osc_async_page {
 #define oap_count       oap_brw_page.count
 #define oap_brw_flags   oap_brw_page.flag
 
+static inline struct osc_async_page *brw_page2oap(struct brw_page *pga)
+{
+       return (struct osc_async_page *)container_of(pga, struct osc_async_page,
+                                                    oap_brw_page);
+}
+
 struct osc_cache_waiter {
        cfs_list_t              ocw_entry;
        wait_queue_head_t             ocw_waitq;
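
brw_page2oap() recovers the enclosing osc_async_page from a pointer to its embedded brw_page, the classic container_of idiom. A standalone illustration with invented struct names; the kernel's container_of additionally type-checks the member, which is omitted here:

    #include <stdio.h>
    #include <stddef.h>

    #define container_of(ptr, type, member) \
            ((type *)((char *)(ptr) - offsetof(type, member)))

    struct brw_page_demo   { int flag; };

    struct async_page_demo {
            int                  magic;
            struct brw_page_demo brw;   /* embedded, like oap_brw_page */
    };

    int main(void)
    {
            struct async_page_demo oap = { .magic = 42 };
            struct brw_page_demo *pga = &oap.brw;

            /* step back from the member to the struct that embeds it */
            struct async_page_demo *back =
                    container_of(pga, struct async_page_demo, brw);

            printf("%d\n", back->magic);    /* prints 42 */
            return 0;
    }
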
index a16fb81..a9bad93 100644 (file)
@@ -187,6 +187,13 @@ static int osc_io_submit(const struct lu_env *env,
        return qout->pl_nr > 0 ? 0 : result;
 }
 
+/**
+ * This is called when a page is accessed within a file in a way that creates
+ * a new page, if one was missing (i.e., if there was a hole at that place in
+ * the file, or the accessed page is beyond the current file size).
+ *
+ * Expand stripe KMS if necessary.
+ */
 static void osc_page_touch_at(const struct lu_env *env,
                               struct cl_object *obj, pgoff_t idx, unsigned to)
 {
@@ -210,7 +217,8 @@ static void osc_page_touch_at(const struct lu_env *env,
                kms > loi->loi_kms ? "" : "not ", loi->loi_kms, kms,
                loi->loi_lvb.lvb_size);
 
-        valid = 0;
+       attr->cat_mtime = attr->cat_ctime = LTIME_S(CFS_CURRENT_TIME);
+       valid = CAT_MTIME | CAT_CTIME;
         if (kms > loi->loi_kms) {
                 attr->cat_kms = kms;
                 valid |= CAT_KMS;
@@ -223,92 +231,82 @@ static void osc_page_touch_at(const struct lu_env *env,
         cl_object_attr_unlock(obj);
 }
 
-/**
- * This is called when a page is accessed within file in a way that creates
- * new page, if one were missing (i.e., if there were a hole at that place in
- * the file, or accessed page is beyond the current file size). Examples:
- * ->commit_write() and ->nopage() methods.
- *
- * Expand stripe KMS if necessary.
- */
-static void osc_page_touch(const struct lu_env *env,
-                           struct osc_page *opage, unsigned to)
+static int osc_io_commit_async(const struct lu_env *env,
+                               const struct cl_io_slice *ios,
+                               struct cl_page_list *qin, int from, int to,
+                               cl_commit_cbt cb)
 {
-        struct cl_page    *page = opage->ops_cl.cpl_page;
-        struct cl_object  *obj  = opage->ops_cl.cpl_obj;
+       struct cl_io    *io = ios->cis_io;
+       struct osc_io   *oio = cl2osc_io(env, ios);
+       struct osc_object *osc = cl2osc(ios->cis_obj);
+       struct cl_page  *page;
+       struct cl_page  *last_page;
+       struct osc_page *opg;
+       int result = 0;
+       ENTRY;
 
-        osc_page_touch_at(env, obj, page->cp_index, to);
-}
+       LASSERT(qin->pl_nr > 0);
 
-/**
- * Implements cl_io_operations::cio_prepare_write() method for osc layer.
- *
- * \retval -EIO transfer initiated against this osc will most likely fail
- * \retval 0    transfer initiated against this osc will most likely succeed.
- *
- * The reason for this check is to immediately return an error to the caller
- * in the case of a deactivated import. Note, that import can be deactivated
- * later, while pages, dirtied by this IO, are still in the cache, but this is
- * irrelevant, because that would still return an error to the application (if
- * it does fsync), but many applications don't do fsync because of performance
- * issues, and we wanted to return an -EIO at write time to notify the
- * application.
- */
-static int osc_io_prepare_write(const struct lu_env *env,
-                                const struct cl_io_slice *ios,
-                                const struct cl_page_slice *slice,
-                                unsigned from, unsigned to)
-{
-        struct osc_device *dev = lu2osc_dev(slice->cpl_obj->co_lu.lo_dev);
-        struct obd_import *imp = class_exp2cliimp(dev->od_exp);
-        struct osc_io     *oio = cl2osc_io(env, ios);
-        int result = 0;
-        ENTRY;
+       /* Handle partial page cases */
+       last_page = cl_page_list_last(qin);
+       if (oio->oi_lockless) {
+               page = cl_page_list_first(qin);
+               if (page == last_page) {
+                       cl_page_clip(env, page, from, to);
+               } else {
+                       if (from != 0)
+                               cl_page_clip(env, page, from, PAGE_SIZE);
+                       if (to != PAGE_SIZE)
+                               cl_page_clip(env, last_page, 0, to);
+               }
+       }
 
-        /*
-         * This implements OBD_BRW_CHECK logic from old client.
-         */
+       /*
+        * NOTE: here @page is a top-level page. This is done to avoid
+        * creating a sub-page list.
+        */
+       while (qin->pl_nr > 0) {
+               struct osc_async_page *oap;
 
-        if (imp == NULL || imp->imp_invalid)
-                result = -EIO;
-        if (result == 0 && oio->oi_lockless)
-                /* this page contains `invalid' data, but who cares?
-                 * nobody can access the invalid data.
-                 * in osc_io_commit_write(), we're going to write exact
-                 * [from, to) bytes of this page to OST. -jay */
-                cl_page_export(env, slice->cpl_page, 1);
+               page = cl_page_list_first(qin);
+               opg = osc_cl_page_osc(page);
+               oap = &opg->ops_oap;
 
-        RETURN(result);
-}
+               if (!cfs_list_empty(&oap->oap_rpc_item)) {
+                       CDEBUG(D_CACHE, "Busy oap %p page %p for submit.\n",
+                              oap, opg);
+                       result = -EBUSY;
+                       break;
+               }
 
-static int osc_io_commit_write(const struct lu_env *env,
-                               const struct cl_io_slice *ios,
-                               const struct cl_page_slice *slice,
-                               unsigned from, unsigned to)
-{
-        struct osc_io         *oio = cl2osc_io(env, ios);
-        struct osc_page       *opg = cl2osc_page(slice);
-        struct osc_object     *obj = cl2osc(opg->ops_cl.cpl_obj);
-        struct osc_async_page *oap = &opg->ops_oap;
-        ENTRY;
+               /* The page may be already in dirty cache. */
+               if (cfs_list_empty(&oap->oap_pending_item)) {
+                       result = osc_page_cache_add(env, &opg->ops_cl, io);
+                       if (result != 0)
+                               break;
+               }
 
-        LASSERT(to > 0);
-        /*
-         * XXX instead of calling osc_page_touch() here and in
-         * osc_io_fault_start() it might be more logical to introduce
-         * cl_page_touch() method, that generic cl_io_commit_write() and page
-         * fault code calls.
-         */
-        osc_page_touch(env, cl2osc_page(slice), to);
-        if (!client_is_remote(osc_export(obj)) &&
-            cfs_capable(CFS_CAP_SYS_RESOURCE))
-                oap->oap_brw_flags |= OBD_BRW_NOQUOTA;
+               osc_page_touch_at(env, osc2cl(osc),
+                                 opg->ops_cl.cpl_page->cp_index,
+                                 page == last_page ? to : PAGE_SIZE);
 
-        if (oio->oi_lockless)
-                /* see osc_io_prepare_write() for lockless io handling. */
-                cl_page_clip(env, slice->cpl_page, from, to);
+               cl_page_list_del(env, qin, page);
 
-        RETURN(0);
+               (*cb)(env, io, page);
+               /* The page can't be accessed any more: it may already be in
+                * transfer and complete at any time. */
+       }
+
+       /* for a sync write, the kernel will wait for this page to be flushed
+        * before osc_io_end() is called, so release it earlier.
+        * for mkwrite(), it's known there are no further pages. */
+       if (cl_io_is_sync_write(io) && oio->oi_active != NULL) {
+               osc_extent_release(env, oio->oi_active);
+               oio->oi_active = NULL;
+       }
+
+       CDEBUG(D_INFO, "%d %d\n", qin->pl_nr, result);
+       RETURN(result);
 }
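
In the lockless branch of osc_io_commit_async() above, only the bytes the caller actually wrote may go to the OST, so the first page is clipped at from, the last at to, and every page in between stays whole. The range computation in isolation, with illustrative sizes:

    #include <stdio.h>

    #define PG_SIZE 4096

    int main(void)
    {
            int nr_pages = 3;   /* pages queued in one commit batch */
            int from = 1000;    /* first byte written in the first page */
            int to = 500;       /* end of the bytes written in the last page */

            for (int i = 0; i < nr_pages; i++) {
                    /* only the first page starts at from, only the last page
                     * ends at to; pages in between are written whole */
                    int start = (i == 0) ? from : 0;
                    int end = (i == nr_pages - 1) ? to : PG_SIZE;

                    printf("page %d: clip to [%d, %d)\n", i, start, end);
            }
            return 0;
    }

With nr_pages set to 1 the same expressions collapse to clipping [from, to) on a single page, which is exactly the page == last_page branch above.
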
 
 static int osc_io_rw_iter_init(const struct lu_env *env,
@@ -717,23 +715,23 @@ static void osc_io_end(const struct lu_env *env,
 }
 
 static const struct cl_io_operations osc_io_ops = {
-        .op = {
-                [CIT_READ] = {
-                        .cio_start  = osc_io_read_start,
-                        .cio_fini   = osc_io_fini
-                },
-                [CIT_WRITE] = {
+       .op = {
+               [CIT_READ] = {
+                       .cio_start  = osc_io_read_start,
+                       .cio_fini   = osc_io_fini
+               },
+               [CIT_WRITE] = {
                        .cio_iter_init = osc_io_rw_iter_init,
                        .cio_iter_fini = osc_io_rw_iter_fini,
-                        .cio_start  = osc_io_write_start,
+                       .cio_start  = osc_io_write_start,
                        .cio_end    = osc_io_end,
-                        .cio_fini   = osc_io_fini
-                },
-                [CIT_SETATTR] = {
-                        .cio_start  = osc_io_setattr_start,
-                        .cio_end    = osc_io_setattr_end
-                },
-                [CIT_FAULT] = {
+                       .cio_fini   = osc_io_fini
+               },
+               [CIT_SETATTR] = {
+                       .cio_start  = osc_io_setattr_start,
+                       .cio_end    = osc_io_setattr_end
+               },
+               [CIT_FAULT] = {
                        .cio_start  = osc_io_fault_start,
                        .cio_end    = osc_io_end,
                        .cio_fini   = osc_io_fini
@@ -743,20 +741,12 @@ static const struct cl_io_operations osc_io_ops = {
                        .cio_end    = osc_io_fsync_end,
                        .cio_fini   = osc_io_fini
                },
-                [CIT_MISC] = {
-                        .cio_fini   = osc_io_fini
-                }
-        },
-        .req_op = {
-                 [CRT_READ] = {
-                         .cio_submit    = osc_io_submit
-                 },
-                 [CRT_WRITE] = {
-                         .cio_submit    = osc_io_submit
-                 }
-         },
-        .cio_prepare_write = osc_io_prepare_write,
-        .cio_commit_write  = osc_io_commit_write
+               [CIT_MISC] = {
+                       .cio_fini   = osc_io_fini
+               }
+       },
+       .cio_submit                 = osc_io_submit,
+       .cio_commit_async           = osc_io_commit_async
 };
 
 /*****************************************************************************
index a673b91..2b405f4 100644 (file)
@@ -182,15 +182,15 @@ static void osc_page_transfer_get(struct osc_page *opg, const char *label)
 }
 
 static void osc_page_transfer_put(const struct lu_env *env,
-                                  struct osc_page *opg)
+                                 struct osc_page *opg)
 {
-        struct cl_page *page = cl_page_top(opg->ops_cl.cpl_page);
+       struct cl_page *page = cl_page_top(opg->ops_cl.cpl_page);
 
-        if (opg->ops_transfer_pinned) {
-                lu_ref_del(&page->cp_reference, "transfer", page);
-                opg->ops_transfer_pinned = 0;
-                cl_page_put(env, page);
-        }
+       if (opg->ops_transfer_pinned) {
+               opg->ops_transfer_pinned = 0;
+               lu_ref_del(&page->cp_reference, "transfer", page);
+               cl_page_put(env, page);
+       }
 }
 
 /**
@@ -213,11 +213,9 @@ static void osc_page_transfer_add(const struct lu_env *env,
        spin_unlock(&obj->oo_seatbelt);
 }
 
-static int osc_page_cache_add(const struct lu_env *env,
-                             const struct cl_page_slice *slice,
-                             struct cl_io *io)
+int osc_page_cache_add(const struct lu_env *env,
+                       const struct cl_page_slice *slice, struct cl_io *io)
 {
-       struct osc_io   *oio = osc_env_io(env);
        struct osc_page *opg = cl2osc_page(slice);
        int result;
        ENTRY;
@@ -231,16 +229,6 @@ static int osc_page_cache_add(const struct lu_env *env,
        else
                osc_page_transfer_add(env, opg, CRT_WRITE);
 
-       /* for sync write, kernel will wait for this page to be flushed before
-        * osc_io_end() is called, so release it earlier.
-        * for mkwrite(), it's known there is no further pages. */
-       if (cl_io_is_sync_write(io) || cl_io_is_mkwrite(io)) {
-               if (oio->oi_active != NULL) {
-                       osc_extent_release(env, oio->oi_active);
-                       oio->oi_active = NULL;
-               }
-       }
-
        RETURN(result);
 }
 
@@ -332,18 +320,6 @@ static void osc_page_completion_write(const struct lu_env *env,
 {
 }
 
-static int osc_page_fail(const struct lu_env *env,
-                         const struct cl_page_slice *slice,
-                         struct cl_io *unused)
-{
-        /*
-         * Cached read?
-         */
-        LBUG();
-        return 0;
-}
-
-
 static const char *osc_list(cfs_list_t *head)
 {
         return cfs_list_empty(head) ? "-" : "+";
@@ -495,18 +471,16 @@ static int osc_page_flush(const struct lu_env *env,
 }
 
 static const struct cl_page_operations osc_page_ops = {
-        .cpo_fini          = osc_page_fini,
-        .cpo_print         = osc_page_print,
-        .cpo_delete        = osc_page_delete,
-        .cpo_is_under_lock = osc_page_is_under_lock,
-        .cpo_disown        = osc_page_disown,
-        .io = {
-                [CRT_READ] = {
-                        .cpo_cache_add  = osc_page_fail,
-                        .cpo_completion = osc_page_completion_read
-                },
-                [CRT_WRITE] = {
-                       .cpo_cache_add  = osc_page_cache_add,
+       .cpo_fini          = osc_page_fini,
+       .cpo_print         = osc_page_print,
+       .cpo_delete        = osc_page_delete,
+       .cpo_is_under_lock = osc_page_is_under_lock,
+       .cpo_disown        = osc_page_disown,
+       .io = {
+               [CRT_READ] = {
+                       .cpo_completion = osc_page_completion_read
+               },
+               [CRT_WRITE] = {
                        .cpo_completion = osc_page_completion_write
                }
        },
index d3a7247..b40c456 100644 (file)
@@ -1957,7 +1957,6 @@ static int brw_interpret(const struct lu_env *env,
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
-       struct cl_object  *obj = NULL;
        struct client_obd *cli = aa->aa_cli;
         ENTRY;
 
@@ -1993,24 +1992,17 @@ static int brw_interpret(const struct lu_env *env,
                 aa->aa_ocapa = NULL;
         }
 
-       cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
-               if (obj == NULL && rc == 0) {
-                       obj = osc2cl(ext->oe_obj);
-                       cl_object_get(obj);
-               }
-
-               cfs_list_del_init(&ext->oe_link);
-               osc_extent_finish(env, ext, 1, rc);
-       }
-       LASSERT(cfs_list_empty(&aa->aa_exts));
-       LASSERT(cfs_list_empty(&aa->aa_oaps));
-
-       if (obj != NULL) {
+       if (rc == 0) {
                struct obdo *oa = aa->aa_oa;
-               struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
+               struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;
+               struct cl_object *obj;
+               struct osc_async_page *last;
 
-               LASSERT(rc == 0);
+               last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
+               obj = osc2cl(last->oap_obj);
+
+               cl_object_attr_lock(obj);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
@@ -2027,15 +2019,38 @@ static int brw_interpret(const struct lu_env *env,
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }
-               if (valid != 0) {
-                       cl_object_attr_lock(obj);
-                       cl_object_attr_set(env, obj, attr, valid);
-                       cl_object_attr_unlock(obj);
+
+               if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
+                       struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
+                       loff_t last_off = last->oap_count + last->oap_obj_off;
+
+                       /* Change file size if this is an out of quota or
+                        * direct IO write and it extends the file size */
+                       if (loi->loi_lvb.lvb_size < last_off) {
+                               attr->cat_size = last_off;
+                               valid |= CAT_SIZE;
+                       }
+                       /* Extend KMS if it's not a lockless write */
+                       if (loi->loi_kms < last_off &&
+                           oap2osc_page(last)->ops_srvlock == 0) {
+                               attr->cat_kms = last_off;
+                               valid |= CAT_KMS;
+                       }
                }
-               cl_object_put(env, obj);
+
+               if (valid != 0)
+                       cl_object_attr_set(env, obj, attr, valid);
+               cl_object_attr_unlock(obj);
        }
        OBDO_FREE(aa->aa_oa);
 
+       cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
+               cfs_list_del_init(&ext->oe_link);
+               osc_extent_finish(env, ext, 1, rc);
+       }
+       LASSERT(cfs_list_empty(&aa->aa_exts));
+       LASSERT(cfs_list_empty(&aa->aa_oaps));
+
        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                          req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
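
Finally, the reworked brw_interpret() derives the attribute updates from the last page of the completed write rather than from the extent list: the end offset of that page may extend both the cached file size and, for lock-protected writes, the known minimum size (KMS). The arithmetic in isolation, with shortened field names and made-up values:

    #include <stdio.h>

    int main(void)
    {
            /* stand-ins for fields of the request's last osc_async_page */
            long long obj_off = 3 * 4096;   /* page offset within the object */
            long long count = 1234;         /* bytes written in that page */

            long long size = 8192;          /* cached file size before the RPC */
            long long kms = 8192;           /* known minimum size before the RPC */
            int srvlock = 0;                /* 0 means a lock-protected write */

            long long last_off = obj_off + count;

            if (size < last_off)            /* the write extended the file */
                    size = last_off;
            if (kms < last_off && srvlock == 0) /* not a lockless write */
                    kms = last_off;

            printf("size=%lld kms=%lld\n", size, kms); /* both become 13522 */
            return 0;
    }
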