-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* Copyright (c) 2007 Cluster File Systems, Inc.
* Author: Nikita Danilov <nikita@clusterfs.com>
*
# include <sys/statfs.h>
#endif
-#include <sysio.h>
-#ifdef HAVE_XTIO_H
-#include <xtio.h>
-#endif
-#include <fs.h>
-#include <mount.h>
-#include <inode.h>
-#ifdef HAVE_FILE_H
-#include <file.h>
-#endif
#include <liblustre.h>
#include <obd.h>
 static void slp_page_completion_common(const struct lu_env *env,
                                        struct ccc_page *cp, int ioret)
 {
-        struct cl_sync_io *anchor = cp->cpg_sync_io;
-
-        if (anchor) {
-                cp->cpg_sync_io = NULL;
-                cl_sync_io_note(anchor, ioret);
-        } else {
-                LBUG();
-        }
+        /* Completion is now signalled through the generic per-page
+         * cp_sync_io anchor, which cl_io_submit_sync() installs and the
+         * common cl_page code notes/tears down; this layer only asserts
+         * the anchor is present instead of managing its own cpg_sync_io. */
+        LASSERT(cp->cpg_cl.cpl_page->cp_sync_io != NULL);
 }
static void slp_page_completion_read(const struct lu_env *env,
static int slp_lock_enqueue(const struct lu_env *env,
const struct cl_lock_slice *slice,
- struct cl_io *_, __u32 enqflags)
+ struct cl_io *unused, __u32 enqflags)
{
CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
*/
static int slp_io_rw_lock(const struct lu_env *env,
- const struct cl_io_slice *ios)
+ const struct cl_io_slice *ios)
{
- struct cl_io *io = ios->cis_io;
+ struct ccc_io *cio = ccc_env_io(env);
+ struct cl_io *io = ios->cis_io;
loff_t start;
loff_t end;
start = io->u.ci_wr.wr.crw_pos;
end = start + io->u.ci_wr.wr.crw_count - 1;
}
+
+ ccc_io_update_iov(env, cio, io);
+
/*
* This acquires real DLM lock only in O_APPEND case, because of
* the io->ci_lockreq setting in llu_io_init().
}
-static int slp_io_trunc_iter_init(const struct lu_env *env,
-                                  const struct cl_io_slice *ios)
+/* CIT_TRUNC was subsumed by the generic CIT_SETATTR io type; liblustre
+ * needs no per-iteration setup for setattr, so this remains a no-op. */
+static int slp_io_setattr_iter_init(const struct lu_env *env,
+                                    const struct cl_io_slice *ios)
 {
         return 0;
 }
-static int slp_io_trunc_start(const struct lu_env *env,
-                              const struct cl_io_slice *ios)
+/* Renamed from slp_io_trunc_start for the CIT_SETATTR io type; the
+ * actual attribute update happens in lower layers, so nothing to do. */
+static int slp_io_setattr_start(const struct lu_env *env,
+                                const struct cl_io_slice *ios)
 {
         return 0;
 }
OBD_FREE_PTR(page);
}
+
static int llu_queue_pio(const struct lu_env *env, struct cl_io *io,
struct llu_io_group *group,
char *buf, size_t count, loff_t pos)
struct intnl_stat *st = llu_i2stat(inode);
struct obd_export *exp = llu_i2obdexp(inode);
struct page *page;
- int rc = 0, npages = 0, ret_bytes = 0;
- int local_lock;
+ int rc = 0, ret_bytes = 0;
struct cl_page *clp;
- struct ccc_page *clup;
struct cl_2queue *queue;
- struct cl_sync_io *anchor = &ccc_env_info(env)->cti_sync_io;
ENTRY;
if (!exp)
RETURN(-EINVAL);
- local_lock = group->lig_params->lrp_lock_mode != LCK_NL;
-
queue = &io->ci_queue;
cl_2queue_init(queue);
break;
}
- clup = cl2ccc_page(cl_page_at(clp, &slp_device_type));
- clup->cpg_sync_io = anchor;
cl_2queue_add(queue, clp);
/* drop the reference count for cl_page_find, so that the page
cl_page_clip(env, clp, offset, offset+bytes);
- npages++;
count -= bytes;
pos += bytes;
buf += bytes;
page++;
} while (count);
- cl_sync_io_init(anchor, npages);
- /* printk("Inited anchor with %d pages\n", npages); */
-
if (rc == 0) {
- enum cl_req_type crt;
-
- crt = io->ci_type == CIT_READ ? CRT_READ : CRT_WRITE;
- rc = cl_io_submit_rw(env, io, crt, queue, CRP_NORMAL);
- if (rc == 0) {
- /* If some pages weren't sent for any reason, count
- * then as completed, to avoid infinite wait. */
- cl_page_list_for_each(clp, &queue->c2_qin) {
- CL_PAGE_DEBUG(D_ERROR, env, clp,
- "not completed\n");
- cl_sync_io_note(anchor, +1);
- }
- /* wait for the IO to be finished. */
- rc = cl_sync_io_wait(env, io, &queue->c2_qout, anchor);
- }
+ enum cl_req_type iot;
+ iot = io->ci_type == CIT_READ ? CRT_READ : CRT_WRITE;
+ rc = cl_io_submit_sync(env, io, iot, queue, 0);
}
group->lig_rc = rc;
OBD_FREE_PTR(group);
}
+/**
+ * True, if \a io is a normal io, False for sendfile() / splice_{read|write}
+ *
+ * liblustre never issues splice/sendfile-style io, so this predicate is
+ * unconditionally true here; it exists so the ccc_io code shared with the
+ * kernel client can query the io mode through a single interface.
+ * Both \a env and \a io are intentionally unused in this implementation.
+ */
+int cl_is_normalio(const struct lu_env *env, const struct cl_io *io)
+{
+        return 1;
+}
+
static int slp_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
{
struct ccc_io *cio = cl2ccc_io(env, ios);
struct cl_io *io = ios->cis_io;
struct cl_object *obj = io->ci_obj;
struct inode *inode = ccc_object_inode(obj);
- int err, ret;
- loff_t pos;
- size_t cnt;
+ int err, ret;
+ loff_t pos;
+ long cnt;
struct llu_io_group *iogroup;
struct lustre_rw_params p = {0};
int iovidx;
if (IS_ERR(iogroup))
RETURN(PTR_ERR(iogroup));
- err = ccc_prep_size(env, obj, io, pos, cnt, 0, &exceed);
+ err = ccc_prep_size(env, obj, io, pos, cnt, &exceed);
if (err != 0 || (write == 0 && exceed != 0))
GOTO(out, err);
CDEBUG(D_INODE,
- "%s ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
- write?"Write":"Read", (unsigned long)st->st_ino,
+ "%s ino %lu, %lu bytes, offset "LPU64", i_size "LPU64"\n",
+ write ? "Write" : "Read", (unsigned long)st->st_ino,
cnt, (__u64)pos, (__u64)st->st_size);
if (write && io->u.ci_wr.wr_append)
for (iovidx = 0; iovidx < cio->cui_nrsegs; iovidx++) {
char *buf = (char *) cio->cui_iov[iovidx].iov_base;
- size_t count = cio->cui_iov[iovidx].iov_len;
+ long count = cio->cui_iov[iovidx].iov_len;
if (!count)
continue;
}
LASSERT(cnt == 0 || io->ci_type == CIT_READ); /* libsysio should guarantee this */
- session->lis_groups[session->lis_ngroups++] = iogroup;
+ if (!iogroup->lig_rc)
+ session->lis_rwcount += iogroup->lig_rwcount;
+ else if (!session->lis_rc)
+ session->lis_rc = iogroup->lig_rc;
+ err = 0;
- return 0;
out:
put_io_group(iogroup);
return err;
.cio_fini = ccc_io_fini,
.cio_lock = slp_io_rw_lock,
.cio_start = slp_io_start,
- .cio_end = ccc_io_end
+ .cio_end = ccc_io_end,
+ .cio_advance = ccc_io_advance
},
[CIT_WRITE] = {
.cio_fini = ccc_io_fini,
.cio_lock = slp_io_rw_lock,
.cio_start = slp_io_start,
- .cio_end = ccc_io_end
+ .cio_end = ccc_io_end,
+ .cio_advance = ccc_io_advance
},
- [CIT_TRUNC] = {
+ [CIT_SETATTR] = {
.cio_fini = ccc_io_fini,
- .cio_iter_init = slp_io_trunc_iter_init,
- .cio_start = slp_io_trunc_start
+ .cio_iter_init = slp_io_setattr_iter_init,
+ .cio_start = slp_io_setattr_start
},
[CIT_MISC] = {
.cio_fini = ccc_io_fini