Whamcloud - gitweb
LU-12593 osd: up i_append_sem during errors
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_io.c
index c6a09d3..64b995d 100644 (file)
@@ -652,51 +652,60 @@ static int osd_map_remote_to_local(loff_t offset, ssize_t len, int *nrpages,
 }
 
 static struct page *osd_get_page(const struct lu_env *env, struct dt_object *dt,
-                                loff_t offset, gfp_t gfp_mask)
+                                loff_t offset, gfp_t gfp_mask, bool cache)
 {
        struct osd_thread_info *oti = osd_oti_get(env);
        struct inode *inode = osd_dt_obj(dt)->oo_inode;
        struct osd_device *d = osd_obj2dev(osd_dt_obj(dt));
        struct page *page;
-       int cur = oti->oti_dio_pages_used;
+       int cur;
 
         LASSERT(inode);
 
-       if (osd_use_page_cache(d)) {
+       if (cache) {
                page = find_or_create_page(inode->i_mapping,
-                                          offset >> PAGE_SHIFT,
-                                          gfp_mask);
+                                          offset >> PAGE_SHIFT, gfp_mask);
 
                if (likely(page))
                        LASSERT(!test_bit(PG_private_2, &page->flags));
                else
                        lprocfs_counter_add(d->od_stats, LPROC_OSD_NO_PAGE, 1);
-       } else {
 
-               LASSERT(oti->oti_dio_pages);
+               return page;
+       }
 
-               if (unlikely(!oti->oti_dio_pages[cur])) {
-                       LASSERT(cur < PTLRPC_MAX_BRW_PAGES);
-                       page = alloc_page(gfp_mask);
-                       if (!page)
-                               return NULL;
-                       oti->oti_dio_pages[cur] = page;
-               }
+       if (inode->i_mapping->nrpages) {
+               /* consult with pagecache, but do not create new pages */
+               /* this is normally used once */
+               page = find_lock_page(inode->i_mapping, offset >> PAGE_SHIFT);
+               if (page)
+                       return page;
+       }
 
-               page = oti->oti_dio_pages[cur];
-               LASSERT(!test_bit(PG_private_2, &page->flags));
-               set_bit(PG_private_2, &page->flags);
-               oti->oti_dio_pages_used++;
+       LASSERT(oti->oti_dio_pages);
+       cur = oti->oti_dio_pages_used;
 
-               LASSERT(!PageLocked(page));
-               lock_page(page);
+       if (unlikely(!oti->oti_dio_pages[cur])) {
+               LASSERT(cur < PTLRPC_MAX_BRW_PAGES);
+               page = alloc_page(gfp_mask);
+               if (!page)
+                       return NULL;
+               oti->oti_dio_pages[cur] = page;
+       }
 
-               LASSERT(!page->mapping);
-               LASSERT(!PageWriteback(page));
-               ClearPageUptodate(page);
+       page = oti->oti_dio_pages[cur];
+       LASSERT(!test_bit(PG_private_2, &page->flags));
+       set_bit(PG_private_2, &page->flags);
+       oti->oti_dio_pages_used++;
 
-               page->index = offset >> PAGE_SHIFT;
-       }
+       LASSERT(!PageLocked(page));
+       lock_page(page);
+
+       LASSERT(!page->mapping);
+       LASSERT(!PageWriteback(page));
+       ClearPageUptodate(page);
+
+       page->index = offset >> PAGE_SHIFT;
 
        return page;
 }
@@ -804,30 +813,70 @@ static int osd_bufs_get(const struct lu_env *env, struct dt_object *dt,
 {
        struct osd_thread_info *oti = osd_oti_get(env);
        struct osd_object *obj = osd_dt_obj(dt);
-       int npages, i, rc = 0;
+       struct osd_device *osd   = osd_obj2dev(obj);
+       int npages, i, iosize, rc = 0;
+       bool cache, write;
+       loff_t fsize;
        gfp_t gfp_mask;
 
        LASSERT(obj->oo_inode);
 
-       if (!osd_use_page_cache(osd_obj2dev(obj))) {
-               if (unlikely(!oti->oti_dio_pages)) {
-                       OBD_ALLOC(oti->oti_dio_pages,
-                                 sizeof(struct page *) * PTLRPC_MAX_BRW_PAGES);
-                       if (!oti->oti_dio_pages)
-                               return -ENOMEM;
-               }
-       }
-
        rc = osd_map_remote_to_local(pos, len, &npages, lnb, maxlnb);
        if (rc)
                RETURN(rc);
 
+       write = rw & DT_BUFS_TYPE_WRITE;
+
+       fsize = lnb[npages - 1].lnb_file_offset + lnb[npages - 1].lnb_len;
+       iosize = fsize - lnb[0].lnb_file_offset;
+       fsize = max(fsize, i_size_read(obj->oo_inode));
+
+       cache = rw & DT_BUFS_TYPE_READAHEAD;
+       if (cache)
+               goto bypass_checks;
+
+       cache = osd_use_page_cache(osd);
+       while (cache) {
+               if (write) {
+                       if (!osd->od_writethrough_cache) {
+                               cache = false;
+                               break;
+                       }
+                       if (iosize > osd->od_writethrough_max_iosize) {
+                               cache = false;
+                               break;
+                       }
+               } else {
+                       if (!osd->od_read_cache) {
+                               cache = false;
+                               break;
+                       }
+                       if (iosize > osd->od_readcache_max_iosize) {
+                               cache = false;
+                               break;
+                       }
+               }
+               /* don't use cache on large files */
+               if (osd->od_readcache_max_filesize &&
+                   fsize > osd->od_readcache_max_filesize)
+                       cache = false;
+               break;
+       }
+
+bypass_checks:
+       if (!cache && unlikely(!oti->oti_dio_pages)) {
+               OBD_ALLOC(oti->oti_dio_pages,
+                         sizeof(struct page *) * PTLRPC_MAX_BRW_PAGES);
+               if (!oti->oti_dio_pages)
+                       return -ENOMEM;
+       }
+
        /* this could also try less hard for DT_BUFS_TYPE_READAHEAD pages */
        gfp_mask = rw & DT_BUFS_TYPE_LOCAL ? (GFP_NOFS | __GFP_HIGHMEM) :
                                             GFP_HIGHUSER;
        for (i = 0; i < npages; i++, lnb++) {
                lnb->lnb_page = osd_get_page(env, dt, lnb->lnb_file_offset,
-                                            gfp_mask);
+                                            gfp_mask, cache);
                if (lnb->lnb_page == NULL)
                        GOTO(cleanup, rc = -ENOMEM);
 
@@ -838,6 +887,17 @@ static int osd_bufs_get(const struct lu_env *env, struct dt_object *dt,
                lu_object_get(&dt->do_lu);
        }
 
+#if 0
+       /* XXX: this version doesn't invalidate cached pages, but uses them */
+       if (!cache && write && obj->oo_inode->i_mapping->nrpages) {
+               /* do not allow data aliasing, invalidate pagecache */
+               /* XXX: can be quite expensive in mixed case */
+               invalidate_mapping_pages(obj->oo_inode->i_mapping,
+                               lnb[0].lnb_file_offset >> PAGE_SHIFT,
+                               lnb[npages - 1].lnb_file_offset >> PAGE_SHIFT);
+       }
+#endif
+
        RETURN(i);
 
 cleanup:
@@ -939,14 +999,11 @@ static int osd_write_prep(const struct lu_env *env, struct dt_object *dt,
         struct osd_iobuf       *iobuf = &oti->oti_iobuf;
         struct inode           *inode = osd_dt_obj(dt)->oo_inode;
         struct osd_device      *osd   = osd_obj2dev(osd_dt_obj(dt));
-       ktime_t start;
-       ktime_t end;
+       ktime_t start, end;
        s64 timediff;
-        ssize_t                 isize;
-        __s64                   maxidx;
-        int                     rc = 0;
-        int                     i;
-        int                     cache = 0;
+       ssize_t isize;
+       __s64  maxidx;
+       int i, rc = 0;
 
         LASSERT(inode);
 
@@ -957,18 +1014,9 @@ static int osd_write_prep(const struct lu_env *env, struct dt_object *dt,
        isize = i_size_read(inode);
        maxidx = ((isize + PAGE_SIZE - 1) >> PAGE_SHIFT) - 1;
 
-        if (osd->od_writethrough_cache)
-                cache = 1;
-        if (isize > osd->od_readcache_max_filesize)
-                cache = 0;
-
        start = ktime_get();
        for (i = 0; i < npages; i++) {
 
-               if (cache == 0)
-                       generic_error_remove_page(inode->i_mapping,
-                                                 lnb[i].lnb_page);
-
                /*
                 * till commit the content of the page is undefined
                 * we'll set it uptodate once bulk is done. otherwise
@@ -1078,10 +1126,10 @@ static int osd_declare_write_commit(const struct lu_env *env,
        int                     i;
        int                     newblocks;
        int                     rc = 0;
-       int                     flags = 0;
        int                     credits = 0;
        long long               quota_space = 0;
        struct osd_fextent      extent = { 0 };
+       enum osd_quota_local_flags local_flags = 0;
        enum osd_qid_declare_flags declare_flags = OSD_QID_BLK;
        ENTRY;
 
@@ -1166,16 +1214,16 @@ static int osd_declare_write_commit(const struct lu_env *env,
 
        rc = osd_declare_inode_qid(env, i_uid_read(inode), i_gid_read(inode),
                                   i_projid_read(inode), quota_space, oh,
-                                  osd_dt_obj(dt), &flags, declare_flags);
+                                  osd_dt_obj(dt), &local_flags, declare_flags);
 
        /* we need only to store the overquota flags in the first lnb for
         * now, once we support multiple objects BRW, this code needs be
         * revised. */
-       if (flags & QUOTA_FL_OVER_USRQUOTA)
+       if (local_flags & QUOTA_FL_OVER_USRQUOTA)
                lnb[0].lnb_flags |= OBD_BRW_OVER_USRQUOTA;
-       if (flags & QUOTA_FL_OVER_GRPQUOTA)
+       if (local_flags & QUOTA_FL_OVER_GRPQUOTA)
                lnb[0].lnb_flags |= OBD_BRW_OVER_GRPQUOTA;
-       if (flags & QUOTA_FL_OVER_PRJQUOTA)
+       if (local_flags & QUOTA_FL_OVER_PRJQUOTA)
                lnb[0].lnb_flags |= OBD_BRW_OVER_PRJQUOTA;
 
        if (rc == 0)
@@ -1294,7 +1342,7 @@ static int osd_read_prep(const struct lu_env *env, struct dt_object *dt,
         struct osd_iobuf *iobuf = &oti->oti_iobuf;
         struct inode *inode = osd_dt_obj(dt)->oo_inode;
         struct osd_device *osd = osd_obj2dev(osd_dt_obj(dt));
-       int rc = 0, i, cache = 0, cache_hits = 0, cache_misses = 0;
+       int rc = 0, i, cache_hits = 0, cache_misses = 0;
        ktime_t start, end;
        s64 timediff;
        loff_t isize;
@@ -1307,11 +1355,6 @@ static int osd_read_prep(const struct lu_env *env, struct dt_object *dt,
 
        isize = i_size_read(inode);
 
-       if (osd->od_read_cache)
-               cache = 1;
-       if (isize > osd->od_readcache_max_filesize)
-               cache = 0;
-
        start = ktime_get();
        for (i = 0; i < npages; i++) {
 
@@ -1320,19 +1363,15 @@ static int osd_read_prep(const struct lu_env *env, struct dt_object *dt,
                         * lnb->lnb_rc == 0, so it's easy to detect later. */
                        break;
 
-               if (isize < lnb[i].lnb_file_offset + lnb[i].lnb_len)
-                       lnb[i].lnb_rc = isize - lnb[i].lnb_file_offset;
-               else
-                       lnb[i].lnb_rc = lnb[i].lnb_len;
+               /* instead of looking if we go beyond isize, send complete
+                * pages all the time
+                */
+               lnb[i].lnb_rc = lnb[i].lnb_len;
 
                /* Bypass disk read if fail_loc is set properly */
                if (OBD_FAIL_CHECK(OBD_FAIL_OST_FAKE_RW))
                        SetPageUptodate(lnb[i].lnb_page);
 
-               if (cache == 0)
-                       generic_error_remove_page(inode->i_mapping,
-                                                 lnb[i].lnb_page);
-
                if (PageUptodate(lnb[i].lnb_page)) {
                        cache_hits++;
                        unlock_page(lnb[i].lnb_page);
@@ -1642,9 +1681,11 @@ static int osd_ldiskfs_writelink(struct inode *inode, char *buffer, int buflen)
        return 0;
 }
 
-int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
-                            int write_NUL, loff_t *offs, handle_t *handle)
+static int osd_ldiskfs_write_record(struct dt_object *dt, void *buf,
+                                   int bufsize, int write_NUL, loff_t *offs,
+                                   handle_t *handle)
 {
+       struct inode *inode = osd_dt_obj(dt)->oo_inode;
         struct buffer_head *bh        = NULL;
         loff_t              offset    = *offs;
         loff_t              new_size  = i_size_read(inode);
@@ -1654,6 +1695,8 @@ int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
         int                 size;
         int                 boffs;
         int                 dirty_inode = 0;
+       struct ldiskfs_inode_info *ei = LDISKFS_I(inode);
+       bool create, sparse, sync = false;
 
        if (write_NUL) {
                /*
@@ -1665,8 +1708,14 @@ int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
                ++bufsize;
        }
 
+       /* sparse checking is racy, but sparse is very rare case, leave as is */
+       sparse = (new_size > 0 && (inode->i_blocks >> (inode->i_blkbits - 9)) <
+                 ((new_size - 1) >> inode->i_blkbits) + 1);
+
        while (bufsize > 0) {
                int credits = handle->h_buffer_credits;
+               unsigned long last_block = (new_size == 0) ? 0 :
+                                          (new_size - 1) >> inode->i_blkbits;
 
                if (bh)
                        brelse(bh);
@@ -1674,7 +1723,39 @@ int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
                block = offset >> inode->i_blkbits;
                boffs = offset & (blocksize - 1);
                size = min(blocksize - boffs, bufsize);
-               bh = __ldiskfs_bread(handle, inode, block, 1);
+               sync = (block > last_block || new_size == 0 || sparse);
+
+               if (sync)
+                       down(&ei->i_append_sem);
+
+               bh = __ldiskfs_bread(handle, inode, block, 0);
+
+               if (unlikely(IS_ERR_OR_NULL(bh) && !sync))
+                       CWARN("%s: adding bh without locking off %llu (block %lu, "
+                             "size %d, offs %llu)\n", inode->i_sb->s_id,
+                             offset, block, bufsize, *offs);
+
+               if (IS_ERR_OR_NULL(bh)) {
+                       struct osd_device *osd = osd_obj2dev(osd_dt_obj(dt));
+                       int flags = LDISKFS_GET_BLOCKS_CREATE;
+
+                       /* while the file system is being mounted, avoid
+                        * preallocation otherwise mount can take a long
+                        * time as mballoc cache is cold.
+                        * XXX: this is a workaround until we have a proper
+                        *      fix in mballoc
+                        * XXX: works with extent-based files only */
+                       if (!osd->od_cl_seq)
+                               flags |= LDISKFS_GET_BLOCKS_NO_NORMALIZE;
+                       bh = __ldiskfs_bread(handle, inode, block, flags);
+                       create = true;
+               } else {
+                       if (sync) {
+                               up(&ei->i_append_sem);
+                               sync = false;
+                       }
+                       create = false;
+               }
                if (IS_ERR_OR_NULL(bh)) {
                        if (bh == NULL) {
                                err = -EIO;
@@ -1699,7 +1780,14 @@ int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
                LASSERTF(boffs + size <= bh->b_size,
                         "boffs %d size %d bh->b_size %lu\n",
                         boffs, size, (unsigned long)bh->b_size);
-                memcpy(bh->b_data + boffs, buf, size);
+               if (create) {
+                       memset(bh->b_data, 0, bh->b_size);
+                       if (sync) {
+                               up(&ei->i_append_sem);
+                               sync = false;
+                       }
+               }
+               memcpy(bh->b_data + boffs, buf, size);
                err = ldiskfs_handle_dirty_metadata(handle, NULL, bh);
                 if (err)
                         break;
@@ -1710,8 +1798,11 @@ int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
                 bufsize -= size;
                 buf += size;
         }
-        if (bh)
-                brelse(bh);
+       if (sync)
+               up(&ei->i_append_sem);
+
+       if (bh)
+               brelse(bh);
 
        if (write_NUL)
                --new_size;
@@ -1765,9 +1856,8 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
        if (is_link && (buf->lb_len < sizeof(LDISKFS_I(inode)->i_data)))
                result = osd_ldiskfs_writelink(inode, buf->lb_buf, buf->lb_len);
        else
-               result = osd_ldiskfs_write_record(inode, buf->lb_buf,
-                                                 buf->lb_len, is_link, pos,
-                                                 oh->ot_handle);
+               result = osd_ldiskfs_write_record(dt, buf->lb_buf, buf->lb_len,
+                                                 is_link, pos, oh->ot_handle);
        if (result == 0)
                result = buf->lb_len;
 
@@ -1955,17 +2045,16 @@ static int osd_fiemap_get(const struct lu_env *env, struct dt_object *dt,
 static int osd_ladvise(const struct lu_env *env, struct dt_object *dt,
                       __u64 start, __u64 end, enum lu_ladvise_type advice)
 {
-       int              rc = 0;
-       struct inode    *inode = osd_dt_obj(dt)->oo_inode;
+       struct osd_object *obj = osd_dt_obj(dt);
+       int rc = 0;
        ENTRY;
 
        switch (advice) {
        case LU_LADVISE_DONTNEED:
-               if (end == 0)
-                       break;
-               invalidate_mapping_pages(inode->i_mapping,
-                                        start >> PAGE_SHIFT,
-                                        (end - 1) >> PAGE_SHIFT);
+               if (end)
+                       invalidate_mapping_pages(obj->oo_inode->i_mapping,
+                                                start >> PAGE_SHIFT,
+                                                (end - 1) >> PAGE_SHIFT);
                break;
        default:
                rc = -ENOTSUPP;
@@ -2077,7 +2166,9 @@ void osd_execute_truncate(struct osd_object *obj)
                return;
        }
 
+       inode_lock(inode);
        ldiskfs_truncate(inode);
+       inode_unlock(inode);
 
        /*
         * For a partial-page truncate, flush the page to disk immediately to