*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
*
* GPL HEADER END
*/
* @{
*/
-static inline void lov_sub_enter(struct lov_io_sub *sub)
-{
- sub->sub_reenter++;
-}
-static inline void lov_sub_exit(struct lov_io_sub *sub)
-{
- sub->sub_reenter--;
-}
-
static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio,
struct lov_io_sub *sub)
{
ENTRY;
if (sub->sub_io != NULL) {
- if (sub->sub_io_initialized) {
- lov_sub_enter(sub);
- cl_io_fini(sub->sub_env, sub->sub_io);
- lov_sub_exit(sub);
- sub->sub_io_initialized = 0;
- lio->lis_active_subios--;
- }
+ if (sub->sub_io_initialized) {
+ cl_io_fini(sub->sub_env, sub->sub_io);
+ sub->sub_io_initialized = 0;
+ lio->lis_active_subios--;
+ }
if (sub->sub_stripe == lio->lis_single_subio_index)
lio->lis_single_subio_index = -1;
else if (!sub->sub_borrowed)
case CIT_READ:
case CIT_WRITE: {
io->u.ci_wr.wr_sync = cl_io_is_sync_write(parent);
- if (cl_io_is_append(parent)) {
- io->u.ci_wr.wr_append = 1;
- } else {
- io->u.ci_rw.crw_pos = start;
- io->u.ci_rw.crw_count = end - start;
- }
- break;
- }
- default:
- break;
- }
+ if (cl_io_is_append(parent)) {
+ io->u.ci_wr.wr_append = 1;
+ } else {
+ io->u.ci_rw.crw_pos = start;
+ io->u.ci_rw.crw_count = end - start;
+ }
+ break;
+ }
+ case CIT_LADVISE: {
+ io->u.ci_ladvise.li_start = start;
+ io->u.ci_ladvise.li_end = end;
+ io->u.ci_ladvise.li_fid = parent->u.ci_ladvise.li_fid;
+ io->u.ci_ladvise.li_advice = parent->u.ci_ladvise.li_advice;
+ io->u.ci_ladvise.li_flags = parent->u.ci_ladvise.li_flags;
+ break;
+ }
+ default:
+ break;
+ }
}
static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
struct lov_io_sub *sub)
{
- struct lov_object *lov = lio->lis_object;
- struct lov_device *ld = lu2lov_dev(lov2cl(lov)->co_lu.lo_dev);
- struct cl_io *sub_io;
- struct cl_object *sub_obj;
- struct cl_io *io = lio->lis_cl.cis_io;
-
- int stripe = sub->sub_stripe;
- int result;
+ struct lov_object *lov = lio->lis_object;
+ struct cl_io *sub_io;
+ struct cl_object *sub_obj;
+ struct cl_io *io = lio->lis_cl.cis_io;
+ int stripe = sub->sub_stripe;
+ int rc;
LASSERT(sub->sub_io == NULL);
LASSERT(sub->sub_env == NULL);
if (unlikely(lov_r0(lov)->lo_sub[stripe] == NULL))
RETURN(-EIO);
- result = 0;
sub->sub_io_initialized = 0;
sub->sub_borrowed = 0;
- if (lio->lis_mem_frozen) {
- LASSERT(mutex_is_locked(&ld->ld_mutex));
- sub->sub_io = &ld->ld_emrg[stripe]->emrg_subio;
- sub->sub_env = ld->ld_emrg[stripe]->emrg_env;
- sub->sub_borrowed = 1;
- } else {
- void *cookie;
-
- /* obtain new environment */
- cookie = cl_env_reenter();
- sub->sub_env = cl_env_get(&sub->sub_refcheck);
- cl_env_reexit(cookie);
- if (IS_ERR(sub->sub_env))
- result = PTR_ERR(sub->sub_env);
-
- if (result == 0) {
- /*
- * First sub-io. Use ->lis_single_subio to
- * avoid dynamic allocation.
- */
- if (lio->lis_active_subios == 0) {
- sub->sub_io = &lio->lis_single_subio;
- lio->lis_single_subio_index = stripe;
- } else {
- OBD_ALLOC_PTR(sub->sub_io);
- if (sub->sub_io == NULL)
- result = -ENOMEM;
- }
- }
- }
+ /* obtain new environment */
+ sub->sub_env = cl_env_get(&sub->sub_refcheck);
+ if (IS_ERR(sub->sub_env))
+ GOTO(fini_lov_io, rc = PTR_ERR(sub->sub_env));
- if (result == 0) {
- sub_obj = lovsub2cl(lov_r0(lov)->lo_sub[stripe]);
- sub_io = sub->sub_io;
-
- sub_io->ci_obj = sub_obj;
- sub_io->ci_result = 0;
-
- sub_io->ci_parent = io;
- sub_io->ci_lockreq = io->ci_lockreq;
- sub_io->ci_type = io->ci_type;
- sub_io->ci_no_srvlock = io->ci_no_srvlock;
- sub_io->ci_noatime = io->ci_noatime;
-
- lov_sub_enter(sub);
- result = cl_io_sub_init(sub->sub_env, sub_io,
- io->ci_type, sub_obj);
- lov_sub_exit(sub);
- if (result >= 0) {
- lio->lis_active_subios++;
- sub->sub_io_initialized = 1;
- result = 0;
- }
- }
- if (result != 0)
- lov_io_sub_fini(env, lio, sub);
- RETURN(result);
+ /*
+ * First sub-io. Use ->lis_single_subio to
+ * avoid dynamic allocation.
+ */
+ if (lio->lis_active_subios == 0) {
+ sub->sub_io = &lio->lis_single_subio;
+ lio->lis_single_subio_index = stripe;
+ } else {
+ OBD_ALLOC_PTR(sub->sub_io);
+ if (sub->sub_io == NULL)
+ GOTO(fini_lov_io, rc = -ENOMEM);
+ }
+
+ sub_obj = lovsub2cl(lov_r0(lov)->lo_sub[stripe]);
+ sub_io = sub->sub_io;
+
+ sub_io->ci_obj = sub_obj;
+ sub_io->ci_result = 0;
+ sub_io->ci_parent = io;
+ sub_io->ci_lockreq = io->ci_lockreq;
+ sub_io->ci_type = io->ci_type;
+ sub_io->ci_no_srvlock = io->ci_no_srvlock;
+ sub_io->ci_noatime = io->ci_noatime;
+
+ rc = cl_io_sub_init(sub->sub_env, sub_io, io->ci_type, sub_obj);
+ if (rc >= 0) {
+ lio->lis_active_subios++;
+ sub->sub_io_initialized = 1;
+ rc = 0;
+ }
+fini_lov_io:
+ if (rc != 0)
+ lov_io_sub_fini(env, lio, sub);
+ RETURN(rc);
}
struct lov_io_sub *lov_sub_get(const struct lu_env *env,
rc = lov_io_sub_init(env, lio, sub);
} else
rc = 0;
- if (rc == 0)
- lov_sub_enter(sub);
- else
- sub = ERR_PTR(rc);
- RETURN(sub);
-}
-void lov_sub_put(struct lov_io_sub *sub)
-{
- lov_sub_exit(sub);
+ if (rc < 0)
+ sub = ERR_PTR(rc);
+
+ RETURN(sub);
}
/*****************************************************************************
RETURN(cl2lov_page(slice)->lps_stripe);
}
-struct lov_io_sub *lov_page_subio(const struct lu_env *env, struct lov_io *lio,
- const struct cl_page_slice *slice)
-{
- struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
- struct cl_page *page = slice->cpl_page;
- int stripe;
-
- LASSERT(lio->lis_cl.cis_io != NULL);
- LASSERT(cl2lov(slice->cpl_obj) == lio->lis_object);
- LASSERT(lsm != NULL);
- LASSERT(lio->lis_nr_subios > 0);
- ENTRY;
-
- stripe = lov_page_stripe(page);
- RETURN(lov_sub_get(env, lio, stripe));
-}
-
-
static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
struct cl_io *io)
{
LASSERT(lio->lis_object != NULL);
lsm = lio->lis_object->lo_lsm;
- /*
- * Need to be optimized, we can't afford to allocate a piece of memory
- * when writing a page. -jay
- */
- OBD_ALLOC_LARGE(lio->lis_subs,
- lsm->lsm_stripe_count * sizeof lio->lis_subs[0]);
- if (lio->lis_subs != NULL) {
- lio->lis_nr_subios = lio->lis_stripe_count;
- lio->lis_single_subio_index = -1;
- lio->lis_active_subios = 0;
- result = 0;
- } else
- result = -ENOMEM;
- RETURN(result);
+ /*
+ * Need to be optimized, we can't afford to allocate a piece of memory
+ * when writing a page. -jay
+ */
+ OBD_ALLOC_LARGE(lio->lis_subs,
+ lsm->lsm_stripe_count * sizeof lio->lis_subs[0]);
+ if (lio->lis_subs != NULL) {
+ lio->lis_nr_subios = lio->lis_stripe_count;
+ lio->lis_single_subio_index = -1;
+ lio->lis_active_subios = 0;
+ result = 0;
+ } else
+ result = -ENOMEM;
+
+ RETURN(result);
}
static int lov_io_slice_init(struct lov_io *lio,
break;
}
+ case CIT_LADVISE: {
+ lio->lis_pos = io->u.ci_ladvise.li_start;
+ lio->lis_endpos = io->u.ci_ladvise.li_end;
+ break;
+ }
+
case CIT_MISC:
lio->lis_pos = 0;
lio->lis_endpos = OBD_OBJECT_EOF;
rc = cl_io_iter_init(sub->sub_env, sub->sub_io);
if (rc != 0)
cl_io_iter_fini(sub->sub_env, sub->sub_io);
- lov_sub_put(sub);
if (rc != 0)
break;
- CDEBUG(D_VFSTRACE, "shrink: %d ["LPU64", "LPU64")\n",
+ CDEBUG(D_VFSTRACE, "shrink: %d [%llu, %llu)\n",
stripe, start, end);
list_add_tail(&sub->sub_linkage, &lio->lis_active);
next) - io->u.ci_rw.crw_pos;
lio->lis_pos = io->u.ci_rw.crw_pos;
lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
- CDEBUG(D_VFSTRACE, "stripe: "LPU64" chunk: ["LPU64", "LPU64") "
- LPU64"\n", (__u64)start, lio->lis_pos, lio->lis_endpos,
+ CDEBUG(D_VFSTRACE, "stripe: %llu chunk: [%llu, %llu) "
+ "%llu\n", (__u64)start, lio->lis_pos, lio->lis_endpos,
(__u64)lio->lis_io_endpos);
}
/*
struct lov_io_sub *sub;
int rc = 0;
- ENTRY;
+ ENTRY;
list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
- lov_sub_enter(sub);
- rc = iofunc(sub->sub_env, sub->sub_io);
- lov_sub_exit(sub);
- if (rc)
- break;
+ rc = iofunc(sub->sub_env, sub->sub_io);
+ if (rc)
+ break;
if (parent->ci_result == 0)
parent->ci_result = sub->sub_io->ci_result;
RETURN(-EIO);
sub = lov_sub_get(env, lio, stripe);
+ if (IS_ERR(sub))
+ return PTR_ERR(sub);
lov_stripe_offset(loo->lo_lsm, cl_offset(obj, start), stripe, &suboff);
rc = cl_io_read_ahead(sub->sub_env, sub->sub_io,
cl_index(lovsub2cl(r0->lo_sub[stripe]), suboff),
ra);
- lov_sub_put(sub);
CDEBUG(D_READA, DFID " cra_end = %lu, stripes = %d, rc = %d\n",
PFID(lu_object_fid(lov2lu(loo))), ra->cra_end, r0->lo_nr, rc);
if (ra_end != CL_PAGE_EOF)
ra_end = lov_stripe_pgoff(loo->lo_lsm, ra_end, stripe);
- pps = loo->lo_lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
+ pps = loo->lo_lsm->lsm_stripe_size >> PAGE_SHIFT;
CDEBUG(D_READA, DFID " max_index = %lu, pps = %u, "
"stripe_size = %u, stripe no = %u, start index = %lu\n",
if (lio->lis_active_subios == 1) {
int idx = lio->lis_single_subio_index;
- LASSERT(idx < lio->lis_nr_subios);
- sub = lov_sub_get(env, lio, idx);
- LASSERT(!IS_ERR(sub));
- LASSERT(sub->sub_io == &lio->lis_single_subio);
- rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
+ LASSERT(idx < lio->lis_nr_subios);
+ sub = lov_sub_get(env, lio, idx);
+ LASSERT(!IS_ERR(sub));
+ LASSERT(sub->sub_io == &lio->lis_single_subio);
+ rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
crt, queue);
- lov_sub_put(sub);
- RETURN(rc);
- }
+ RETURN(rc);
+ }
LASSERT(lio->lis_subs != NULL);
if (!IS_ERR(sub)) {
rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
crt, cl2q);
- lov_sub_put(sub);
} else {
rc = PTR_ERR(sub);
}
LASSERT(sub->sub_io == &lio->lis_single_subio);
rc = cl_io_commit_async(sub->sub_env, sub->sub_io, queue,
from, to, cb);
- lov_sub_put(sub);
RETURN(rc);
}
if (!IS_ERR(sub)) {
rc = cl_io_commit_async(sub->sub_env, sub->sub_io,
plist, from, stripe_to, cb);
- lov_sub_put(sub);
} else {
rc = PTR_ERR(sub);
break;
struct lov_io *lio;
struct lov_io_sub *sub;
- ENTRY;
- fio = &ios->cis_io->u.ci_fault;
- lio = cl2lov_io(env, ios);
- sub = lov_sub_get(env, lio, lov_page_stripe(fio->ft_page));
- sub->sub_io->u.ci_fault.ft_nob = fio->ft_nob;
- lov_sub_put(sub);
- RETURN(lov_io_start(env, ios));
+ ENTRY;
+ fio = &ios->cis_io->u.ci_fault;
+ lio = cl2lov_io(env, ios);
+ sub = lov_sub_get(env, lio, lov_page_stripe(fio->ft_page));
+ sub->sub_io->u.ci_fault.ft_nob = fio->ft_nob;
+ RETURN(lov_io_start(env, ios));
}
static void lov_io_fsync_end(const struct lu_env *env,
list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
struct cl_io *subio = sub->sub_io;
- lov_sub_enter(sub);
lov_io_end_wrapper(sub->sub_env, subio);
- lov_sub_exit(sub);
if (subio->ci_result == 0)
*written += subio->u.ci_fsync.fi_nr_written;
.cio_start = lov_io_start,
.cio_end = lov_io_fsync_end
},
+ [CIT_LADVISE] = {
+ .cio_fini = lov_io_fini,
+ .cio_iter_init = lov_io_iter_init,
+ .cio_iter_fini = lov_io_iter_fini,
+ .cio_lock = lov_io_lock,
+ .cio_unlock = lov_io_unlock,
+ .cio_start = lov_io_start,
+ .cio_end = lov_io_end
+ },
[CIT_MISC] = {
.cio_fini = lov_io_fini
}
[CIT_FSYNC] = {
.cio_fini = lov_empty_io_fini
},
+ [CIT_LADVISE] = {
+ .cio_fini = lov_empty_io_fini
+ },
[CIT_MISC] = {
.cio_fini = lov_empty_io_fini
}
result = 0;
break;
case CIT_FSYNC:
+ case CIT_LADVISE:
case CIT_SETATTR:
case CIT_DATA_VERSION:
result = +1;
switch (io->ci_type) {
default:
LASSERTF(0, "invalid type %d\n", io->ci_type);
+ result = -EOPNOTSUPP;
+ break;
case CIT_MISC:
case CIT_FSYNC:
+ case CIT_LADVISE:
case CIT_DATA_VERSION:
result = 1;
break;
result = -ENODATA;
break;
}
+
if (result == 0) {
cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
atomic_inc(&lov->lo_active_ios);