sub_io->ci_lockreq = io->ci_lockreq;
sub_io->ci_type = io->ci_type;
sub_io->ci_no_srvlock = io->ci_no_srvlock;
+ sub_io->ci_noatime = io->ci_noatime;
lov_sub_enter(sub);
result = cl_io_sub_init(sub->sub_env, sub_io,
*
*/
-static int lov_page_stripe(const struct cl_page *page)
+int lov_page_stripe(const struct cl_page *page)
{
- struct lovsub_object *subobj;
+ struct lovsub_object *subobj;
+ const struct cl_page_slice *slice;
+ ENTRY;
- ENTRY;
- subobj = lu2lovsub(
- lu_object_locate(page->cp_child->cp_obj->co_lu.lo_header,
- &lovsub_device_type));
- LASSERT(subobj != NULL);
- RETURN(subobj->lso_index);
+ slice = cl_page_at(page, &lovsub_device_type);
+ LASSERT(slice != NULL);
+ LASSERT(slice->cpl_obj != NULL);
+
+ subobj = cl2lovsub(slice->cpl_obj);
+ RETURN(subobj->lso_index);
}
struct lov_io_sub *lov_page_subio(const struct lu_env *env, struct lov_io *lio,
static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
struct cl_io *io)
{
- struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
+ struct lov_stripe_md *lsm;
int result;
+ ENTRY;
- LASSERT(lio->lis_object != NULL);
- ENTRY;
+ LASSERT(lio->lis_object != NULL);
+ lsm = lio->lis_object->lo_lsm;
/*
* Need to be optimized, we can't afford to allocate a piece of memory
LASSERT(cfs_atomic_read(&lov->lo_active_ios) > 0);
if (cfs_atomic_dec_and_test(&lov->lo_active_ios))
- cfs_waitq_broadcast(&lov->lo_waitq);
+ wake_up_all(&lov->lo_waitq);
EXIT;
}
EXIT;
}
-
-static struct cl_page_list *lov_io_submit_qin(struct lov_device *ld,
- struct cl_page_list *qin,
- int idx, int alloc)
-{
- return alloc ? &qin[idx] : &ld->ld_emrg[idx]->emrg_page_list;
-}
-
/**
* lov implementation of cl_operations::cio_submit() method. It takes a list
* of pages in \a queue, splits it into per-stripe sub-lists, invokes
* lov_device::ld_mutex mutex.
*/
static int lov_io_submit(const struct lu_env *env,
- const struct cl_io_slice *ios,
+ const struct cl_io_slice *ios,
enum cl_req_type crt, struct cl_2queue *queue)
{
- struct lov_io *lio = cl2lov_io(env, ios);
- struct lov_object *obj = lio->lis_object;
- struct lov_device *ld = lu2lov_dev(lov2cl(obj)->co_lu.lo_dev);
- struct cl_page_list *qin = &queue->c2_qin;
- struct cl_2queue *cl2q = &lov_env_info(env)->lti_cl2q;
- struct cl_page_list *stripes_qin = NULL;
- struct cl_page *page;
- struct cl_page *tmp;
- int stripe;
-
-#define QIN(stripe) lov_io_submit_qin(ld, stripes_qin, stripe, alloc)
+ struct cl_page_list *qin = &queue->c2_qin;
+ struct lov_io *lio = cl2lov_io(env, ios);
+ struct lov_io_sub *sub;
+ struct cl_page_list *plist = &lov_env_info(env)->lti_plist;
+ struct cl_page *page;
+ int stripe;
+ int rc = 0;
+ ENTRY;
- int rc = 0;
- int alloc =
-#if defined(__KERNEL__) && defined(__linux__)
- !(current->flags & PF_MEMALLOC);
-#else
- 1;
-#endif
- ENTRY;
if (lio->lis_active_subios == 1) {
int idx = lio->lis_single_subio_index;
- struct lov_io_sub *sub;
LASSERT(idx < lio->lis_nr_subios);
sub = lov_sub_get(env, lio, idx);
}
LASSERT(lio->lis_subs != NULL);
- if (alloc) {
- OBD_ALLOC_LARGE(stripes_qin,
- sizeof(*stripes_qin) * lio->lis_nr_subios);
- if (stripes_qin == NULL)
- RETURN(-ENOMEM);
-
- for (stripe = 0; stripe < lio->lis_nr_subios; stripe++)
- cl_page_list_init(&stripes_qin[stripe]);
- } else {
- /*
- * If we get here, it means pageout & swap doesn't help.
- * In order to not make things worse, even don't try to
- * allocate the memory with __GFP_NOWARN. -jay
- */
- mutex_lock(&ld->ld_mutex);
- lio->lis_mem_frozen = 1;
- }
- cl_2queue_init(cl2q);
- cl_page_list_for_each_safe(page, tmp, qin) {
- stripe = lov_page_stripe(page);
- cl_page_list_move(QIN(stripe), qin, page);
- }
+ cl_page_list_init(plist);
+ while (qin->pl_nr > 0) {
+ struct cl_2queue *cl2q = &lov_env_info(env)->lti_cl2q;
- for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
- struct lov_io_sub *sub;
- struct cl_page_list *sub_qin = QIN(stripe);
+ cl_2queue_init(cl2q);
- if (cfs_list_empty(&sub_qin->pl_pages))
- continue;
+ page = cl_page_list_first(qin);
+ cl_page_list_move(&cl2q->c2_qin, qin, page);
- cl_page_list_splice(sub_qin, &cl2q->c2_qin);
- sub = lov_sub_get(env, lio, stripe);
- if (!IS_ERR(sub)) {
+ stripe = lov_page_stripe(page);
+ while (qin->pl_nr > 0) {
+ page = cl_page_list_first(qin);
+ if (stripe != lov_page_stripe(page))
+ break;
+
+ cl_page_list_move(&cl2q->c2_qin, qin, page);
+ }
+
+ sub = lov_sub_get(env, lio, stripe);
+ if (!IS_ERR(sub)) {
rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
crt, cl2q);
- lov_sub_put(sub);
- } else
- rc = PTR_ERR(sub);
- cl_page_list_splice(&cl2q->c2_qin, &queue->c2_qin);
- cl_page_list_splice(&cl2q->c2_qout, &queue->c2_qout);
- if (rc != 0)
- break;
- }
+ lov_sub_put(sub);
+ } else {
+ rc = PTR_ERR(sub);
+ }
- for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
- struct cl_page_list *sub_qin = QIN(stripe);
+ cl_page_list_splice(&cl2q->c2_qin, plist);
+ cl_page_list_splice(&cl2q->c2_qout, &queue->c2_qout);
+ cl_2queue_fini(env, cl2q);
- if (cfs_list_empty(&sub_qin->pl_pages))
- continue;
+ if (rc != 0)
+ break;
+ }
- cl_page_list_splice(sub_qin, qin);
- }
+ cl_page_list_splice(plist, qin);
+ cl_page_list_fini(env, plist);
- if (alloc) {
- OBD_FREE_LARGE(stripes_qin,
- sizeof(*stripes_qin) * lio->lis_nr_subios);
- } else {
- int i;
+ RETURN(rc);
+}
- for (i = 0; i < lio->lis_nr_subios; i++) {
- struct cl_io *cio = lio->lis_subs[i].sub_io;
+static int lov_io_commit_async(const struct lu_env *env,
+ const struct cl_io_slice *ios,
+ struct cl_page_list *queue, int from, int to,
+ cl_commit_cbt cb)
+{
+ struct cl_page_list *plist = &lov_env_info(env)->lti_plist;
+ struct lov_io *lio = cl2lov_io(env, ios);
+ struct lov_io_sub *sub;
+ struct cl_page *page;
+ int rc = 0;
+ ENTRY;
- if (cio && cio == &ld->ld_emrg[i]->emrg_subio)
- lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
- }
- lio->lis_mem_frozen = 0;
- mutex_unlock(&ld->ld_mutex);
- }
+ if (lio->lis_active_subios == 1) {
+ int idx = lio->lis_single_subio_index;
+
+ LASSERT(idx < lio->lis_nr_subios);
+ sub = lov_sub_get(env, lio, idx);
+ LASSERT(!IS_ERR(sub));
+ LASSERT(sub->sub_io == &lio->lis_single_subio);
+ rc = cl_io_commit_async(sub->sub_env, sub->sub_io, queue,
+ from, to, cb);
+ lov_sub_put(sub);
+ RETURN(rc);
+ }
- RETURN(rc);
-#undef QIN
-}
+ LASSERT(lio->lis_subs != NULL);
-static int lov_io_prepare_write(const struct lu_env *env,
- const struct cl_io_slice *ios,
- const struct cl_page_slice *slice,
- unsigned from, unsigned to)
-{
- struct lov_io *lio = cl2lov_io(env, ios);
- struct cl_page *sub_page = lov_sub_page(slice);
- struct lov_io_sub *sub;
- int result;
+ cl_page_list_init(plist);
+ while (queue->pl_nr > 0) {
+ int stripe_to = to;
+ int stripe;
- ENTRY;
- sub = lov_page_subio(env, lio, slice);
- if (!IS_ERR(sub)) {
- result = cl_io_prepare_write(sub->sub_env, sub->sub_io,
- sub_page, from, to);
- lov_sub_put(sub);
- } else
- result = PTR_ERR(sub);
- RETURN(result);
-}
+ LASSERT(plist->pl_nr == 0);
+ page = cl_page_list_first(queue);
+ cl_page_list_move(plist, queue, page);
-static int lov_io_commit_write(const struct lu_env *env,
- const struct cl_io_slice *ios,
- const struct cl_page_slice *slice,
- unsigned from, unsigned to)
-{
- struct lov_io *lio = cl2lov_io(env, ios);
- struct cl_page *sub_page = lov_sub_page(slice);
- struct lov_io_sub *sub;
- int result;
+ stripe = lov_page_stripe(page);
+ while (queue->pl_nr > 0) {
+ page = cl_page_list_first(queue);
+ if (stripe != lov_page_stripe(page))
+ break;
- ENTRY;
- sub = lov_page_subio(env, lio, slice);
- if (!IS_ERR(sub)) {
- result = cl_io_commit_write(sub->sub_env, sub->sub_io,
- sub_page, from, to);
- lov_sub_put(sub);
- } else
- result = PTR_ERR(sub);
- RETURN(result);
+ cl_page_list_move(plist, queue, page);
+ }
+
+ if (queue->pl_nr > 0) /* still has more pages */
+ stripe_to = PAGE_SIZE;
+
+ sub = lov_sub_get(env, lio, stripe);
+ if (!IS_ERR(sub)) {
+ rc = cl_io_commit_async(sub->sub_env, sub->sub_io,
+ plist, from, stripe_to, cb);
+ lov_sub_put(sub);
+ } else {
+ rc = PTR_ERR(sub);
+ break;
+ }
+
+ if (plist->pl_nr > 0) /* short write */
+ break;
+
+ from = 0;
+ }
+
+ /* for error case, add the page back into the qin list */
+ LASSERT(ergo(rc == 0, plist->pl_nr == 0));
+ while (plist->pl_nr > 0) {
+ /* error occurred, add the uncommitted pages back into queue */
+ page = cl_page_list_last(plist);
+ cl_page_list_move_head(queue, plist, page);
+ }
+
+ RETURN(rc);
}
static int lov_io_fault_start(const struct lu_env *env,
.cio_start = lov_io_start,
.cio_end = lov_io_fsync_end
},
- [CIT_MISC] = {
- .cio_fini = lov_io_fini
- }
- },
- .req_op = {
- [CRT_READ] = {
- .cio_submit = lov_io_submit
- },
- [CRT_WRITE] = {
- .cio_submit = lov_io_submit
- }
- },
- .cio_prepare_write = lov_io_prepare_write,
- .cio_commit_write = lov_io_commit_write
+ [CIT_MISC] = {
+ .cio_fini = lov_io_fini
+ }
+ },
+ .cio_submit = lov_io_submit,
+ .cio_commit_async = lov_io_commit_async,
};
/*****************************************************************************
const struct cl_io_slice *ios)
{
struct lov_object *lov = cl2lov(ios->cis_obj);
- ENTRY;
+ ENTRY;
if (cfs_atomic_dec_and_test(&lov->lo_active_ios))
- cfs_waitq_broadcast(&lov->lo_waitq);
- EXIT;
+ wake_up_all(&lov->lo_waitq);
+ EXIT;
}
static void lov_empty_impossible(const struct lu_env *env,
.cio_end = LOV_EMPTY_IMPOSSIBLE
},
[CIT_FSYNC] = {
- .cio_fini = lov_empty_io_fini
+ .cio_fini = lov_empty_io_fini
},
- [CIT_MISC] = {
- .cio_fini = lov_empty_io_fini
- }
- },
- .req_op = {
- [CRT_READ] = {
- .cio_submit = LOV_EMPTY_IMPOSSIBLE
- },
- [CRT_WRITE] = {
- .cio_submit = LOV_EMPTY_IMPOSSIBLE
- }
- },
- .cio_commit_write = LOV_EMPTY_IMPOSSIBLE
+ [CIT_MISC] = {
+ .cio_fini = lov_empty_io_fini
+ }
+ },
+ .cio_submit = LOV_EMPTY_IMPOSSIBLE,
+ .cio_commit_async = LOV_EMPTY_IMPOSSIBLE
};
int lov_io_init_raid0(const struct lu_env *env, struct cl_object *obj,
RETURN(result != 0);
}
+int lov_io_init_released(const struct lu_env *env, struct cl_object *obj,
+ struct cl_io *io)
+{
+ struct lov_object *lov = cl2lov(obj);
+ struct lov_io *lio = lov_env_io(env);
+ int result;
+ ENTRY;
+
+ LASSERT(lov->lo_lsm != NULL);
+ lio->lis_object = lov;
+
+ switch (io->ci_type) {
+ default:
+ LASSERTF(0, "invalid type %d\n", io->ci_type);
+ case CIT_MISC:
+ case CIT_FSYNC:
+ result = 1;
+ break;
+ case CIT_SETATTR:
+ /* the truncate to 0 is managed by MDT:
+ * - in open, for open O_TRUNC
+ * - in setattr, for truncate
+ */
+ /* the truncate is for size > 0 so triggers a restore */
+ if (cl_io_is_trunc(io))
+ io->ci_restore_needed = 1;
+ result = -ENODATA;
+ break;
+ case CIT_READ:
+ case CIT_WRITE:
+ case CIT_FAULT:
+ io->ci_restore_needed = 1;
+ result = -ENODATA;
+ break;
+ }
+ if (result == 0) {
+ cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
+ cfs_atomic_inc(&lov->lo_active_ios);
+ }
+
+ io->ci_result = result < 0 ? result : 0;
+ RETURN(result != 0);
+}
/** @} lov */