cfs_waitq_t set_waitq;
cfs_waitq_t *set_wakeup_ptr;
struct list_head set_requests;
+ struct list_head set_cblist; /* list of completion callbacks */
set_interpreter_func set_interpret; /* completion callback */
void *set_arg; /* completion context */
- void *set_countp; /* pointer to NOB counter in case
- * of directIO (bug11737) */
/* locked so that any old caller can communicate requests to
* the set holder who can then fold them into the lock-free set */
spinlock_t set_new_req_lock;
struct list_head set_new_requests;
};
+struct ptlrpc_set_cbdata {
+ struct list_head psc_item;
+ set_interpreter_func psc_interpret;
+ void *psc_data;
+};
+
struct ptlrpc_bulk_desc;
/*
void ptlrpc_abort_inflight(struct obd_import *imp);
struct ptlrpc_request_set *ptlrpc_prep_set(void);
+int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
+ set_interpreter_func fn, void *data);
int ptlrpc_set_next_timeout(struct ptlrpc_request_set *);
int ptlrpc_check_set(struct ptlrpc_request_set *set);
int ptlrpc_set_wait(struct ptlrpc_request_set *);
{
struct ptlrpc_request_set *set = NULL;
struct obd_info oinfo = { { { 0 } } };
- atomic_t nob;
int rc = 0;
ENTRY;
set = ptlrpc_prep_set();
if (set == NULL)
RETURN(-ENOMEM);
- atomic_set(&nob, 0);
- set->set_countp = &nob;
oinfo.oi_oa = oa;
oinfo.oi_md = lsm;
rc = ptlrpc_set_wait(set);
if (rc)
CERROR("error from callback: rc = %d\n", rc);
- else
- rc = atomic_read(&nob);
} else {
CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
"error from obd_brw_async: rc = %d\n", rc);
static ssize_t ll_direct_IO_26_seg(int rw, struct inode *inode,
struct address_space *mapping,
- struct lov_stripe_md *lsm,
+ struct obd_info *oinfo,
+ struct ptlrpc_request_set *set,
size_t size, loff_t file_offset,
struct page **pages, int page_count)
{
struct brw_page *pga;
- struct obdo oa;
- int opc, i, rc = 0;
+ int i, rc = 0;
size_t length;
- struct obd_capa *ocapa;
- loff_t file_offset_orig = file_offset;
ENTRY;
OBD_ALLOC(pga, sizeof(*pga) * page_count);
POISON_PAGE(pages[i], 0x0d);
}
- ll_inode_fill_obdo(inode, rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ, &oa);
-
- if (rw == WRITE) {
- lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
- LPROC_LL_DIRECT_WRITE, size);
- opc = CAPA_OPC_OSS_WRITE;
- llap_write_pending(inode, NULL);
- } else {
- lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
- LPROC_LL_DIRECT_READ, size);
- opc = CAPA_OPC_OSS_RW;
- }
- ocapa = ll_osscapa_get(inode, opc);
- rc = obd_brw_rqset(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
- ll_i2dtexp(inode), &oa, lsm, page_count, pga, NULL,
- ocapa);
- capa_put(ocapa);
- if ((rc > 0) && (rw == WRITE)) {
- lov_stripe_lock(lsm);
- obd_adjust_kms(ll_i2dtexp(inode), lsm, file_offset_orig + rc, 0);
- lov_stripe_unlock(lsm);
- }
+ rc = obd_brw_async(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
+ ll_i2dtexp(inode), oinfo, page_count,
+ pga, NULL, set);
+ if (rc == 0)
+ rc = size;
OBD_FREE(pga, sizeof(*pga) * page_count);
RETURN(rc);
struct inode *inode = file->f_mapping->host;
ssize_t count = iov_length(iov, nr_segs), tot_bytes = 0;
struct ll_inode_info *lli = ll_i2info(inode);
+ struct lov_stripe_md *lsm = lli->lli_smd;
+ struct ptlrpc_request_set *set;
+ struct obd_info oinfo;
+ struct obdo oa;
unsigned long seg = 0;
size_t size = MAX_DIO_SIZE;
+ int opc;
ENTRY;
if (!lli->lli_smd || !lli->lli_smd->lsm_object_id)
file_offset, file_offset, count >> CFS_PAGE_SHIFT,
MAX_DIO_SIZE >> CFS_PAGE_SHIFT);
- if (rw == WRITE)
+ if (rw == WRITE) {
ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_DIRECT_WRITE, count);
- else
+ opc = CAPA_OPC_OSS_WRITE;
+ llap_write_pending(inode, NULL);
+ } else {
ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_DIRECT_READ, count);
+ opc = CAPA_OPC_OSS_RW;
+ }
/* Check that all user buffers are aligned as well */
for (seg = 0; seg < nr_segs; seg++) {
RETURN(-EINVAL);
}
+ set = ptlrpc_prep_set();
+ if (set == NULL)
+ RETURN(-ENOMEM);
+
+ ll_inode_fill_obdo(inode, rw, &oa);
+ oinfo.oi_oa = &oa;
+ oinfo.oi_md = lsm;
+ oinfo.oi_capa = ll_osscapa_get(inode, opc);
+
+ /* need locking between buffered and direct access. and race with
+ *size changing by concurrent truncates and writes. */
+ if (rw == READ)
+ LOCK_INODE_MUTEX(inode);
+
for (seg = 0; seg < nr_segs; seg++) {
size_t iov_left = iov[seg].iov_len;
unsigned long user_addr = (unsigned long)iov[seg].iov_base;
+ if (rw == READ) {
+ if (file_offset >= inode->i_size)
+ break;
+ if (file_offset + iov_left > inode->i_size)
+ iov_left = inode->i_size - file_offset;
+ }
+
while (iov_left > 0) {
struct page **pages;
int page_count;
if (page_count > 0) {
result = ll_direct_IO_26_seg(rw, inode,
file->f_mapping,
- lli->lli_smd,
+ &oinfo, set,
min(size,iov_left),
file_offset, pages,
page_count);
continue;
}
- if (tot_bytes > 0)
- RETURN(tot_bytes);
- RETURN(page_count < 0 ? page_count : result);
+ if (tot_bytes <= 0)
+ tot_bytes = page_count < 0 ? page_count : result;
+ GOTO(out, tot_bytes);
}
tot_bytes += result;
user_addr += result;
}
}
+out:
+ if (rw == READ)
+ UNLOCK_INODE_MUTEX(inode);
+
+ if (tot_bytes > 0) {
+ int rc;
+
+ rc = ptlrpc_set_wait(set);
+ if (rc) {
+ tot_bytes = rc;
+ } else if (rw == WRITE) {
+ lov_stripe_lock(lsm);
+ obd_adjust_kms(ll_i2dtexp(inode), lsm, file_offset, 0);
+ lov_stripe_unlock(lsm);
+ }
+ }
+
+ capa_put(oinfo.oi_capa);
+ ptlrpc_set_destroy(set);
RETURN(tot_bytes);
}
}
LASSERT(rc == 0);
LASSERT(set->set_interpret == NULL);
- set->set_interpret = (set_interpreter_func)lov_brw_interpret;
- set->set_arg = (void *)lovset;
+ LASSERT(set->set_arg == NULL);
+ rc = ptlrpc_set_add_cb(set, lov_brw_interpret, lovset);
+ if (rc)
+ GOTO(out, rc);
RETURN(rc);
out:
{
struct osc_brw_async_args *aa = data;
int i;
- int nob = rc;
ENTRY;
rc = osc_brw_fini_request(req, rc);
if (rc == 0)
RETURN(0);
}
- if ((rc >= 0) && req->rq_set && req->rq_set->set_countp)
- atomic_add(nob, (atomic_t *)req->rq_set->set_countp);
client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
set->set_remaining = 0;
spin_lock_init(&set->set_new_req_lock);
CFS_INIT_LIST_HEAD(&set->set_new_requests);
-
+ CFS_INIT_LIST_HEAD(&set->set_cblist);
+
RETURN(set);
}
EXIT;
}
+int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
+ set_interpreter_func fn, void *data)
+{
+ struct ptlrpc_set_cbdata *cbdata;
+
+ OBD_ALLOC_PTR(cbdata);
+ if (cbdata == NULL)
+ RETURN(-ENOMEM);
+
+ cbdata->psc_interpret = fn;
+ cbdata->psc_data = data;
+ list_add_tail(&cbdata->psc_item, &set->set_cblist);
+
+ RETURN(0);
+}
+
void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
struct ptlrpc_request *req)
{
int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
set->set_interpret;
rc = interpreter (set, set->set_arg, rc);
+ } else {
+ struct ptlrpc_set_cbdata *cbdata, *n;
+ int err;
+
+ list_for_each_entry_safe(cbdata, n,
+ &set->set_cblist, psc_item) {
+ list_del_init(&cbdata->psc_item);
+ err = cbdata->psc_interpret(set, cbdata->psc_data, rc);
+ if (err && !rc)
+ rc = err;
+ OBD_FREE_PTR(cbdata);
+ }
}
RETURN(rc);
EXPORT_SYMBOL(ptlrpc_next_xid);
EXPORT_SYMBOL(ptlrpc_prep_set);
+EXPORT_SYMBOL(ptlrpc_set_add_cb);
EXPORT_SYMBOL(ptlrpc_set_add_req);
EXPORT_SYMBOL(ptlrpc_set_add_new_req);
EXPORT_SYMBOL(ptlrpc_set_destroy);
long len;
off64_t seek;
struct stat64 st;
+ char pad = 0xba;
int action;
int rc;
if (argc < 5 || argc > 6) {
- printf("Usage: %s <read/write/rdwr> file seek nr_blocks [blocksize]\n", argv[0]);
+ printf("Usage: %s <read/write/rdwr/readhole> file seek nr_blocks [blocksize]\n", argv[0]);
return 1;
}
action = O_WRONLY;
else if (!strcmp(argv[1], "rdwr"))
action = O_RDWR;
- else {
+ else if (!strcmp(argv[1], "readhole")) {
+ action = O_RDONLY;
+ pad = 0;
+ } else {
printf("Usage: %s <read/write/rdwr> file seek nr_blocks [blocksize]\n", argv[0]);
return 1;
}
printf("No memory %s\n", strerror(errno));
return 1;
}
- memset(wbuf, 0xba, len);
+ memset(wbuf, pad, len);
if (action == O_WRONLY || action == O_RDWR) {
if (lseek64(fd, seek, SEEK_SET) < 0) {
sync
multiop $DIR/$tfile oO_RDONLY:O_DIRECT:r$((2048 * 1024)) || \
error "direct read failed"
+ rm -f $DIR/$tfile
}
run_test 119b "Sparse directIO read must return actual read amount"
+test_119c() # bug 13099
+{
+ BSIZE=1048576
+ directio write $DIR/$tfile 3 1 $BSIZE || error "direct write failed"
+ directio readhole $DIR/$tfile 0 2 $BSIZE || error "reading hole failed"
+ rm -f $DIR/$tfile
+}
+run_test 119c "Testing for direct read hitting hole"
+
LDLM_POOL_CTL_RECALC=1
LDLM_POOL_CTL_SHRINK=2