- struct inode *inode = lo->lo_backing_file->f_dentry->d_inode;
- struct ll_inode_info *lli = ll_i2info(inode);
- struct lov_stripe_md *lsm = lli->lli_smd;
- struct obd_info oinfo = {{{ 0 }}};
- struct brw_page *pg = lo->lo_requests[0].lrd_pages;
- struct obdo *oa = &lo->lo_requests[0].lrd_oa;
- pgoff_t offset;
- int ret, cmd, i, opc;
- struct bio_vec *bvec;
-
- BUG_ON(bio->bi_hw_segments > LLOOP_MAX_SEGMENTS);
-
- offset = (pgoff_t)(bio->bi_sector << 9) + lo->lo_offset;
- bio_for_each_segment(bvec, bio, i) {
- BUG_ON(bvec->bv_offset != 0);
- BUG_ON(bvec->bv_len != CFS_PAGE_SIZE);
-
- pg->pg = bvec->bv_page;
- pg->off = offset;
- pg->count = bvec->bv_len;
- pg->flag = OBD_BRW_SRVLOCK;
-
- pg++;
- offset += bvec->bv_len;
- }
-
- oa->o_mode = inode->i_mode;
- oa->o_id = lsm->lsm_object_id;
- oa->o_gr = lsm->lsm_object_gr;
- oa->o_valid = OBD_MD_FLID | OBD_MD_FLMODE |
- OBD_MD_FLTYPE |OBD_MD_FLGROUP;
- obdo_from_inode(oa, inode, OBD_MD_FLFID | OBD_MD_FLGENER);
-
- cmd = OBD_BRW_READ;
- if (bio_rw(bio) == WRITE)
- cmd = OBD_BRW_WRITE;
-
- if (cmd == OBD_BRW_WRITE)
- ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_BRW_WRITE, bio->bi_size);
- else
- ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_BRW_READ, bio->bi_size);
- oinfo.oi_oa = oa;
- oinfo.oi_md = lsm;
- opc = cmd & OBD_BRW_WRITE ? CAPA_OPC_OSS_WRITE : CAPA_OPC_OSS_RW;
- oinfo.oi_capa = ll_osscapa_get(inode, opc);
- ret = obd_brw(cmd, ll_i2dtexp(inode), &oinfo,
- (obd_count)(i - bio->bi_idx),
- lo->lo_requests[0].lrd_pages, NULL);
- capa_put(oinfo.oi_capa);
- if (ret == 0)
- obdo_to_inode(inode, oa, OBD_MD_FLBLOCKS);
- return ret;
+ const struct lu_env *env = lo->lo_env;
+ struct cl_io *io = &lo->lo_io;
+ struct inode *inode = lo->lo_backing_file->f_dentry->d_inode;
+ struct cl_object *obj = ll_i2info(inode)->lli_clob;
+ pgoff_t offset;
+ int ret;
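+ /* since kernel 3.14, a bio is iterated with struct bvec_iter and a
+ * by-value struct bio_vec; older kernels use an integer index and a
+ * bio_vec pointer, hence the two declaration sets below */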
+#ifdef HAVE_BVEC_ITER
+ struct bvec_iter iter;
+ struct bio_vec bvec;
+#else
+ int iter;
+ struct bio_vec *bvec;
+#endif
+ int rw;
+ size_t page_count = 0;
+ struct bio *bio;
+ ssize_t bytes;
+
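+ /* the ll_dio_pages vector gathers every segment of the bio chain so
+ * one ll_direct_rw_pages() call can service the whole chain */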
+ struct ll_dio_pages *pvec = &lo->lo_pvec;
+ struct page **pages = pvec->ldp_pages;
+ loff_t *offsets = pvec->ldp_offsets;
+
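+ /* evict any cached pages of the backing file so the direct IO done
+ * below cannot be shadowed by stale page-cache data */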
+ truncate_inode_pages(inode->i_mapping, 0);
+
+ /* initialize the IO */
+ memset(io, 0, sizeof(*io));
+ io->ci_obj = obj;
+ ret = cl_io_init(env, io, CIT_MISC, obj);
+ if (ret)
+ return io->ci_result;
+ io->ci_lockreq = CILR_NEVER;
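+ /* no client-side lock is taken for this IO; the old code achieved
+ * the same effect with OBD_BRW_SRVLOCK */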
+
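+ /* every bio in the chain must transfer in the same direction; rw is
+ * taken from the head and asserted on each link below */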
+ LASSERT(head != NULL);
+ rw = head->bi_rw;
+ for (bio = head; bio != NULL; bio = bio->bi_next) {
+ LASSERT(rw == bio->bi_rw);
+
+#ifdef HAVE_BVEC_ITER
+ offset = (pgoff_t)(bio->bi_iter.bi_sector << 9) + lo->lo_offset;
+ bio_for_each_segment(bvec, bio, iter) {
+ BUG_ON(bvec.bv_offset != 0);
+ BUG_ON(bvec.bv_len != PAGE_CACHE_SIZE);
+
+ pages[page_count] = bvec.bv_page;
+ offsets[page_count] = offset;
+ page_count++;
+ offset += bvec.bv_len;
+#else
+ offset = (pgoff_t)(bio->bi_sector << 9) + lo->lo_offset;
+ bio_for_each_segment(bvec, bio, iter) {
+ BUG_ON(bvec->bv_offset != 0);
+ BUG_ON(bvec->bv_len != PAGE_CACHE_SIZE);
+
+ pages[page_count] = bvec->bv_page;
+ offsets[page_count] = offset;
+ page_count++;
+ offset += bvec->bv_len;
+#endif
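+ /* note: the brace below closes the bio_for_each_segment() loop opened
+ * in whichever preprocessor branch was compiled */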
+ }
+ LASSERT(page_count <= LLOOP_MAX_SEGMENTS);
+ }
+
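+ /* tally the transfer in the per-mount stats; the count here is in
+ * pages, not bytes */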
+ ll_stats_ops_tally(ll_i2sbi(inode),
+ (rw == WRITE) ? LPROC_LL_BRW_WRITE : LPROC_LL_BRW_READ,
+ page_count);
+
+ pvec->ldp_size = page_count << PAGE_CACHE_SHIFT;
+ pvec->ldp_nr = page_count;
+
+ /* FIXME: ll_direct_rw_pages() has to allocate many cl_page{}s to write
+ * these pages out to the OST. Worse, under memory pressure still more
+ * pages may be pushed out to swap and eventually end up here again.
+ * Unfortunately this is NOT easy to fix.
+ * Thoughts on a solution:
+ * 0. Define a reserved pool for cl_pages, which could be a list of
+ * pre-allocated cl_pages;
+ * 1. Define a new operation in cl_object_operations{}, say clo_depth,
+ * which reports how many layers this lustre object has. Generally
+ * speaking the depth would be 2: one for llite and one for lovsub.
+ * For SNS there will be more, since an additional page is needed to
+ * store parity;
+ * 2. Reserve (page_count * depth) cl_pages from the reserved pool. The
+ * clio would then allocate pages from the reserved pool, which
+ * guarantees we needn't allocate cl_pages from the generic cl_page
+ * slab cache.
+ * Of course, if the pool does not hold enough pages, we might be asked
+ * to write fewer pages at a time; that purely depends on the
+ * implementation. In any case, we must be careful to avoid deadlock.
+ */
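+ /* a minimal sketch of step 2 above, with hypothetical names (neither
+ * clo_depth nor cl_page_pool_reserve exists in clio today):
+ *
+ *	depth = obj->co_ops->clo_depth(env, obj);
+ *	reserved = cl_page_pool_reserve(page_count * depth);
+ *	if (reserved < page_count * depth)
+ *		page_count = reserved / depth;
+ */
+ /* i_mutex serializes this transfer against truncate and other VFS
+ * writes to the backing file while the pages are in flight */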
+ mutex_lock(&inode->i_mutex);
+ bytes = ll_direct_rw_pages(env, io, rw, inode, pvec);
+ mutex_unlock(&inode->i_mutex);
+ cl_io_fini(env, io);
+ return (bytes == pvec->ldp_size) ? 0 : (int)bytes;