__u32 type;
__u32 version;
__u32 opc;
- __u64 last_rcvd;
+ __u64 last_xid;
__u64 last_committed;
__u64 transno;
__u32 status;
__u32 repbuf;
};
+struct mds_fileh_body {
+ struct ll_fid f_fid;
+ struct lustre_handle f_handle;
+};
+
struct mds_conn_status {
struct ll_fid rootfid;
__u64 xid;
__u32 ino;
__u32 nlink;
__u32 generation;
- __u32 last_xid;
+ __u32 last_xidnomore;
};
/* MDS update records */
-struct mds_update_record_hdr {
- __u32 ur_opcode;
-};
+
+
+//struct mds_update_record_hdr {
+// __u32 ur_opcode;
+//};
struct mds_rec_setattr {
__u32 sa_opcode;
/* page.c */
+#define CB_PHASE_START 12
+#define CB_PHASE_FINISH 13
+struct io_cb_data {
+ wait_queue_head_t waitq;
+ atomic_t refcount;
+ int complete;
+ int err;
+};
+int ll_sync_io_cb(void *data, int err, int phase);
+struct io_cb_data *ll_init_cb(void);
inline void lustre_put_page(struct page *page);
struct page *lustre_get_page_read(struct inode *dir, unsigned long index);
struct page *lustre_get_page_write(struct inode *dir, unsigned long index);
sigismember(&(task->pending.signal), SIGTERM))
/*
- * Like wait_event_interruptible, but we're only interruptible by KILL, INT, or
- * TERM.
+ * Like wait_event_interruptible, but we're only interruptible by
+ * KILL, INT, or TERM.
*
* XXXshaver These are going away soon, I hope.
*/
int mdc_getlovinfo(struct obd_device *obd, struct lustre_handle *mdc_connh,
uuid_t **uuids, struct ptlrpc_request **request);
int mdc_getstatus(struct lustre_handle *conn,
- struct ll_fid *rootfid, __u64 *last_committed, __u64 *last_rcvd,
- __u32 *last_xid, struct ptlrpc_request **);
+ struct ll_fid *rootfid, __u64 *last_committed, __u32 *last_xid, struct ptlrpc_request **);
int mdc_getattr(struct lustre_handle *conn,
obd_id ino, int type, unsigned long valid, size_t ea_size,
struct ptlrpc_request **request);
__u32 cli_request_portal;
__u32 cli_reply_portal;
- __u64 cli_last_rcvd;
+ __u64 cli_last_xid;
__u64 cli_last_committed;
__u32 cli_target_devno;
int typ_refcnt;
};
+typedef int (*brw_callback_t)(void *, int err, int phase);
+struct brw_page {
+ struct page *pg;
+ obd_size count;
+ obd_off off;
+ obd_flag flag;
+};
/* Individual type definitions */
int cl_max_mdsize;
};
-#if 0
-struct osc_obd {
- struct ptlrpc_client *osc_client;
- struct ptlrpc_client *osc_ldlm_client;
- struct ptlrpc_connection *osc_conn;
- __u8 osc_target_uuid[37];
-};
-#endif
-
struct mds_obd {
struct ptlrpc_service *mds_service;
} u;
};
-typedef void (*brw_callback_t)(void *);
struct obd_ops {
int (*o_iocontrol)(long cmd, struct lustre_handle *, int len,
struct lov_stripe_md *);
int (*o_brw)(int rw, struct lustre_handle *conn,
struct lov_stripe_md *md, obd_count oa_bufs,
- struct page **buf, obd_size *count, obd_off *offset,
- obd_flag *flags, brw_callback_t callback, void * data);
+ struct brw_page *pgarr, brw_callback_t callback,
+ void * data);
int (*o_punch)(struct lustre_handle *conn, struct obdo *tgt,
struct lov_stripe_md *md, obd_size count,
obd_off offset);
int objcount, struct obd_ioobj *obj,
int niocount, struct niobuf_local *local,
void *desc_private);
- int (*o_enqueue)(struct lustre_handle *conn,
- struct lustre_handle *parent_lock, __u64 *res_id,
+ int (*o_enqueue)(struct lustre_handle *conn, struct lov_stripe_md *md,
+ struct lustre_handle *parent_lock,
__u32 type, void *cookie, int cookielen, __u32 mode,
int *flags, void *cb, void *data, int datalen,
struct lustre_handle *lockh);
- int (*o_cancel)(struct lustre_handle *, __u32 mode,
- struct lustre_handle *);
+ int (*o_cancel)(struct lustre_handle *, struct lov_stripe_md *md, __u32 mode, struct lustre_handle *);
};
#endif
static inline int obd_brw(int cmd, struct lustre_handle *conn,
struct lov_stripe_md *md,
obd_count oa_bufs,
- struct page **buf,
- obd_size *count,
- obd_off *offset,
- obd_flag *flags,
+ struct brw_page *pg,
brw_callback_t callback, void *data)
{
int rc;
LBUG();
}
- rc = OBP(export->exp_obd, brw)(cmd, conn, md, oa_bufs, buf,
- count, offset, flags, callback, data);
+ rc = OBP(export->exp_obd, brw)(cmd, conn, md, oa_bufs, pg, callback, data);
RETURN(rc);
}
}
static inline int obd_enqueue(struct lustre_handle *conn,
- struct lustre_handle *parent_lock, __u64 *res_id,
+ struct lov_stripe_md *md,
+ struct lustre_handle *parent_lock,
__u32 type, void *cookie, int cookielen,
__u32 mode, int *flags, void *cb, void *data,
int datalen, struct lustre_handle *lockh)
OBD_CHECK_SETUP(conn, export);
OBD_CHECK_OP(export->exp_obd,enqueue);
- rc = OBP(export->exp_obd, enqueue)(conn, parent_lock, res_id, type,
+ rc = OBP(export->exp_obd, enqueue)(conn, md, parent_lock, type,
cookie, cookielen, mode, flags, cb,
data, datalen, lockh);
RETURN(rc);
}
-static inline int obd_cancel(struct lustre_handle *conn, __u32 mode,
+static inline int obd_cancel(struct lustre_handle *conn, struct lov_stripe_md *md, __u32 mode,
struct lustre_handle *lockh)
{
int rc;
OBD_CHECK_SETUP(conn, export);
OBD_CHECK_OP(export->exp_obd,cancel);
- rc = OBP(export->exp_obd, cancel)(conn, mode, lockh);
+ rc = OBP(export->exp_obd, cancel)(conn, md, mode, lockh);
RETURN(rc);
}
conn.addr = req->rq_reqmsg->addr;
conn.cookie = req->rq_reqmsg->cookie;
- rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+ rc = lustre_pack_msg(0,
+ NULL, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
GOTO(out, rc);
b->ino = HTON__u32(b->ino);
b->nlink = HTON__u32(b->nlink);
b->generation = HTON__u32(b->generation);
- b->last_xid = HTON__u32(b->last_xid);
}
void mds_getattr_pack(struct ptlrpc_request *req, int offset,
b->ino = NTOH__u32(b->ino);
b->nlink = NTOH__u32(b->nlink);
b->generation = NTOH__u32(b->generation);
- b->last_xid = NTOH__u32(b->last_xid);
}
static int mds_setattr_unpack(struct ptlrpc_request *req, int offset,
int mds_update_unpack(struct ptlrpc_request *req, int offset,
struct mds_update_record *rec)
{
- struct mds_update_record_hdr *hdr =
- lustre_msg_buf(req->rq_reqmsg, offset);
+ __u32 *opcode = lustre_msg_buf(req->rq_reqmsg, offset);
int rc;
ENTRY;
- if (!hdr || req->rq_reqmsg->buflens[offset] < sizeof(*hdr))
+ if (!opcode || req->rq_reqmsg->buflens[offset] < sizeof(*opcode))
RETURN(-EFAULT);
- rec->ur_opcode = NTOH__u32(hdr->ur_opcode);
+ rec->ur_opcode = NTOH__u32(*opcode);
if (rec->ur_opcode < 0 || rec->ur_opcode > REINT_MAX)
RETURN(-EFAULT);
#include <linux/lustre_net.h>
#include <linux/lustre_lib.h>
+
+int ll_sync_io_cb(void *data, int err, int phase)
+{
+ struct io_cb_data *d = data;
+ int ret;
+ ENTRY;
+
+ if (phase == CB_PHASE_START) {
+ ret = l_wait_event_killable(d->waitq, d->complete);
+ if (atomic_dec_and_test(&d->refcount))
+ OBD_FREE(d, sizeof(*d));
+ if (ret == -ERESTARTSYS)
+ return ret;
+ } else if (phase == CB_PHASE_FINISH) {
+ d->err = err;
+ d->complete = 1;
+ wake_up(&d->waitq);
+ if (atomic_dec_and_test(&d->refcount))
+ OBD_FREE(d, sizeof(*d));
+ return err;
+ } else
+ LBUG();
+ EXIT;
+ return 0;
+}
+
+struct io_cb_data *ll_init_cb(void)
+{
+ struct io_cb_data *d;
+
+
+ OBD_ALLOC(d, sizeof(*d));
+ if (d) {
+ init_waitqueue_head(&d->waitq);
+ atomic_set(&d->refcount, 2);
+ }
+ RETURN(d);
+}
+
/*
* Remove page from dirty list
*/
struct inode *inode = filp->f_dentry->d_inode;
struct ll_sb_info *sbi = ll_i2sbi(inode);
struct ldlm_extent extent;
- struct lustre_handle lockh;
- __u64 res_id[RES_NAME_SIZE] = {inode->i_ino};
+ struct lustre_handle *lockhs = NULL;
+ struct lov_stripe_md *md = ll_i2info(inode)->lli_smd;
int flags = 0;
ldlm_error_t err;
ssize_t retval;
ENTRY;
+
+
if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK)) {
+ OBD_ALLOC(lockhs, md->lmd_stripe_count * sizeof(*lockhs));
+ if (!lockhs)
+ RETURN(-ENOMEM);
+
extent.start = *ppos;
extent.end = *ppos + count;
CDEBUG(D_INFO, "Locking inode %ld, start %Lu end %Lu\n",
inode->i_ino, extent.start, extent.end);
- err = obd_enqueue(&sbi->ll_osc_conn, NULL, res_id, LDLM_EXTENT,
+ err = obd_enqueue(&sbi->ll_osc_conn, md, NULL, LDLM_EXTENT,
&extent, sizeof(extent), LCK_PR, &flags,
ll_lock_callback, inode, sizeof(*inode),
- &lockh);
+ lockhs);
if (err != ELDLM_OK) {
+ OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs));
CERROR("lock enqueue: err: %d\n", err);
RETURN(err);
}
- ldlm_lock_dump((void *)(unsigned long)lockh.addr);
}
CDEBUG(D_INFO, "Reading inode %ld, %d bytes, offset %Ld\n",
ll_update_atime(inode);
if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK)) {
- err = obd_cancel(&sbi->ll_osc_conn, LCK_PR, &lockh);
+ err = obd_cancel(&sbi->ll_osc_conn, md, LCK_PR, lockhs);
if (err != ELDLM_OK) {
+ OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs));
CERROR("lock cancel: err: %d\n", err);
RETURN(err);
}
}
+ if (lockhs)
+ OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs));
RETURN(retval);
}
struct inode *inode = file->f_dentry->d_inode;
struct ll_sb_info *sbi = ll_i2sbi(inode);
struct ldlm_extent extent;
- struct lustre_handle lockh;
- __u64 res_id[RES_NAME_SIZE] = {inode->i_ino};
+ struct lustre_handle *lockhs = NULL;
+ struct lov_stripe_md *md = ll_i2info(inode)->lli_smd;
int flags = 0;
ldlm_error_t err;
ssize_t retval;
ENTRY;
if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK)) {
+ OBD_ALLOC(lockhs, md->lmd_stripe_count * sizeof(*lockhs));
+ if (!lockhs)
+ RETURN(-ENOMEM);
/* FIXME: this should check whether O_APPEND is set and adjust
* extent.start accordingly */
extent.start = *ppos;
CDEBUG(D_INFO, "Locking inode %ld, start %Lu end %Lu\n",
inode->i_ino, extent.start, extent.end);
- err = obd_enqueue(&sbi->ll_osc_conn, NULL, res_id, LDLM_EXTENT,
+ err = obd_enqueue(&sbi->ll_osc_conn, md, NULL, LDLM_EXTENT,
&extent, sizeof(extent), LCK_PW, &flags,
ll_lock_callback, inode, sizeof(*inode),
- &lockh);
+ lockhs);
if (err != ELDLM_OK) {
+ OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs));
CERROR("lock enqueue: err: %d\n", err);
RETURN(err);
}
- ldlm_lock_dump((void *)(unsigned long)lockh.addr);
}
CDEBUG(D_INFO, "Writing inode %ld, %ld bytes, offset %Ld\n",
retval = generic_file_write(file, buf, count, ppos);
if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK)) {
- err = obd_cancel(&sbi->ll_osc_conn, LCK_PW, &lockh);
+ err = obd_cancel(&sbi->ll_osc_conn, md, LCK_PW, lockhs);
if (err != ELDLM_OK) {
+ OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs));
CERROR("lock cancel: err: %d\n", err);
RETURN(err);
}
}
+ if (lockhs)
+ OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs));
RETURN(retval);
}
static int ll_reconnect(struct ll_sb_info *sbi)
{
struct ll_fid rootfid;
- __u64 last_committed, last_rcvd;
+ __u64 last_committed;
__u32 last_xid;
int err;
struct ptlrpc_request *request;
/* XXX: need to store the last_* values somewhere */
err = mdc_getstatus(&sbi->ll_mdc_conn,
&rootfid, &last_committed,
- &last_rcvd,
&last_xid,
&request);
if (err) {
CERROR("cannot mds_connect: rc = %d\n", err);
GOTO(out_disc, err = -ENOTCONN);
}
- sbi2mdc(sbi)->cl_client->cli_last_rcvd = last_xid;
+ sbi2mdc(sbi)->cl_client->cli_last_xid = last_xid;
sbi2mdc(sbi)->cl_conn->c_level = LUSTRE_CONN_RECOVD;
out_disc:
/* replay what needs to be replayed */
if (req->rq_flags & PTL_RPC_FL_REPLAY) {
CDEBUG(D_INODE, "req %Ld needs replay [last rcvd %Ld]\n",
- req->rq_xid, cli->cli_last_rcvd);
+ req->rq_xid, cli->cli_last_xid);
rc = ptlrpc_replay_req(req);
if (rc) {
CERROR("recovery replay error %d for request %Ld\n",
/* server has seen req, we have reply: skip */
if ((req->rq_flags & PTL_RPC_FL_REPLIED) &&
- req->rq_xid <= cli->cli_last_rcvd) {
+ req->rq_xid <= cli->cli_last_xid) {
CDEBUG(D_INODE, "req %Ld was complete: skip [last rcvd %Ld]\n",
- req->rq_xid, cli->cli_last_rcvd);
+ req->rq_xid, cli->cli_last_xid);
continue;
}
/* server has lost req, we have reply: resend, ign reply */
if ((req->rq_flags & PTL_RPC_FL_REPLIED) &&
- req->rq_xid > cli->cli_last_rcvd) {
+ req->rq_xid > cli->cli_last_xid) {
CDEBUG(D_INODE, "lost req %Ld have rep: replay [last rcvd %Ld]\n",
- req->rq_xid, cli->cli_last_rcvd);
+ req->rq_xid, cli->cli_last_xid);
rc = ptlrpc_replay_req(req);
if (rc) {
CERROR("request resend error %d for request %Ld\n",
/* server has seen req, we have lost reply: -ERESTARTSYS */
if ( !(req->rq_flags & PTL_RPC_FL_REPLIED) &&
- req->rq_xid <= cli->cli_last_rcvd) {
+ req->rq_xid <= cli->cli_last_xid) {
CDEBUG(D_INODE, "lost rep %Ld srv did req: restart [last rcvd %Ld]\n",
- req->rq_xid, cli->cli_last_rcvd);
+ req->rq_xid, cli->cli_last_xid);
ptlrpc_restart_req(req);
}
/* service has not seen req, no reply: resend */
if ( !(req->rq_flags & PTL_RPC_FL_REPLIED) &&
- req->rq_xid > cli->cli_last_rcvd) {
+ req->rq_xid > cli->cli_last_xid) {
CDEBUG(D_INODE, "lost rep/req %Ld: resend [last rcvd %Ld]\n",
- req->rq_xid, cli->cli_last_rcvd);
+ req->rq_xid, cli->cli_last_xid);
ptlrpc_resend_req(req);
}
#include <linux/lustre_lite.h>
#include <linux/lustre_lib.h>
+
/* SYNCHRONOUS I/O to object storage for an inode */
static int ll_brw(int rw, struct inode *inode, struct page *page, int create)
{
struct ll_inode_info *lii = ll_i2info(inode);
struct lov_stripe_md *md = lii->lli_smd;
- obd_size count = PAGE_SIZE;
- obd_off offset = ((obd_off)page->index) << PAGE_SHIFT;
- obd_flag flags = create ? OBD_BRW_CREATE : 0;
+ struct brw_page pg;
int err;
+ struct io_cb_data *cbd = ll_init_cb();
ENTRY;
+ if (!cbd)
+ RETURN(-ENOMEM);
+
+ pg.pg = page;
+ pg.count = PAGE_SIZE;
+ pg.off = ((obd_off)page->index) << PAGE_SHIFT;
+ pg.flag = create ? OBD_BRW_CREATE : 0;
- err = obd_brw(rw, ll_i2obdconn(inode), md, 1,
- &page, &count, &offset, &flags, NULL, NULL);
+ err = obd_brw(rw, ll_i2obdconn(inode), md, 1, &pg, ll_sync_io_cb, cbd);
RETURN(err);
} /* ll_brw */
RETURN(err);
}
+
/* SYNCHRONOUS I/O to object storage for an inode -- object attr will be updated
* too */
static int ll_commit_write(struct file *file, struct page *page,
struct inode *inode = page->mapping->host;
struct ll_inode_info *lii = ll_i2info(inode);
struct lov_stripe_md *md = lii->lli_smd;
- obd_size count = to;
- obd_off offset = (((obd_off)page->index) << PAGE_SHIFT);
- obd_flag flags = create ? OBD_BRW_CREATE : 0;
+ struct brw_page pg;
int err;
struct iattr iattr;
+ struct io_cb_data *cbd = ll_init_cb();
+
+ pg.pg = page;
+ pg.count = to;
+ pg.off = (((obd_off)page->index) << PAGE_SHIFT);
+ pg.flag = create ? OBD_BRW_CREATE : 0;
ENTRY;
+ if (!cbd)
+ RETURN(-ENOMEM);
SetPageUptodate(page);
LBUG();
CDEBUG(D_INODE, "commit_page writing (at %d) to %d, count %Ld\n",
- from, to, (unsigned long long)count);
+ from, to, (unsigned long long)pg.count);
err = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode), md,
- 1, &page, &count, &offset, &flags, NULL, NULL);
+ 1, &pg, ll_sync_io_cb, cbd);
kunmap(page);
- iattr.ia_size = offset + to;
+ iattr.ia_size = pg.off + pg.count;
if (iattr.ia_size > inode->i_size) {
/* do NOT truncate when writing in the middle of a file */
inode->i_size = iattr.ia_size;
obd_count bufs_per_obdo = iobuf->nr_pages;
struct ll_inode_info *lii = ll_i2info(inode);
struct lov_stripe_md *md = lii->lli_smd;
- obd_size *count = NULL;
- obd_off *offset = NULL;
- obd_flag *flags = NULL;
+ struct brw_page *pga;
int rc = 0;
int i;
+ struct io_cb_data *cbd = ll_init_cb();
ENTRY;
+ if (!cbd)
+ RETURN(-ENOMEM);
if (blocksize != PAGE_SIZE) {
CERROR("direct_IO blocksize != PAGE_SIZE\n");
return -EINVAL;
}
- OBD_ALLOC(count, sizeof(*count) * bufs_per_obdo);
- OBD_ALLOC(offset, sizeof(*offset) * bufs_per_obdo);
- OBD_ALLOC(flags, sizeof(*flags) * bufs_per_obdo);
- if (!count || !offset || !flags)
+ OBD_ALLOC(pga, sizeof(*pga) * bufs_per_obdo);
+ if (pga)
GOTO(out, rc = -ENOMEM);
/* NB: we can't use iobuf->maplist[i]->index for the offset
* instead of "blocknr" because ->index contains garbage.
*/
for (i = 0; i < bufs_per_obdo; i++, blocknr++) {
- count[i] = PAGE_SIZE;
- offset[i] = (obd_off)blocknr << PAGE_SHIFT;
- flags[i] = OBD_BRW_CREATE;
+ pga[i].pg = iobuf->maplist[i];
+ pga[i].count = PAGE_SIZE;
+ pga[i].off = (obd_off)blocknr << PAGE_SHIFT;
+ pga[i].flag = OBD_BRW_CREATE;
}
if (!md || !md->lmd_object_id)
GOTO(out, rc = -ENOMEM);
rc = obd_brw(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
- ll_i2obdconn(inode), md, bufs_per_obdo,
- iobuf->maplist, count, offset, flags, NULL, NULL);
+ ll_i2obdconn(inode), md, bufs_per_obdo, pga,
+ ll_sync_io_cb, cbd);
if (rc == 0)
rc = bufs_per_obdo * PAGE_SIZE;
out:
- OBD_FREE(flags, sizeof(obd_flag) * bufs_per_obdo);
- OBD_FREE(count, sizeof(obd_count) * bufs_per_obdo);
- OBD_FREE(offset, sizeof(obd_off) * bufs_per_obdo);
+ OBD_FREE(pga, sizeof(*pga) * bufs_per_obdo);
RETURN(rc);
}
int err;
struct ll_fid rootfid;
struct statfs sfs;
- __u64 last_committed, last_rcvd;
+ __u64 last_committed;
__u32 last_xid;
struct ptlrpc_request *request = NULL;
struct ll_inode_md md;
/* XXX: need to store the last_* values somewhere */
err = mdc_getstatus(&sbi->ll_mdc_conn, &rootfid, &last_committed,
- &last_rcvd, &last_xid, &request);
+ &last_xid, &request);
ptlrpc_req_finished(request);
if (err) {
CERROR("cannot mds_connect: rc = %d\n", err);
md->lmd_stripe_count = lov->desc.ld_default_stripe_count;
}
+ if (!md->lmd_stripe_size)
+ md->lmd_stripe_size = lov->desc.ld_default_stripe_size;
+
+
+
for (i = 0; i < md->lmd_stripe_count; i++) {
struct lov_stripe_md obj_md;
struct lov_stripe_md *obj_mdp = &obj_md;
for (i = 0; i < md->lmd_stripe_count; i++) {
/* create data objects with "parent" OA */
memcpy(&tmp, oa, sizeof(tmp));
- oa->o_id = md->lmd_objects[i].l_object_id;
+ tmp.o_id = md->lmd_objects[i].l_object_id;
rc = obd_destroy(&lov->tgts[i].conn, &tmp, NULL);
if (!rc) {
CERROR("Error destroying object %Ld on %d\n",
ENTRY;
if (!md) {
- CERROR("LOV requires striping ea for desctruction\n");
+ CERROR("LOV requires striping ea for opening\n");
RETURN(-EINVAL);
}
oa->o_id = md->lmd_objects[i].l_object_id;
rc = obd_open(&lov->tgts[i].conn, &tmp, NULL);
- if (!rc) {
+ if (rc) {
CERROR("Error getattr object %Ld on %d\n",
oa->o_id, i);
}
RETURN(rc);
}
+#ifndef log2
+#define log2(n) ffz(~(n))
+#endif
+
/* compute offset in stripe i corresponds to offset "in" */
__u64 lov_offset(struct lov_stripe_md *md, __u64 in, int i)
{
return (__u64) out;
}
-
-struct lov_callback_data {
- atomic_t count;
- wait_queue_head_t waitq;
-};
-
-static void lov_read_callback(struct ptlrpc_bulk_desc *desc, void *data)
+/* compute offset in stripe i corresponds to offset "in" */
+__u64 lov_stripe(struct lov_stripe_md *md, __u64 in, int *j)
{
- struct lov_callback_data *cb_data = data;
+ __u32 ssz = md->lmd_stripe_size;
+ __u32 off, out;
+ /* full stripes across all * stripe size */
+ *j = (((__u32) in)/ssz) % md->lmd_stripe_count;
+ off = (__u32)in % (md->lmd_stripe_count * ssz);
+ out = ( ((__u32)in) / (md->lmd_stripe_count * ssz)) * ssz +
+ (off - ((*j) * ssz)) % ssz;;
- if (atomic_dec_and_test(&cb_data->count))
- wake_up(&cb_data->waitq);
+ return (__u64) out;
}
-static int lov_read_check_status(struct lov_callback_data *cb_data)
+int lov_stripe_which(struct lov_stripe_md *md, __u64 in)
{
- ENTRY;
- if (sigismember(&(current->pending.signal), SIGKILL) ||
- sigismember(&(current->pending.signal), SIGTERM) ||
- sigismember(&(current->pending.signal), SIGINT)) {
- // FIXME XXX what here
- // cb_data->flags |= PTL_RPC_FL_INTR;
- RETURN(1);
- }
- if (atomic_read(&cb_data->count) == 0)
- RETURN(1);
- RETURN(0);
+ __u32 ssz = md->lmd_stripe_size;
+ int j;
+ j = (((__u32) in)/ssz) % md->lmd_stripe_count;
+ return j;
}
__u64 starti = lov_offset(md, start, i);
__u64 endi = lov_offset(md, end, i);
+ if (starti == endi)
+ continue;
/* create data objects with "parent" OA */
memcpy(&tmp, oa, sizeof(tmp));
oa->o_id = md->lmd_objects[i].l_object_id;
RETURN(rc);
}
+struct lov_callback_data {
+ atomic_t count;
+ struct io_cb_data *cbd;
+ brw_callback_t cb;
+ int err;
+};
+
+int lov_osc_brw_callback(void *data, int err, int phase)
+{
+ struct lov_callback_data *d = data;
+ int ret = 0;
+ ENTRY;
+
+ if (phase == CB_PHASE_START) {
+ RETURN(0);
+ } else if (phase == CB_PHASE_FINISH) {
+ if (err)
+ d->err = err;
+ if (atomic_dec_and_test(&d->count)) {
+ ret = d->cb(d->cbd, 0, d->err);
+ }
+ RETURN(ret);
+ } else
+ LBUG();
+ EXIT;
+ return 0;
+}
-#if 0
-static int lov_brw(int cmd, struct lustre_handle *conn, obd_count num_oa,
- struct obdo **oa,
- obd_count *oa_bufs, struct page **buf,
- obd_size *count, obd_off *offset, obd_flag *flags,
- bulk_callback_t callback, void *data)
+static inline int lov_brw(int cmd, struct lustre_handle *conn,
+ struct lov_stripe_md *md,
+ obd_count oa_bufs,
+ struct brw_page *pga,
+ brw_callback_t callback, void *data)
{
- int rc, i, page_array_offset = 0;
- obd_off off = offset;
- obd_size retval = 0;
- struct lov_callback_data *cb_data;
+ int stripe_count = md->lmd_stripe_count;
+ struct obd_export *export = class_conn2export(conn);
+ struct lov_obd *lov;
+ struct {
+ int bufct;
+ int index;
+ int subcount;
+ struct lov_stripe_md md;
+ } *stripeinfo;
+ struct brw_page *ioarr;
+ int rc, i;
+ struct lov_callback_data *lov_cb_data;
ENTRY;
- if (num_oa != 1)
- LBUG();
+ lov = &export->exp_obd->u.lov;
- if (!class_conn2export(conn))
- RETURN(-EINVAL);
+ OBD_ALLOC(lov_cb_data, sizeof(*lov_cb_data));
+ if (!lov_cb_data)
+ RETURN(-ENOMEM);
- OBD_ALLOC(cb_data, sizeof(*cb_data));
- if (cb_data == NULL) {
- LBUG();
+ OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
+ if (!stripeinfo)
+ RETURN(-ENOMEM);
+
+ OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
+ if (!ioarr) {
+ OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
RETURN(-ENOMEM);
}
- INIT_WAITQUEUE_HEAD(&cb_data->waitq);
- atomic_set(&cb_data->count, 0);
- for (i = 0; i < oa_bufs[0]; i++) {
- struct page *current_page = buf[i];
+ for (i=0 ; i < oa_bufs ; i++ ) {
+ int which;
+ which = lov_stripe_which(md, pga[i].pg->index * PAGE_SIZE);
+ stripeinfo[which].bufct++;
+ }
- struct lov_md *md = (struct lov_md *)oa[i]->inline;
- int bufcount = oa_bufs[i];
- // md->lmd_stripe_count
+ for (i=0 ; i < stripe_count ; i++) {
+ if (i>0)
+ stripeinfo[i].index =
+ stripeinfo[i-1].index + stripeinfo[i-1].bufct;
+ stripeinfo[i].md.lmd_object_id =
+ md->lmd_objects[i].l_object_id;
+ }
- for (k = page_array_offset; k < bufcount + page_array_offset;
- k++) {
-
- }
- page_array_offset += bufcount;
-
-
- while (off < offset + count) {
- int stripe, conn;
- obd_size size, tmp;
-
- stripe = off / conn->oc_dev->u.lov.lov_stripe_size;
- size = (stripe + 1) * conn->oc_dev->u.lov.lov_strip_size - off;
- if (size > *count)
- size = *count;
-
- conn = stripe % conn->oc_dev->obd_multi_count;
-
- tmp = size;
- atomic_inc(&cb_data->count);
- rc = obd_brw(cmd, &conn->oc_dev->obd_multi_conn[conn],
- num_oa, oa, buf,
- &size, off, lov_read_callback, cb_data);
- if (rc == 0)
- retval += size;
- else {
- CERROR("read(off=%Lu, count=%Lu): %d\n",
- (unsigned long long)off,
- (unsigned long long)size, rc);
- break;
- }
+ for (i=0 ; i < oa_bufs ; i++ ) {
+ int which, shift;
+ which = lov_stripe_which(md, pga[i].pg->index * PAGE_SIZE);
- buf += size;
+ shift = stripeinfo[which].index;
+ ioarr[shift + stripeinfo[which].subcount] = pga[i];
+ pga[i].off = lov_offset(md, pga[i].pg->index * PAGE_SIZE, which);
+ stripeinfo[which].subcount++;
}
+
+ lov_cb_data->cb = callback;
+ lov_cb_data->cbd = data;
+ atomic_set(&lov_cb_data->count, oa_bufs);
+ for (i=0 ; i < stripe_count ; i++) {
+ int shift = stripeinfo[i].index;
- wait_event(&cb_data->waitq, lov_read_check_status(cb_data));
- if (cb_data->flags & PTL_RPC_FL_INTR)
- rc = -EINTR;
+ obd_brw(cmd, &lov->tgts[i].conn, &stripeinfo[i].md,
+ stripeinfo[i].bufct, &ioarr[shift],
+ lov_osc_brw_callback, &lov_cb_data);
+ }
- /* FIXME: The error handling here sucks */
- *count = retval;
- OBD_FREE(cb_data, sizeof(*cb_data));
- RETURN(rc);
-}
+ rc = callback(lov_cb_data, 0, CB_PHASE_START);
-static void lov_write_finished(struct ptlrpc_bulk_desc *desc, void *data)
-{
-
+ RETURN(rc);
}
-/* buffer must lie in user memory here */
-static int filter_write(struct lustre_handle *conn, struct obdo *oa, char *buf,
- obd_size *count, obd_off offset)
+static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *md,
+ struct lustre_handle *parent_lock,
+ __u32 type, void *cookie, int cookielen, __u32 mode,
+ int *flags, void *cb, void *data, int datalen,
+ struct lustre_handle *lockhs)
{
- int err;
- struct file *file;
- unsigned long retval;
-
+ int rc = 0, i;
+ struct obd_export *export = class_conn2export(conn);
+ struct lov_obd *lov;
ENTRY;
- if (!class_conn2export(conn)) {
- CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
- EXIT;
- return -EINVAL;
- }
- file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode);
- if (!file || IS_ERR(file)) {
- EXIT;
- return -PTR_ERR(file);
+ if (!md) {
+ CERROR("LOV requires striping ea for desctruction\n");
+ RETURN(-EINVAL);
}
- /* count doubles as retval */
- retval = file->f_op->write(file, buf, *count, (loff_t *)&offset);
- filp_close(file, 0);
+ if (!export || !export->exp_obd)
+ RETURN(-ENODEV);
- if ( retval >= 0 ) {
- err = 0;
- *count = retval;
- EXIT;
- } else {
- err = retval;
- *count = 0;
- EXIT;
+ lov = &export->exp_obd->u.lov;
+ for (i = 0; i < md->lmd_stripe_count; i++) {
+ struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
+ struct ldlm_extent sub_ext;
+ struct lov_stripe_md submd;
+
+ sub_ext.start = lov_offset(md, extent->start, i);
+ sub_ext.end = lov_offset(md, extent->end, i);
+ if ( sub_ext.start == sub_ext.end )
+ continue;
+
+ submd.lmd_object_id = md->lmd_objects[i].l_object_id;
+ submd.lmd_easize = sizeof(submd);
+ rc = obd_enqueue(&(lov->tgts[i].conn), &submd, parent_lock, type,
+ &sub_ext, sizeof(sub_ext), mode, flags, cb, data, datalen, &(lockhs[i]));
+ // XXX add a lock debug statement here
+ if (!rc) {
+ CERROR("Error punch object %Ld subobj %Ld\n", md->lmd_object_id,
+ md->lmd_objects[i].l_object_id);
+ }
}
-
- return err;
+ RETURN(rc);
}
-static int lov_enqueue(struct lustre_handle *conn, struct ldlm_namespace *ns,
- struct ldlm_handle *parent_lock, __u64 *res_id,
- __u32 type, struct ldlm_extent *extent, __u32 mode,
- int *flags, void *data, int datalen,
- struct ldlm_handle *lockh)
+static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *md, __u32 mode,
+ struct lustre_handle *lockhs)
{
- int rc;
+ int rc = 0, i;
+ struct obd_export *export = class_conn2export(conn);
+ struct lov_obd *lov;
ENTRY;
- if (!class_conn2export(conn))
- RETURN(-EINVAL);
+ if (!md) {
+ CERROR("LOV requires striping ea for lock cancellation\n");
+ RETURN(-EINVAL);
+ }
- rc = obd_enqueue(&conn->oc_dev->obd_multi_conn[0], ns, parent_lock,
- res_id, type, extent, mode, flags, data, datalen,
- lockh);
- RETURN(rc);
-}
+ if (!export || !export->exp_obd)
+ RETURN(-ENODEV);
-static int lov_cancel(struct lustre_handle *conn, __u32 mode,
- struct ldlm_handle *lockh)
-{
- int rc;
- ENTRY;
+ lov = &export->exp_obd->u.lov;
+ for (i = 0; i < md->lmd_stripe_count; i++) {
+ struct lov_stripe_md submd;
- if (!class_conn2export(conn))
- RETURN(-EINVAL);
+ if ( lockhs[i].addr == 0 )
+ continue;
- rc = obd_cancel(&conn->oc_dev->obd_multi_conn[0], oa);
+ submd.lmd_object_id = md->lmd_objects[i].l_object_id;
+ submd.lmd_easize = sizeof(submd);
+ rc = obd_cancel(&lov->tgts[i].conn, &submd, mode, &lockhs[i]);
+ if (!rc) {
+ CERROR("Error punch object %Ld subobj %Ld\n", md->lmd_object_id,
+ md->lmd_objects[i].l_object_id);
+ }
+ }
RETURN(rc);
}
-#endif
+
+
+
struct obd_ops lov_obd_ops = {
o_setup: lov_setup,
o_setattr: lov_setattr,
o_open: lov_open,
o_close: lov_close,
-#if 0
- o_brw: lov_pgcache_brw,
+ o_brw: lov_brw,
o_punch: lov_punch,
o_enqueue: lov_enqueue,
o_cancel: lov_cancel
-#endif
};
struct inode *inode, struct iattr *iattr,
struct ptlrpc_request **request)
{
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
- struct mds_rec_setattr *rec;
struct ptlrpc_request *req;
+ struct mds_rec_setattr *rec;
int rc, size = sizeof(*rec);
ENTRY;
- mdc_con2cl(conn, &cl, &connection, &rconn);
req = ptlrpc_prep_req2(conn, MDS_REINT, 1, &size, NULL);
if (!req)
RETURN(-ENOMEM);
resend:
rc = mdc_reint(req, level);
if (rc == -ERESTARTSYS) {
- struct mds_update_record_hdr *hdr =
- lustre_msg_buf(req->rq_reqmsg, 0);
+ __u32 *opcode = lustre_msg_buf(req->rq_reqmsg, 0);
level = LUSTRE_CONN_RECOVD;
CERROR("Lost reply: re-create rep.\n");
req->rq_flags = 0;
- hdr->ur_opcode = NTOH__u32(REINT_RECREATE);
+ *opcode = NTOH__u32(REINT_RECREATE);
goto resend;
}
}
int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid,
- __u64 *last_committed, __u64 *last_rcvd,
- __u32 *last_xid, struct ptlrpc_request **request)
+ __u64 *last_committed, __u32 *last_xid,
+ struct ptlrpc_request **request)
{
struct ptlrpc_request *req;
struct mds_body *body;
mds_unpack_body(body);
memcpy(rootfid, &body->fid1, sizeof(*rootfid));
*last_committed = req->rq_repmsg->last_committed;
- *last_rcvd = req->rq_repmsg->last_rcvd;
- *last_xid = body->last_xid;
+ *last_xid = req->rq_repmsg->last_xid;
- CDEBUG(D_NET, "root ino=%ld, last_committed=%Lu, last_rcvd=%Lu,"
- " last_xid=%d\n",
+ CDEBUG(D_NET, "root ino=%ld, last_committed=%Lu, last_xid=%d\n",
(unsigned long)rootfid->id,
- (unsigned long long)*last_committed,
- (unsigned long long)*last_rcvd,
- body->last_xid);
+ (unsigned long long)*last_committed, last_xid);
}
EXIT;
obd_id ino, int type, unsigned long valid, size_t ea_size,
struct ptlrpc_request **request)
{
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
struct ptlrpc_request *req;
struct mds_body *body;
int rc, size[2] = {sizeof(*body), 0}, bufcount = 1;
ENTRY;
- mdc_con2cl(conn, &cl, &connection, &rconn);
req = ptlrpc_prep_req2(conn, MDS_GETATTR, 1, size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
/* mcd_last_xid is is stored in little endian on the disk and
mds_pack_rep_body converts it to network order */
- body->last_xid = le32_to_cpu(med->med_mcd->mcd_last_xid);
+ req->rq_repmsg->last_xid = le32_to_cpu(med->med_mcd->mcd_last_xid);
mds_pack_rep_body(req);
RETURN(0);
}
if (!rc) {
struct mds_obd *mds = mds_req2mds(req);
- req->rq_repmsg->last_rcvd = HTON__u64(mds->mds_last_rcvd);
+ req->rq_repmsg->last_xid = HTON__u64(mds->mds_last_rcvd);
req->rq_repmsg->last_committed =
HTON__u64(mds->mds_last_committed);
CDEBUG(D_INFO, "last_rcvd %Lu, last_committed %Lu, xid %d\n",
MODULE = obdclass
modulefs_DATA = obdclass.o
EXTRA_PROGRAMS = obdclass
-obdclass_SOURCES = genops.c proc_lustre.c class_obd.c sysctl.c
+obdclass_SOURCES = genops.c proc_lustre.c class_obd.c sysctl.c page.c
+
+page.c:
+ test -e page.c || ln -sf $(top_srcdir)/lib/page.c
include $(top_srcdir)/Rules
rw = OBD_BRW_WRITE;
case OBD_IOC_BRW_READ: {
struct lov_stripe_md smd;
+ struct io_cb_data *cbd = ll_init_cb();
obd_count pages = 0;
- struct page **bufs = NULL;
- obd_size *counts = NULL;
- obd_off *offsets = NULL;
- obd_flag *flags = NULL;
+ struct brw_page *pga;
int j;
unsigned long off;
void *from;
+
+ if (!cbd)
+ GOTO(out, -ENOMEM);
obd_data2conn(&conn, data);
CDEBUG(D_INODE, "BRW %s with %d pages\n",
rw == OBD_BRW_READ ? "read" : "write", pages);
- OBD_ALLOC(bufs, pages * sizeof(*bufs));
- OBD_ALLOC(counts, pages * sizeof(*counts));
- OBD_ALLOC(offsets, pages * sizeof(*offsets));
- OBD_ALLOC(flags, pages * sizeof(*flags));
- if (!bufs || !counts || !offsets || !flags) {
+ OBD_ALLOC(pga, pages * sizeof(*pga));
+ if (!pga) {
CERROR("no memory for %d BRW per-page data\n", pages);
GOTO(brw_free, err = -ENOMEM);
}
CERROR("no memory for brw pages\n");
GOTO(brw_cleanup, err = -ENOMEM);
}
- bufs[j] = virt_to_page(to);
- counts[j] = PAGE_SIZE;
- offsets[j] = off;
- flags[j] = 0;
+ pga[j].pg = virt_to_page(to);
+ pga[j].count = PAGE_SIZE;
+ pga[j].off = off;
+ pga[j].flag = 0;
}
- err = obd_brw(rw, &conn, &smd, j, bufs, counts, offsets, flags,
- NULL, NULL);
-
+ err = obd_brw(rw, &conn, &smd, j, pga, ll_sync_io_cb, cbd);
EXIT;
brw_cleanup:
for (j = 0; j < pages; j++)
- if (bufs[j] != NULL)
- __free_pages(bufs[j], 0);
+ if (pga[j].pg != NULL)
+ __free_pages(pga[j].pg, 0);
brw_free:
- OBD_FREE(bufs, pages * sizeof(*bufs));
- OBD_FREE(counts, pages * sizeof(*counts));
- OBD_FREE(offsets, pages * sizeof(*offsets));
- OBD_FREE(flags, pages * sizeof(*flags));
+ OBD_FREE(pga, pages * sizeof(*pga));
GOTO(out, err);
}
default:
static int filter_pgcache_brw(int cmd, struct lustre_handle *conn,
struct lov_stripe_md *md, obd_count oa_bufs,
- struct page **pages, obd_size *count,
- obd_off *offset, obd_flag *flags,
+ struct brw_page *pga,
brw_callback_t callback, void *data)
{
struct obd_run_ctxt saved;
CDEBUG(D_INODE, "OP %d obdo pgno: (%d) (%ld,%ld) "
"off count (%Ld,%Ld)\n",
cmd, pnum, file->f_dentry->d_inode->i_ino,
- (unsigned long)offset[pnum] >> PAGE_CACHE_SHIFT,
- (unsigned long long)offset[pnum],
- (unsigned long long)count[pnum]);
+ (unsigned long)pga[pnum].off >> PAGE_CACHE_SHIFT,
+ (unsigned long long)pga[pnum].off,
+ (unsigned long long)pga[pnum].count);
if (cmd & OBD_BRW_WRITE) {
loff_t off;
char *buffer;
- off = offset[pnum];
- buffer = kmap(pages[pnum]);
- retval = file->f_op->write(file, buffer, count[pnum],
+ off = pga[pnum].off;
+ buffer = kmap(pga[pnum].pg);
+ retval = file->f_op->write(file, buffer,
+ pga[pnum].count,
&off);
kunmap(pages[pnum]);
CDEBUG(D_INODE, "retval %ld\n", retval);
} else {
- loff_t off = offset[pnum];
- char *buffer = kmap(pages[pnum]);
+ loff_t off = pga[pnum].off;
+ char *buffer = kmap(pga[pnum].pg);
if (off >= file->f_dentry->d_inode->i_size) {
- memset(buffer, 0, count[pnum]);
- retval = count[pnum];
+ memset(buffer, 0, pga[pnum].count);
+ retval = pga[pnum].count;
} else {
retval = file->f_op->read(file, buffer,
- count[pnum], &off);
+ pga[pnum].count, &off);
}
kunmap(pages[pnum]);
- if (retval != count[pnum]) {
+ if (retval != pga[pnum].count) {
filp_close(file, 0);
GOTO(out, retval = -EIO);
}
* and arrays to handle the request parameters.
*/
while (index < ((src->o_size + PAGE_SIZE - 1) >> PAGE_SHIFT)) {
- obd_size brw_count = PAGE_SIZE;
- obd_off brw_offset = (page->index) << PAGE_SHIFT;
- obd_flag flagr = 0;
- obd_flag flagw = OBD_BRW_CREATE;
+ struct brw_page pg;
+ struct io_cb_data *cbd = ll_init_cb();
+
+ if (!cbd) {
+ err = -ENOMEM;
+ EXIT;
+ break;
+ }
+
+ pg.pg = page;
+ pg.count = PAGE_SIZE;
+ pg.off = (page->index) << PAGE_SHIFT;
+ pg.flag = 0;
page->index = index;
- err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &page,
- &brw_count, &brw_offset, &flagr, NULL, NULL);
+ err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg,
+ ll_sync_io_cb, cbd);
if ( err ) {
EXIT;
break;
}
+
+ cbd = ll_init_cb();
+ if (!cbd) {
+ err = -ENOMEM;
+ EXIT;
+ break;
+ }
+ pg.flag = OBD_BRW_CREATE;
CDEBUG(D_INFO, "Read page %ld ...\n", page->index);
- err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &page,
- &brw_count, &brw_offset, &flagw, NULL, NULL);
+ err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg,
+ ll_sync_io_cb, cbd);
/* XXX should handle dst->o_size, dst->o_blocks here */
if ( err ) {
static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
{
struct osc_brw_cb_data *cb_data = data;
+ int err = 0;
ENTRY;
- if (desc->b_flags & PTL_RPC_FL_INTR)
+ if (desc->b_flags & PTL_RPC_FL_INTR) {
+ err = -ERESTARTSYS;
CERROR("got signal\n");
+ }
if (cb_data->callback)
- cb_data->callback(cb_data->cb_data);
+ cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH);
OBD_FREE(cb_data->obd_data, cb_data->obd_size);
OBD_FREE(cb_data, sizeof(*cb_data));
EXIT;
}
-static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
- obd_count page_count, struct page **page_array,
- obd_size *count, obd_off *offset, obd_flag *flags,
- brw_callback_t callback, void *data)
+static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md, obd_count page_count, struct brw_page *pga, brw_callback_t callback, void *data)
{
struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
struct ptlrpc_request *request = NULL;
bulk->b_xid = xid; /* single xid for all pages */
- bulk->b_buf = kmap(page_array[mapped]);
- bulk->b_page = page_array[mapped];
+ bulk->b_buf = kmap(pga[mapped].pg);
+ bulk->b_page = pga[mapped].pg;
bulk->b_buflen = PAGE_SIZE;
- ost_pack_niobuf(&nioptr, offset[mapped], count[mapped],
- flags[mapped], bulk->b_xid);
+ ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+ pga[mapped].flag, bulk->b_xid);
}
/*
*
* On error, we never do the brw_finish, so we handle all decrefs.
*/
- if (!callback)
- ptlrpc_bulk_addref(desc);
rc = ptlrpc_register_bulk(desc);
if (rc)
- GOTO(out_desc2, rc);
+ GOTO(out_unmap, rc);
request->rq_replen = lustre_msg_size(1, size);
rc = ptlrpc_queue_wait(request);
* restart them" and osc_brw callers can know this.
*/
if (rc)
- GOTO(out_desc2, rc);
+ GOTO(out_unmap, rc);
/* Callbacks cause asynchronous handling. */
- if (callback)
- GOTO(out_req, rc = 0);
-
- /* If there's no callback function, sleep here until complete. */
- l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_received(desc));
- if (desc->b_flags & PTL_RPC_FL_INTR)
- GOTO(out_desc, rc = -EINTR);
+ rc = callback(data, 0, CB_PHASE_START);
EXIT;
out_desc:
RETURN(rc);
/* Clean up on error. */
-out_desc2:
- if (!callback)
- ptlrpc_bulk_decref(desc);
out_unmap:
while (mapped-- > 0)
kunmap(page_array[mapped]);
static int osc_brw_write(struct lustre_handle *conn,
struct lov_stripe_md *md, obd_count page_count,
- struct page **pagearray, obd_size *count,
- obd_off *offset, obd_flag *flags,
+ struct brw_page *pga,
brw_callback_t callback, void *data)
{
struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
cb_data->obd_size = page_count * sizeof(*local);
for (mapped = 0; mapped < page_count; mapped++) {
- local[mapped].addr = kmap(pagearray[mapped]);
- local[mapped].offset = offset[mapped];
- local[mapped].len = count[mapped];
- ost_pack_niobuf(&nioptr, offset[mapped], count[mapped],
- flags[mapped], 0);
+ local[mapped].addr = kmap(pga[mapped].pg);
+ local[mapped].offset = pga[mapped].off;
+ local[mapped].len = pga[mapped].count;
+ ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+ pga[mapped].flag, 0);
}
size[1] = page_count * sizeof(*remote);
bulk->b_buf = (void *)(unsigned long)local[j].addr;
bulk->b_buflen = local[j].len;
bulk->b_xid = remote->xid;
- bulk->b_page = pagearray[j];
+ bulk->b_page = pga[j].pg;
}
if (desc->b_page_count != page_count)
/*
* One reference is released when brw_finish is complete, the
* other here when we finish waiting on it if we don't have a callback.
- *
- * We don't reference the bulk descriptor again here if there is a
- * callback, so we don't need an additional refcount on it.
*/
- if (!callback)
- ptlrpc_bulk_addref(desc);
rc = ptlrpc_send_bulk(desc);
/* XXX: Mike, same question as in osc_brw_read. */
GOTO(out_desc2, rc);
/* Callbacks cause asynchronous handling. */
- if (callback)
- GOTO(out_req, rc = 0);
-
- /* If there's no callback function, sleep here until complete. */
- l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
- if (desc->b_flags & PTL_RPC_FL_INTR)
- GOTO(out_desc, rc = -EINTR);
+ rc = callback(data, 0, CB_PHASE_START);
EXIT;
out_desc:
static int osc_brw(int cmd, struct lustre_handle *conn,
struct lov_stripe_md *md, obd_count page_count,
- struct page **page_array, obd_size *count, obd_off *offset,
- obd_flag *flags, brw_callback_t callback, void *data)
+ struct brw_page *pagear, brw_callback_t callback,
+ void *data)
{
if (cmd & OBD_BRW_WRITE)
- return osc_brw_write(conn, md, page_count, page_array, count,
- offset, flags, callback, data);
+ return osc_brw_write(conn, md, page_count, pagear, callback, data);
else
- return osc_brw_read(conn, md, page_count, page_array, count,
- offset, flags, callback, data);
+ return osc_brw_read(conn, md, page_count, pagear, callback, data);
}
-static int osc_enqueue(struct lustre_handle *connh,
- struct lustre_handle *parent_lock, __u64 *res_id,
+static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *md,
+ struct lustre_handle *parent_lock,
__u32 type, void *extentp, int extent_len, __u32 mode,
int *flags, void *callback, void *data, int datalen,
struct lustre_handle *lockh)
{
+ __u64 res_id = { md->lmd_object_id };
struct obd_device *obddev = class_conn2obd(connh);
struct ldlm_extent *extent = extentp;
int rc;
/* Next, search for already existing extent locks that will cover us */
//osc_con2dlmcl(conn, &cl, &connection, &rconn);
- rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
+ rc = ldlm_lock_match(obddev->obd_namespace, &res_id, type, extent,
sizeof(extent), mode, lockh);
if (rc == 1) {
/* We already have a lock, and it's referenced */
else
mode2 = LCK_PW;
- rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
+ rc = ldlm_lock_match(obddev->obd_namespace, &res_id, type, extent,
sizeof(extent), mode2, lockh);
if (rc == 1) {
int flags;
}
rc = ldlm_cli_enqueue(connh, NULL,obddev->obd_namespace,
- parent_lock, res_id, type, extent, sizeof(extent),
+ parent_lock, &res_id, type, extent, sizeof(extent),
mode, flags, ldlm_completion_ast, callback, data, datalen, lockh);
return rc;
}
-static int osc_cancel(struct lustre_handle *oconn, __u32 mode,
+static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md, __u32 mode,
struct lustre_handle *lockh)
{
ENTRY;
req->rq_replen, req->rq_repmsg->status);
spin_lock(&cli->cli_lock);
- cli->cli_last_rcvd = req->rq_repmsg->last_rcvd;
+ cli->cli_last_xid = req->rq_repmsg->last_xid;
cli->cli_last_committed = req->rq_repmsg->last_committed;
ptlrpc_free_committed(cli);
spin_unlock(&cli->cli_lock);
m->status = NTOH__u32(m->status);
m->type = NTOH__u32(m->type);
m->bufcount = NTOH__u32(m->bufcount);
- m->last_rcvd = NTOH__u64(m->last_rcvd);
+ m->last_xid = NTOH__u64(m->last_xid);
m->last_committed = NTOH__u64(m->last_committed);
required_len = size_round(sizeof(*m) + m->bufcount * sizeof(__u32));
{"detach", jt_obd_detach, 0, "un-name a device\n"
"usage: detach"},
{"lovconfig", jt_obd_lov_config, 0,
- "write lov configuration to a mds device\n"
- "usage: lovconfig lov-uuid stripcount stripsize pattern UUID1 [UUID2 ...]"},
+ "write lov configuration to an mds device\n"
+ "usage: lovconfig lov-uuid stripe-count stripe-size offset pattern UUID1 [UUID2 ...]"},
/* Device operations */
{"=== device operations ==", jt_noop, 0, "device operations"},