From 8b92a57d48878188dc4fe7ea1438a403f7b486a7 Mon Sep 17 00:00:00 2001 From: braam Date: Sun, 11 Aug 2002 08:15:46 +0000 Subject: [PATCH] - change I/O to use a pagearray - implement remaining striping function in LOV: - read/write - locking - truncate - minor protocol cleanup for MDS - change documentation to include design / architecture / manual / appendix parts - add design documents: - management api - network format - --- lustre/include/linux/lustre_idl.h | 17 +- lustre/include/linux/lustre_lib.h | 14 +- lustre/include/linux/lustre_mds.h | 3 +- lustre/include/linux/lustre_net.h | 2 +- lustre/include/linux/obd.h | 28 ++-- lustre/include/linux/obd_class.h | 17 +- lustre/lib/l_net.c | 3 +- lustre/lib/mds_updates.c | 9 +- lustre/lib/page.c | 39 +++++ lustre/llite/file.c | 39 +++-- lustre/llite/recover.c | 23 ++- lustre/llite/rw.c | 63 +++++--- lustre/llite/super.c | 4 +- lustre/lov/lov_obd.c | 323 +++++++++++++++++++++----------------- lustre/mdc/mdc_reint.c | 11 +- lustre/mdc/mdc_request.c | 18 +-- lustre/mds/handler.c | 4 +- lustre/obdclass/Makefile.am | 5 +- lustre/obdclass/class_obd.c | 37 ++--- lustre/obdfilter/filter.c | 61 ++++--- lustre/osc/osc_request.c | 88 ++++------- lustre/ptlrpc/client.c | 2 +- lustre/ptlrpc/pack_generic.c | 2 +- lustre/utils/lctl.c | 4 +- 24 files changed, 447 insertions(+), 369 deletions(-) diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index 5fa1c0b..03fa6da 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -117,7 +117,7 @@ struct lustre_msg { __u32 type; __u32 version; __u32 opc; - __u64 last_rcvd; + __u64 last_xid; __u64 last_committed; __u64 transno; __u32 status; @@ -321,6 +321,11 @@ struct mds_status_req { __u32 repbuf; }; +struct mds_fileh_body { + struct ll_fid f_fid; + struct lustre_handle f_handle; +}; + struct mds_conn_status { struct ll_fid rootfid; __u64 xid; @@ -347,13 +352,15 @@ struct mds_body { __u32 ino; __u32 nlink; __u32 generation; - 
__u32 last_xid; + __u32 last_xidnomore; }; /* MDS update records */ -struct mds_update_record_hdr { - __u32 ur_opcode; -}; + + +//struct mds_update_record_hdr { +// __u32 ur_opcode; +//}; struct mds_rec_setattr { __u32 sa_opcode; diff --git a/lustre/include/linux/lustre_lib.h b/lustre/include/linux/lustre_lib.h index 4d053cd..4e0f03f 100644 --- a/lustre/include/linux/lustre_lib.h +++ b/lustre/include/linux/lustre_lib.h @@ -60,6 +60,16 @@ void l_unlock(struct lustre_lock *); /* page.c */ +#define CB_PHASE_START 12 +#define CB_PHASE_FINISH 13 +struct io_cb_data { + wait_queue_head_t waitq; + atomic_t refcount; + int complete; + int err; +}; +int ll_sync_io_cb(void *data, int err, int phase); +struct io_cb_data *ll_init_cb(void); inline void lustre_put_page(struct page *page); struct page *lustre_get_page_read(struct inode *dir, unsigned long index); struct page *lustre_get_page_write(struct inode *dir, unsigned long index); @@ -408,8 +418,8 @@ static inline int obd_ioctl_getdata(char **buf, int *len, void *arg) sigismember(&(task->pending.signal), SIGTERM)) /* - * Like wait_event_interruptible, but we're only interruptible by KILL, INT, or - * TERM. + * Like wait_event_interruptible, but we're only interruptible by + * KILL, INT, or TERM. * * XXXshaver These are going away soon, I hope. 
*/ diff --git a/lustre/include/linux/lustre_mds.h b/lustre/include/linux/lustre_mds.h index 57da73e..5bca561 100644 --- a/lustre/include/linux/lustre_mds.h +++ b/lustre/include/linux/lustre_mds.h @@ -148,8 +148,7 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type, int mdc_getlovinfo(struct obd_device *obd, struct lustre_handle *mdc_connh, uuid_t **uuids, struct ptlrpc_request **request); int mdc_getstatus(struct lustre_handle *conn, - struct ll_fid *rootfid, __u64 *last_committed, __u64 *last_rcvd, - __u32 *last_xid, struct ptlrpc_request **); + struct ll_fid *rootfid, __u64 *last_committed, __u32 *last_xid, struct ptlrpc_request **); int mdc_getattr(struct lustre_handle *conn, obd_id ino, int type, unsigned long valid, size_t ea_size, struct ptlrpc_request **request); diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index fffe5a4..0c50c99 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -58,7 +58,7 @@ struct ptlrpc_client { __u32 cli_request_portal; __u32 cli_reply_portal; - __u64 cli_last_rcvd; + __u64 cli_last_xid; __u64 cli_last_committed; __u32 cli_target_devno; diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index e94f8b8..6156266 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -23,6 +23,13 @@ struct obd_type { int typ_refcnt; }; +typedef int (*brw_callback_t)(void *, int err, int phase); +struct brw_page { + struct page *pg; + obd_size count; + obd_off off; + obd_flag flag; +}; /* Individual type definitions */ @@ -69,15 +76,6 @@ struct client_obd { int cl_max_mdsize; }; -#if 0 -struct osc_obd { - struct ptlrpc_client *osc_client; - struct ptlrpc_client *osc_ldlm_client; - struct ptlrpc_connection *osc_conn; - __u8 osc_target_uuid[37]; -}; -#endif - struct mds_obd { struct ptlrpc_service *mds_service; @@ -208,7 +206,6 @@ struct obd_device { } u; }; -typedef void (*brw_callback_t)(void *); struct obd_ops { int 
(*o_iocontrol)(long cmd, struct lustre_handle *, int len, @@ -243,8 +240,8 @@ struct obd_ops { struct lov_stripe_md *); int (*o_brw)(int rw, struct lustre_handle *conn, struct lov_stripe_md *md, obd_count oa_bufs, - struct page **buf, obd_size *count, obd_off *offset, - obd_flag *flags, brw_callback_t callback, void * data); + struct brw_page *pgarr, brw_callback_t callback, + void * data); int (*o_punch)(struct lustre_handle *conn, struct obdo *tgt, struct lov_stripe_md *md, obd_size count, obd_off offset); @@ -266,13 +263,12 @@ struct obd_ops { int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_local *local, void *desc_private); - int (*o_enqueue)(struct lustre_handle *conn, - struct lustre_handle *parent_lock, __u64 *res_id, + int (*o_enqueue)(struct lustre_handle *conn, struct lov_stripe_md *md, + struct lustre_handle *parent_lock, __u32 type, void *cookie, int cookielen, __u32 mode, int *flags, void *cb, void *data, int datalen, struct lustre_handle *lockh); - int (*o_cancel)(struct lustre_handle *, __u32 mode, - struct lustre_handle *); + int (*o_cancel)(struct lustre_handle *, struct lov_stripe_md *md, __u32 mode, struct lustre_handle *); }; #endif diff --git a/lustre/include/linux/obd_class.h b/lustre/include/linux/obd_class.h index 2298cd6..bcd8c3b 100644 --- a/lustre/include/linux/obd_class.h +++ b/lustre/include/linux/obd_class.h @@ -342,10 +342,7 @@ static inline int obd_punch(struct lustre_handle *conn, struct obdo *tgt, static inline int obd_brw(int cmd, struct lustre_handle *conn, struct lov_stripe_md *md, obd_count oa_bufs, - struct page **buf, - obd_size *count, - obd_off *offset, - obd_flag *flags, + struct brw_page *pg, brw_callback_t callback, void *data) { int rc; @@ -358,8 +355,7 @@ static inline int obd_brw(int cmd, struct lustre_handle *conn, LBUG(); } - rc = OBP(export->exp_obd, brw)(cmd, conn, md, oa_bufs, buf, - count, offset, flags, callback, data); + rc = OBP(export->exp_obd, brw)(cmd, conn, md, oa_bufs, pg, callback, 
data); RETURN(rc); } @@ -406,7 +402,8 @@ static inline int obd_iocontrol(int cmd, struct lustre_handle *conn, } static inline int obd_enqueue(struct lustre_handle *conn, - struct lustre_handle *parent_lock, __u64 *res_id, + struct lov_stripe_md *md, + struct lustre_handle *parent_lock, __u32 type, void *cookie, int cookielen, __u32 mode, int *flags, void *cb, void *data, int datalen, struct lustre_handle *lockh) @@ -416,13 +413,13 @@ static inline int obd_enqueue(struct lustre_handle *conn, OBD_CHECK_SETUP(conn, export); OBD_CHECK_OP(export->exp_obd,enqueue); - rc = OBP(export->exp_obd, enqueue)(conn, parent_lock, res_id, type, + rc = OBP(export->exp_obd, enqueue)(conn, md, parent_lock, type, cookie, cookielen, mode, flags, cb, data, datalen, lockh); RETURN(rc); } -static inline int obd_cancel(struct lustre_handle *conn, __u32 mode, +static inline int obd_cancel(struct lustre_handle *conn, struct lov_stripe_md *md, __u32 mode, struct lustre_handle *lockh) { int rc; @@ -430,7 +427,7 @@ static inline int obd_cancel(struct lustre_handle *conn, __u32 mode, OBD_CHECK_SETUP(conn, export); OBD_CHECK_OP(export->exp_obd,cancel); - rc = OBP(export->exp_obd, cancel)(conn, mode, lockh); + rc = OBP(export->exp_obd, cancel)(conn, md, mode, lockh); RETURN(rc); } diff --git a/lustre/lib/l_net.c b/lustre/lib/l_net.c index 1ef0ad9..a1a687c 100644 --- a/lustre/lib/l_net.c +++ b/lustre/lib/l_net.c @@ -274,7 +274,8 @@ int target_handle_connect(struct ptlrpc_request *req) conn.addr = req->rq_reqmsg->addr; conn.cookie = req->rq_reqmsg->cookie; - rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); + rc = lustre_pack_msg(0, + NULL, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) GOTO(out, rc); diff --git a/lustre/lib/mds_updates.c b/lustre/lib/mds_updates.c index 2fb2d3e..3cf6dbf 100644 --- a/lustre/lib/mds_updates.c +++ b/lustre/lib/mds_updates.c @@ -94,7 +94,6 @@ static void mds_pack_body(struct mds_body *b) b->ino = HTON__u32(b->ino); b->nlink = HTON__u32(b->nlink); 
b->generation = HTON__u32(b->generation); - b->last_xid = HTON__u32(b->last_xid); } void mds_getattr_pack(struct ptlrpc_request *req, int offset, @@ -268,7 +267,6 @@ void mds_unpack_body(struct mds_body *b) b->ino = NTOH__u32(b->ino); b->nlink = NTOH__u32(b->nlink); b->generation = NTOH__u32(b->generation); - b->last_xid = NTOH__u32(b->last_xid); } static int mds_setattr_unpack(struct ptlrpc_request *req, int offset, @@ -399,15 +397,14 @@ static update_unpacker mds_unpackers[REINT_MAX + 1] = { int mds_update_unpack(struct ptlrpc_request *req, int offset, struct mds_update_record *rec) { - struct mds_update_record_hdr *hdr = - lustre_msg_buf(req->rq_reqmsg, offset); + __u32 *opcode = lustre_msg_buf(req->rq_reqmsg, offset); int rc; ENTRY; - if (!hdr || req->rq_reqmsg->buflens[offset] < sizeof(*hdr)) + if (!opcode || req->rq_reqmsg->buflens[offset] < sizeof(*opcode)) RETURN(-EFAULT); - rec->ur_opcode = NTOH__u32(hdr->ur_opcode); + rec->ur_opcode = NTOH__u32(*opcode); if (rec->ur_opcode < 0 || rec->ur_opcode > REINT_MAX) RETURN(-EFAULT); diff --git a/lustre/lib/page.c b/lustre/lib/page.c index a949a3b..51bb5f5 100644 --- a/lustre/lib/page.c +++ b/lustre/lib/page.c @@ -49,6 +49,45 @@ #include #include + +int ll_sync_io_cb(void *data, int err, int phase) +{ + struct io_cb_data *d = data; + int ret; + ENTRY; + + if (phase == CB_PHASE_START) { + ret = l_wait_event_killable(d->waitq, d->complete); + if (atomic_dec_and_test(&d->refcount)) + OBD_FREE(d, sizeof(*d)); + if (ret == -ERESTARTSYS) + return ret; + } else if (phase == CB_PHASE_FINISH) { + d->err = err; + d->complete = 1; + wake_up(&d->waitq); + if (atomic_dec_and_test(&d->refcount)) + OBD_FREE(d, sizeof(*d)); + return err; + } else + LBUG(); + EXIT; + return 0; +} + +struct io_cb_data *ll_init_cb(void) +{ + struct io_cb_data *d; + + + OBD_ALLOC(d, sizeof(*d)); + if (d) { + init_waitqueue_head(&d->waitq); + atomic_set(&d->refcount, 2); + } + RETURN(d); +} + /* * Remove page from dirty list */ diff --git 
a/lustre/llite/file.c b/lustre/llite/file.c index a26825f..0bbad70 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -256,28 +256,34 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count, struct inode *inode = filp->f_dentry->d_inode; struct ll_sb_info *sbi = ll_i2sbi(inode); struct ldlm_extent extent; - struct lustre_handle lockh; - __u64 res_id[RES_NAME_SIZE] = {inode->i_ino}; + struct lustre_handle *lockhs = NULL; + struct lov_stripe_md *md = ll_i2info(inode)->lli_smd; int flags = 0; ldlm_error_t err; ssize_t retval; ENTRY; + + if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK)) { + OBD_ALLOC(lockhs, md->lmd_stripe_count * sizeof(*lockhs)); + if (!lockhs) + RETURN(-ENOMEM); + extent.start = *ppos; extent.end = *ppos + count; CDEBUG(D_INFO, "Locking inode %ld, start %Lu end %Lu\n", inode->i_ino, extent.start, extent.end); - err = obd_enqueue(&sbi->ll_osc_conn, NULL, res_id, LDLM_EXTENT, + err = obd_enqueue(&sbi->ll_osc_conn, md, NULL, LDLM_EXTENT, &extent, sizeof(extent), LCK_PR, &flags, ll_lock_callback, inode, sizeof(*inode), - &lockh); + lockhs); if (err != ELDLM_OK) { + OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs)); CERROR("lock enqueue: err: %d\n", err); RETURN(err); } - ldlm_lock_dump((void *)(unsigned long)lockh.addr); } CDEBUG(D_INFO, "Reading inode %ld, %d bytes, offset %Ld\n", @@ -288,13 +294,16 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count, ll_update_atime(inode); if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK)) { - err = obd_cancel(&sbi->ll_osc_conn, LCK_PR, &lockh); + err = obd_cancel(&sbi->ll_osc_conn, md, LCK_PR, lockhs); if (err != ELDLM_OK) { + OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs)); CERROR("lock cancel: err: %d\n", err); RETURN(err); } } + if (lockhs) + OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs)); RETURN(retval); } @@ -308,14 +317,17 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos) struct inode *inode = file->f_dentry->d_inode; 
struct ll_sb_info *sbi = ll_i2sbi(inode); struct ldlm_extent extent; - struct lustre_handle lockh; - __u64 res_id[RES_NAME_SIZE] = {inode->i_ino}; + struct lustre_handle *lockhs = NULL; + struct lov_stripe_md *md = ll_i2info(inode)->lli_smd; int flags = 0; ldlm_error_t err; ssize_t retval; ENTRY; if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK)) { + OBD_ALLOC(lockhs, md->lmd_stripe_count * sizeof(*lockhs)); + if (!lockhs) + RETURN(-ENOMEM); /* FIXME: this should check whether O_APPEND is set and adjust * extent.start accordingly */ extent.start = *ppos; @@ -323,15 +335,15 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos) CDEBUG(D_INFO, "Locking inode %ld, start %Lu end %Lu\n", inode->i_ino, extent.start, extent.end); - err = obd_enqueue(&sbi->ll_osc_conn, NULL, res_id, LDLM_EXTENT, + err = obd_enqueue(&sbi->ll_osc_conn, md, NULL, LDLM_EXTENT, &extent, sizeof(extent), LCK_PW, &flags, ll_lock_callback, inode, sizeof(*inode), - &lockh); + lockhs); if (err != ELDLM_OK) { + OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs)); CERROR("lock enqueue: err: %d\n", err); RETURN(err); } - ldlm_lock_dump((void *)(unsigned long)lockh.addr); } CDEBUG(D_INFO, "Writing inode %ld, %ld bytes, offset %Ld\n", @@ -340,13 +352,16 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos) retval = generic_file_write(file, buf, count, ppos); if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK)) { - err = obd_cancel(&sbi->ll_osc_conn, LCK_PW, &lockh); + err = obd_cancel(&sbi->ll_osc_conn, md, LCK_PW, lockhs); if (err != ELDLM_OK) { + OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs)); CERROR("lock cancel: err: %d\n", err); RETURN(err); } } + if (lockhs) + OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs)); RETURN(retval); } diff --git a/lustre/llite/recover.c b/lustre/llite/recover.c index 122ff36..f2e4719 100644 --- a/lustre/llite/recover.c +++ b/lustre/llite/recover.c @@ -26,7 +26,7 @@ static int ll_reconnect(struct ll_sb_info *sbi) { 
struct ll_fid rootfid; - __u64 last_committed, last_rcvd; + __u64 last_committed; __u32 last_xid; int err; struct ptlrpc_request *request; @@ -44,14 +44,13 @@ static int ll_reconnect(struct ll_sb_info *sbi) /* XXX: need to store the last_* values somewhere */ err = mdc_getstatus(&sbi->ll_mdc_conn, &rootfid, &last_committed, - &last_rcvd, &last_xid, &request); if (err) { CERROR("cannot mds_connect: rc = %d\n", err); GOTO(out_disc, err = -ENOTCONN); } - sbi2mdc(sbi)->cl_client->cli_last_rcvd = last_xid; + sbi2mdc(sbi)->cl_client->cli_last_xid = last_xid; sbi2mdc(sbi)->cl_conn->c_level = LUSTRE_CONN_RECOVD; out_disc: @@ -78,7 +77,7 @@ int ll_recover(struct ptlrpc_client *cli) /* replay what needs to be replayed */ if (req->rq_flags & PTL_RPC_FL_REPLAY) { CDEBUG(D_INODE, "req %Ld needs replay [last rcvd %Ld]\n", - req->rq_xid, cli->cli_last_rcvd); + req->rq_xid, cli->cli_last_xid); rc = ptlrpc_replay_req(req); if (rc) { CERROR("recovery replay error %d for request %Ld\n", @@ -89,17 +88,17 @@ int ll_recover(struct ptlrpc_client *cli) /* server has seen req, we have reply: skip */ if ((req->rq_flags & PTL_RPC_FL_REPLIED) && - req->rq_xid <= cli->cli_last_rcvd) { + req->rq_xid <= cli->cli_last_xid) { CDEBUG(D_INODE, "req %Ld was complete: skip [last rcvd %Ld]\n", - req->rq_xid, cli->cli_last_rcvd); + req->rq_xid, cli->cli_last_xid); continue; } /* server has lost req, we have reply: resend, ign reply */ if ((req->rq_flags & PTL_RPC_FL_REPLIED) && - req->rq_xid > cli->cli_last_rcvd) { + req->rq_xid > cli->cli_last_xid) { CDEBUG(D_INODE, "lost req %Ld have rep: replay [last rcvd %Ld]\n", - req->rq_xid, cli->cli_last_rcvd); + req->rq_xid, cli->cli_last_xid); rc = ptlrpc_replay_req(req); if (rc) { CERROR("request resend error %d for request %Ld\n", @@ -110,17 +109,17 @@ int ll_recover(struct ptlrpc_client *cli) /* server has seen req, we have lost reply: -ERESTARTSYS */ if ( !(req->rq_flags & PTL_RPC_FL_REPLIED) && - req->rq_xid <= cli->cli_last_rcvd) { + req->rq_xid <= 
cli->cli_last_xid) { CDEBUG(D_INODE, "lost rep %Ld srv did req: restart [last rcvd %Ld]\n", - req->rq_xid, cli->cli_last_rcvd); + req->rq_xid, cli->cli_last_xid); ptlrpc_restart_req(req); } /* service has not seen req, no reply: resend */ if ( !(req->rq_flags & PTL_RPC_FL_REPLIED) && - req->rq_xid > cli->cli_last_rcvd) { + req->rq_xid > cli->cli_last_xid) { CDEBUG(D_INODE, "lost rep/req %Ld: resend [last rcvd %Ld]\n", - req->rq_xid, cli->cli_last_rcvd); + req->rq_xid, cli->cli_last_xid); ptlrpc_resend_req(req); } diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index 0200ca6..8f3ce22 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -33,19 +33,25 @@ #include #include + /* SYNCHRONOUS I/O to object storage for an inode */ static int ll_brw(int rw, struct inode *inode, struct page *page, int create) { struct ll_inode_info *lii = ll_i2info(inode); struct lov_stripe_md *md = lii->lli_smd; - obd_size count = PAGE_SIZE; - obd_off offset = ((obd_off)page->index) << PAGE_SHIFT; - obd_flag flags = create ? OBD_BRW_CREATE : 0; + struct brw_page pg; int err; + struct io_cb_data *cbd = ll_init_cb(); ENTRY; + if (!cbd) + RETURN(-ENOMEM); + + pg.pg = page; + pg.count = PAGE_SIZE; + pg.off = ((obd_off)page->index) << PAGE_SHIFT; + pg.flag = create ? 
OBD_BRW_CREATE : 0; - err = obd_brw(rw, ll_i2obdconn(inode), md, 1, - &page, &count, &offset, &flags, NULL, NULL); + err = obd_brw(rw, ll_i2obdconn(inode), md, 1, &pg, ll_sync_io_cb, cbd); RETURN(err); } /* ll_brw */ @@ -139,6 +145,7 @@ static int ll_writepage(struct page *page) RETURN(err); } + /* SYNCHRONOUS I/O to object storage for an inode -- object attr will be updated * too */ static int ll_commit_write(struct file *file, struct page *page, @@ -148,13 +155,19 @@ static int ll_commit_write(struct file *file, struct page *page, struct inode *inode = page->mapping->host; struct ll_inode_info *lii = ll_i2info(inode); struct lov_stripe_md *md = lii->lli_smd; - obd_size count = to; - obd_off offset = (((obd_off)page->index) << PAGE_SHIFT); - obd_flag flags = create ? OBD_BRW_CREATE : 0; + struct brw_page pg; int err; struct iattr iattr; + struct io_cb_data *cbd = ll_init_cb(); + + pg.pg = page; + pg.count = to; + pg.off = (((obd_off)page->index) << PAGE_SHIFT); + pg.flag = create ? OBD_BRW_CREATE : 0; ENTRY; + if (!cbd) + RETURN(-ENOMEM); SetPageUptodate(page); @@ -162,13 +175,13 @@ static int ll_commit_write(struct file *file, struct page *page, LBUG(); CDEBUG(D_INODE, "commit_page writing (at %d) to %d, count %Ld\n", - from, to, (unsigned long long)count); + from, to, (unsigned long long)pg.count); err = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode), md, - 1, &page, &count, &offset, &flags, NULL, NULL); + 1, &pg, ll_sync_io_cb, cbd); kunmap(page); - iattr.ia_size = offset + to; + iattr.ia_size = pg.off + pg.count; if (iattr.ia_size > inode->i_size) { /* do NOT truncate when writing in the middle of a file */ inode->i_size = iattr.ia_size; @@ -226,47 +239,45 @@ int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf, obd_count bufs_per_obdo = iobuf->nr_pages; struct ll_inode_info *lii = ll_i2info(inode); struct lov_stripe_md *md = lii->lli_smd; - obd_size *count = NULL; - obd_off *offset = NULL; - obd_flag *flags = NULL; + struct brw_page *pga; int rc = 
0; int i; + struct io_cb_data *cbd = ll_init_cb(); ENTRY; + if (!cbd) + RETURN(-ENOMEM); if (blocksize != PAGE_SIZE) { CERROR("direct_IO blocksize != PAGE_SIZE\n"); return -EINVAL; } - OBD_ALLOC(count, sizeof(*count) * bufs_per_obdo); - OBD_ALLOC(offset, sizeof(*offset) * bufs_per_obdo); - OBD_ALLOC(flags, sizeof(*flags) * bufs_per_obdo); - if (!count || !offset || !flags) + OBD_ALLOC(pga, sizeof(*pga) * bufs_per_obdo); + if (pga) GOTO(out, rc = -ENOMEM); /* NB: we can't use iobuf->maplist[i]->index for the offset * instead of "blocknr" because ->index contains garbage. */ for (i = 0; i < bufs_per_obdo; i++, blocknr++) { - count[i] = PAGE_SIZE; - offset[i] = (obd_off)blocknr << PAGE_SHIFT; - flags[i] = OBD_BRW_CREATE; + pga[i].pg = iobuf->maplist[i]; + pga[i].count = PAGE_SIZE; + pga[i].off = (obd_off)blocknr << PAGE_SHIFT; + pga[i].flag = OBD_BRW_CREATE; } if (!md || !md->lmd_object_id) GOTO(out, rc = -ENOMEM); rc = obd_brw(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ, - ll_i2obdconn(inode), md, bufs_per_obdo, - iobuf->maplist, count, offset, flags, NULL, NULL); + ll_i2obdconn(inode), md, bufs_per_obdo, pga, + ll_sync_io_cb, cbd); if (rc == 0) rc = bufs_per_obdo * PAGE_SIZE; out: - OBD_FREE(flags, sizeof(obd_flag) * bufs_per_obdo); - OBD_FREE(count, sizeof(obd_count) * bufs_per_obdo); - OBD_FREE(offset, sizeof(obd_off) * bufs_per_obdo); + OBD_FREE(pga, sizeof(*pga) * bufs_per_obdo); RETURN(rc); } diff --git a/lustre/llite/super.c b/lustre/llite/super.c index d6ab7d1..665ca1d 100644 --- a/lustre/llite/super.c +++ b/lustre/llite/super.c @@ -86,7 +86,7 @@ static struct super_block * ll_read_super(struct super_block *sb, int err; struct ll_fid rootfid; struct statfs sfs; - __u64 last_committed, last_rcvd; + __u64 last_committed; __u32 last_xid; struct ptlrpc_request *request = NULL; struct ll_inode_md md; @@ -150,7 +150,7 @@ static struct super_block * ll_read_super(struct super_block *sb, /* XXX: need to store the last_* values somewhere */ err = 
mdc_getstatus(&sbi->ll_mdc_conn, &rootfid, &last_committed, - &last_rcvd, &last_xid, &request); + &last_xid, &request); ptlrpc_req_finished(request); if (err) { CERROR("cannot mds_connect: rc = %d\n", err); diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index cb05eb3..d00296e 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -220,6 +220,11 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_st md->lmd_stripe_count = lov->desc.ld_default_stripe_count; } + if (!md->lmd_stripe_size) + md->lmd_stripe_size = lov->desc.ld_default_stripe_size; + + + for (i = 0; i < md->lmd_stripe_count; i++) { struct lov_stripe_md obj_md; struct lov_stripe_md *obj_mdp = &obj_md; @@ -269,7 +274,7 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa, for (i = 0; i < md->lmd_stripe_count; i++) { /* create data objects with "parent" OA */ memcpy(&tmp, oa, sizeof(tmp)); - oa->o_id = md->lmd_objects[i].l_object_id; + tmp.o_id = md->lmd_objects[i].l_object_id; rc = obd_destroy(&lov->tgts[i].conn, &tmp, NULL); if (!rc) { CERROR("Error destroying object %Ld on %d\n", @@ -359,7 +364,7 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa, ENTRY; if (!md) { - CERROR("LOV requires striping ea for desctruction\n"); + CERROR("LOV requires striping ea for opening\n"); RETURN(-EINVAL); } @@ -373,7 +378,7 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa, oa->o_id = md->lmd_objects[i].l_object_id; rc = obd_open(&lov->tgts[i].conn, &tmp, NULL); - if (!rc) { + if (rc) { CERROR("Error getattr object %Ld on %d\n", oa->o_id, i); } @@ -414,6 +419,10 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa, RETURN(rc); } +#ifndef log2 +#define log2(n) ffz(~(n)) +#endif + /* compute offset in stripe i corresponds to offset "in" */ __u64 lov_offset(struct lov_stripe_md *md, __u64 in, int i) { @@ -436,33 +445,26 @@ __u64 lov_offset(struct lov_stripe_md *md, __u64 in, int i) return (__u64) out; } - -struct 
lov_callback_data { - atomic_t count; - wait_queue_head_t waitq; -}; - -static void lov_read_callback(struct ptlrpc_bulk_desc *desc, void *data) +/* compute offset in stripe i corresponds to offset "in" */ +__u64 lov_stripe(struct lov_stripe_md *md, __u64 in, int *j) { - struct lov_callback_data *cb_data = data; + __u32 ssz = md->lmd_stripe_size; + __u32 off, out; + /* full stripes across all * stripe size */ + *j = (((__u32) in)/ssz) % md->lmd_stripe_count; + off = (__u32)in % (md->lmd_stripe_count * ssz); + out = ( ((__u32)in) / (md->lmd_stripe_count * ssz)) * ssz + + (off - ((*j) * ssz)) % ssz;; - if (atomic_dec_and_test(&cb_data->count)) - wake_up(&cb_data->waitq); + return (__u64) out; } -static int lov_read_check_status(struct lov_callback_data *cb_data) +int lov_stripe_which(struct lov_stripe_md *md, __u64 in) { - ENTRY; - if (sigismember(&(current->pending.signal), SIGKILL) || - sigismember(&(current->pending.signal), SIGTERM) || - sigismember(&(current->pending.signal), SIGINT)) { - // FIXME XXX what here - // cb_data->flags |= PTL_RPC_FL_INTR; - RETURN(1); - } - if (atomic_read(&cb_data->count) == 0) - RETURN(1); - RETURN(0); + __u32 ssz = md->lmd_stripe_size; + int j; + j = (((__u32) in)/ssz) % md->lmd_stripe_count; + return j; } @@ -492,6 +494,8 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa, __u64 starti = lov_offset(md, start, i); __u64 endi = lov_offset(md, end, i); + if (starti == endi) + continue; /* create data objects with "parent" OA */ memcpy(&tmp, oa, sizeof(tmp)); oa->o_id = md->lmd_objects[i].l_object_id; @@ -506,160 +510,189 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa, RETURN(rc); } +struct lov_callback_data { + atomic_t count; + struct io_cb_data *cbd; + brw_callback_t cb; + int err; +}; + +int lov_osc_brw_callback(void *data, int err, int phase) +{ + struct lov_callback_data *d = data; + int ret = 0; + ENTRY; + + if (phase == CB_PHASE_START) { + RETURN(0); + } else if (phase == CB_PHASE_FINISH) 
{ + if (err) + d->err = err; + if (atomic_dec_and_test(&d->count)) { + ret = d->cb(d->cbd, 0, d->err); + } + RETURN(ret); + } else + LBUG(); + EXIT; + return 0; +} -#if 0 -static int lov_brw(int cmd, struct lustre_handle *conn, obd_count num_oa, - struct obdo **oa, - obd_count *oa_bufs, struct page **buf, - obd_size *count, obd_off *offset, obd_flag *flags, - bulk_callback_t callback, void *data) +static inline int lov_brw(int cmd, struct lustre_handle *conn, + struct lov_stripe_md *md, + obd_count oa_bufs, + struct brw_page *pga, + brw_callback_t callback, void *data) { - int rc, i, page_array_offset = 0; - obd_off off = offset; - obd_size retval = 0; - struct lov_callback_data *cb_data; + int stripe_count = md->lmd_stripe_count; + struct obd_export *export = class_conn2export(conn); + struct lov_obd *lov; + struct { + int bufct; + int index; + int subcount; + struct lov_stripe_md md; + } *stripeinfo; + struct brw_page *ioarr; + int rc, i; + struct lov_callback_data *lov_cb_data; ENTRY; - if (num_oa != 1) - LBUG(); + lov = &export->exp_obd->u.lov; - if (!class_conn2export(conn)) - RETURN(-EINVAL); + OBD_ALLOC(lov_cb_data, sizeof(*lov_cb_data)); + if (!lov_cb_data) + RETURN(-ENOMEM); - OBD_ALLOC(cb_data, sizeof(*cb_data)); - if (cb_data == NULL) { - LBUG(); + OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo)); + if (!stripeinfo) + RETURN(-ENOMEM); + + OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs); + if (!ioarr) { + OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo)); RETURN(-ENOMEM); } - INIT_WAITQUEUE_HEAD(&cb_data->waitq); - atomic_set(&cb_data->count, 0); - for (i = 0; i < oa_bufs[0]; i++) { - struct page *current_page = buf[i]; + for (i=0 ; i < oa_bufs ; i++ ) { + int which; + which = lov_stripe_which(md, pga[i].pg->index * PAGE_SIZE); + stripeinfo[which].bufct++; + } - struct lov_md *md = (struct lov_md *)oa[i]->inline; - int bufcount = oa_bufs[i]; - // md->lmd_stripe_count + for (i=0 ; i < stripe_count ; i++) { + if (i>0) + stripeinfo[i].index = + 
stripeinfo[i-1].index + stripeinfo[i-1].bufct; + stripeinfo[i].md.lmd_object_id = + md->lmd_objects[i].l_object_id; + } - for (k = page_array_offset; k < bufcount + page_array_offset; - k++) { - - } - page_array_offset += bufcount; - - - while (off < offset + count) { - int stripe, conn; - obd_size size, tmp; - - stripe = off / conn->oc_dev->u.lov.lov_stripe_size; - size = (stripe + 1) * conn->oc_dev->u.lov.lov_strip_size - off; - if (size > *count) - size = *count; - - conn = stripe % conn->oc_dev->obd_multi_count; - - tmp = size; - atomic_inc(&cb_data->count); - rc = obd_brw(cmd, &conn->oc_dev->obd_multi_conn[conn], - num_oa, oa, buf, - &size, off, lov_read_callback, cb_data); - if (rc == 0) - retval += size; - else { - CERROR("read(off=%Lu, count=%Lu): %d\n", - (unsigned long long)off, - (unsigned long long)size, rc); - break; - } + for (i=0 ; i < oa_bufs ; i++ ) { + int which, shift; + which = lov_stripe_which(md, pga[i].pg->index * PAGE_SIZE); - buf += size; + shift = stripeinfo[which].index; + ioarr[shift + stripeinfo[which].subcount] = pga[i]; + pga[i].off = lov_offset(md, pga[i].pg->index * PAGE_SIZE, which); + stripeinfo[which].subcount++; } + + lov_cb_data->cb = callback; + lov_cb_data->cbd = data; + atomic_set(&lov_cb_data->count, oa_bufs); + for (i=0 ; i < stripe_count ; i++) { + int shift = stripeinfo[i].index; - wait_event(&cb_data->waitq, lov_read_check_status(cb_data)); - if (cb_data->flags & PTL_RPC_FL_INTR) - rc = -EINTR; + obd_brw(cmd, &lov->tgts[i].conn, &stripeinfo[i].md, + stripeinfo[i].bufct, &ioarr[shift], + lov_osc_brw_callback, &lov_cb_data); + } - /* FIXME: The error handling here sucks */ - *count = retval; - OBD_FREE(cb_data, sizeof(*cb_data)); - RETURN(rc); -} + rc = callback(lov_cb_data, 0, CB_PHASE_START); -static void lov_write_finished(struct ptlrpc_bulk_desc *desc, void *data) -{ - + RETURN(rc); } -/* buffer must lie in user memory here */ -static int filter_write(struct lustre_handle *conn, struct obdo *oa, char *buf, - obd_size 
*count, obd_off offset) +static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *md, + struct lustre_handle *parent_lock, + __u32 type, void *cookie, int cookielen, __u32 mode, + int *flags, void *cb, void *data, int datalen, + struct lustre_handle *lockhs) { - int err; - struct file *file; - unsigned long retval; - + int rc = 0, i; + struct obd_export *export = class_conn2export(conn); + struct lov_obd *lov; ENTRY; - if (!class_conn2export(conn)) { - CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id); - EXIT; - return -EINVAL; - } - file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode); - if (!file || IS_ERR(file)) { - EXIT; - return -PTR_ERR(file); + if (!md) { + CERROR("LOV requires striping ea for desctruction\n"); + RETURN(-EINVAL); } - /* count doubles as retval */ - retval = file->f_op->write(file, buf, *count, (loff_t *)&offset); - filp_close(file, 0); + if (!export || !export->exp_obd) + RETURN(-ENODEV); - if ( retval >= 0 ) { - err = 0; - *count = retval; - EXIT; - } else { - err = retval; - *count = 0; - EXIT; + lov = &export->exp_obd->u.lov; + for (i = 0; i < md->lmd_stripe_count; i++) { + struct ldlm_extent *extent = (struct ldlm_extent *)cookie; + struct ldlm_extent sub_ext; + struct lov_stripe_md submd; + + sub_ext.start = lov_offset(md, extent->start, i); + sub_ext.end = lov_offset(md, extent->end, i); + if ( sub_ext.start == sub_ext.end ) + continue; + + submd.lmd_object_id = md->lmd_objects[i].l_object_id; + submd.lmd_easize = sizeof(submd); + rc = obd_enqueue(&(lov->tgts[i].conn), &submd, parent_lock, type, + &sub_ext, sizeof(sub_ext), mode, flags, cb, data, datalen, &(lockhs[i])); + // XXX add a lock debug statement here + if (!rc) { + CERROR("Error punch object %Ld subobj %Ld\n", md->lmd_object_id, + md->lmd_objects[i].l_object_id); + } } - - return err; + RETURN(rc); } -static int lov_enqueue(struct lustre_handle *conn, struct ldlm_namespace *ns, - struct ldlm_handle *parent_lock, __u64 *res_id, - __u32 type, struct 
ldlm_extent *extent, __u32 mode, - int *flags, void *data, int datalen, - struct ldlm_handle *lockh) +static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *md, __u32 mode, + struct lustre_handle *lockhs) { - int rc; + int rc = 0, i; + struct obd_export *export = class_conn2export(conn); + struct lov_obd *lov; ENTRY; - if (!class_conn2export(conn)) - RETURN(-EINVAL); + if (!md) { + CERROR("LOV requires striping ea for lock cancellation\n"); + RETURN(-EINVAL); + } - rc = obd_enqueue(&conn->oc_dev->obd_multi_conn[0], ns, parent_lock, - res_id, type, extent, mode, flags, data, datalen, - lockh); - RETURN(rc); -} + if (!export || !export->exp_obd) + RETURN(-ENODEV); -static int lov_cancel(struct lustre_handle *conn, __u32 mode, - struct ldlm_handle *lockh) -{ - int rc; - ENTRY; + lov = &export->exp_obd->u.lov; + for (i = 0; i < md->lmd_stripe_count; i++) { + struct lov_stripe_md submd; - if (!class_conn2export(conn)) - RETURN(-EINVAL); + if ( lockhs[i].addr == 0 ) + continue; - rc = obd_cancel(&conn->oc_dev->obd_multi_conn[0], oa); + submd.lmd_object_id = md->lmd_objects[i].l_object_id; + submd.lmd_easize = sizeof(submd); + rc = obd_cancel(&lov->tgts[i].conn, &submd, mode, &lockhs[i]); + if (!rc) { + CERROR("Error punch object %Ld subobj %Ld\n", md->lmd_object_id, + md->lmd_objects[i].l_object_id); + } + } RETURN(rc); } -#endif + + + struct obd_ops lov_obd_ops = { o_setup: lov_setup, @@ -671,12 +704,10 @@ struct obd_ops lov_obd_ops = { o_setattr: lov_setattr, o_open: lov_open, o_close: lov_close, -#if 0 - o_brw: lov_pgcache_brw, + o_brw: lov_brw, o_punch: lov_punch, o_enqueue: lov_enqueue, o_cancel: lov_cancel -#endif }; diff --git a/lustre/mdc/mdc_reint.c b/lustre/mdc/mdc_reint.c index 81cb40f..68ba0b6 100644 --- a/lustre/mdc/mdc_reint.c +++ b/lustre/mdc/mdc_reint.c @@ -49,15 +49,11 @@ int mdc_setattr(struct lustre_handle *conn, struct inode *inode, struct iattr *iattr, struct ptlrpc_request **request) { - struct ptlrpc_client *cl; - struct 
ptlrpc_connection *connection; - struct lustre_handle *rconn; - struct mds_rec_setattr *rec; struct ptlrpc_request *req; + struct mds_rec_setattr *rec; int rc, size = sizeof(*rec); ENTRY; - mdc_con2cl(conn, &cl, &connection, &rconn); req = ptlrpc_prep_req2(conn, MDS_REINT, 1, &size, NULL); if (!req) RETURN(-ENOMEM); @@ -126,12 +122,11 @@ int mdc_create(struct lustre_handle *conn, resend: rc = mdc_reint(req, level); if (rc == -ERESTARTSYS) { - struct mds_update_record_hdr *hdr = - lustre_msg_buf(req->rq_reqmsg, 0); + __u32 *opcode = lustre_msg_buf(req->rq_reqmsg, 0); level = LUSTRE_CONN_RECOVD; CERROR("Lost reply: re-create rep.\n"); req->rq_flags = 0; - hdr->ur_opcode = NTOH__u32(REINT_RECREATE); + *opcode = NTOH__u32(REINT_RECREATE); goto resend; } diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index bf725c2..6d9fdaa 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -56,8 +56,8 @@ int mdc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl, } int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid, - __u64 *last_committed, __u64 *last_rcvd, - __u32 *last_xid, struct ptlrpc_request **request) + __u64 *last_committed, __u32 *last_xid, + struct ptlrpc_request **request) { struct ptlrpc_request *req; struct mds_body *body; @@ -81,15 +81,11 @@ int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid, mds_unpack_body(body); memcpy(rootfid, &body->fid1, sizeof(*rootfid)); *last_committed = req->rq_repmsg->last_committed; - *last_rcvd = req->rq_repmsg->last_rcvd; - *last_xid = body->last_xid; + *last_xid = req->rq_repmsg->last_xid; - CDEBUG(D_NET, "root ino=%ld, last_committed=%Lu, last_rcvd=%Lu," - " last_xid=%d\n", + CDEBUG(D_NET, "root ino=%ld, last_committed=%Lu, last_xid=%d\n", (unsigned long)rootfid->id, - (unsigned long long)*last_committed, - (unsigned long long)*last_rcvd, - body->last_xid); + (unsigned long long)*last_committed, last_xid); } EXIT; @@ -145,15 +141,11 @@ int 
mdc_getattr(struct lustre_handle *conn, obd_id ino, int type, unsigned long valid, size_t ea_size, struct ptlrpc_request **request) { - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; struct ptlrpc_request *req; struct mds_body *body; int rc, size[2] = {sizeof(*body), 0}, bufcount = 1; ENTRY; - mdc_con2cl(conn, &cl, &connection, &rconn); req = ptlrpc_prep_req2(conn, MDS_GETATTR, 1, size, NULL); if (!req) GOTO(out, rc = -ENOMEM); diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 70116ff..5f2e1d3 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -342,7 +342,7 @@ static int mds_getstatus(struct ptlrpc_request *req) /* mcd_last_xid is is stored in little endian on the disk and mds_pack_rep_body converts it to network order */ - body->last_xid = le32_to_cpu(med->med_mcd->mcd_last_xid); + req->rq_repmsg->last_xid = le32_to_cpu(med->med_mcd->mcd_last_xid); mds_pack_rep_body(req); RETURN(0); } @@ -951,7 +951,7 @@ int mds_handle(struct ptlrpc_request *req) if (!rc) { struct mds_obd *mds = mds_req2mds(req); - req->rq_repmsg->last_rcvd = HTON__u64(mds->mds_last_rcvd); + req->rq_repmsg->last_xid = HTON__u64(mds->mds_last_rcvd); req->rq_repmsg->last_committed = HTON__u64(mds->mds_last_committed); CDEBUG(D_INFO, "last_rcvd %Lu, last_committed %Lu, xid %d\n", diff --git a/lustre/obdclass/Makefile.am b/lustre/obdclass/Makefile.am index b006a08..5bf7ba5 100644 --- a/lustre/obdclass/Makefile.am +++ b/lustre/obdclass/Makefile.am @@ -2,6 +2,9 @@ DEFS= MODULE = obdclass modulefs_DATA = obdclass.o EXTRA_PROGRAMS = obdclass -obdclass_SOURCES = genops.c proc_lustre.c class_obd.c sysctl.c +obdclass_SOURCES = genops.c proc_lustre.c class_obd.c sysctl.c page.c + +page.c: + test -e page.c || ln -sf $(top_srcdir)/lib/page.c include $(top_srcdir)/Rules diff --git a/lustre/obdclass/class_obd.c b/lustre/obdclass/class_obd.c index 776490e..560ec6e 100644 --- a/lustre/obdclass/class_obd.c +++ b/lustre/obdclass/class_obd.c 
@@ -473,14 +473,15 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp, rw = OBD_BRW_WRITE; case OBD_IOC_BRW_READ: { struct lov_stripe_md smd; + struct io_cb_data *cbd = ll_init_cb(); obd_count pages = 0; - struct page **bufs = NULL; - obd_size *counts = NULL; - obd_off *offsets = NULL; - obd_flag *flags = NULL; + struct brw_page *pga; int j; unsigned long off; void *from; + + if (!cbd) + GOTO(out, -ENOMEM); obd_data2conn(&conn, data); @@ -488,11 +489,8 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp, CDEBUG(D_INODE, "BRW %s with %d pages\n", rw == OBD_BRW_READ ? "read" : "write", pages); - OBD_ALLOC(bufs, pages * sizeof(*bufs)); - OBD_ALLOC(counts, pages * sizeof(*counts)); - OBD_ALLOC(offsets, pages * sizeof(*offsets)); - OBD_ALLOC(flags, pages * sizeof(*flags)); - if (!bufs || !counts || !offsets || !flags) { + OBD_ALLOC(pga, pages * sizeof(*pga)); + if (!pga) { CERROR("no memory for %d BRW per-page data\n", pages); GOTO(brw_free, err = -ENOMEM); } @@ -517,25 +515,20 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp, CERROR("no memory for brw pages\n"); GOTO(brw_cleanup, err = -ENOMEM); } - bufs[j] = virt_to_page(to); - counts[j] = PAGE_SIZE; - offsets[j] = off; - flags[j] = 0; + pga[j].pg = virt_to_page(to); + pga[j].count = PAGE_SIZE; + pga[j].off = off; + pga[j].flag = 0; } - err = obd_brw(rw, &conn, &smd, j, bufs, counts, offsets, flags, - NULL, NULL); - + err = obd_brw(rw, &conn, &smd, j, pga, ll_sync_io_cb, cbd); EXIT; brw_cleanup: for (j = 0; j < pages; j++) - if (bufs[j] != NULL) - __free_pages(bufs[j], 0); + if (pga[j].pg != NULL) + __free_pages(pga[j].pg, 0); brw_free: - OBD_FREE(bufs, pages * sizeof(*bufs)); - OBD_FREE(counts, pages * sizeof(*counts)); - OBD_FREE(offsets, pages * sizeof(*offsets)); - OBD_FREE(flags, pages * sizeof(*flags)); + OBD_FREE(pga, pages * sizeof(*pga)); GOTO(out, err); } default: diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 
d9e847d..f5e13b2 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -725,8 +725,7 @@ static int filter_truncate(struct lustre_handle *conn, struct obdo *oa, static int filter_pgcache_brw(int cmd, struct lustre_handle *conn, struct lov_stripe_md *md, obd_count oa_bufs, - struct page **pages, obd_size *count, - obd_off *offset, obd_flag *flags, + struct brw_page *pga, brw_callback_t callback, void *data) { struct obd_run_ctxt saved; @@ -757,32 +756,33 @@ static int filter_pgcache_brw(int cmd, struct lustre_handle *conn, CDEBUG(D_INODE, "OP %d obdo pgno: (%d) (%ld,%ld) " "off count (%Ld,%Ld)\n", cmd, pnum, file->f_dentry->d_inode->i_ino, - (unsigned long)offset[pnum] >> PAGE_CACHE_SHIFT, - (unsigned long long)offset[pnum], - (unsigned long long)count[pnum]); + (unsigned long)pga[pnum].off >> PAGE_CACHE_SHIFT, + (unsigned long long)pga[pnum].off, + (unsigned long long)pga[pnum].count); if (cmd & OBD_BRW_WRITE) { loff_t off; char *buffer; - off = offset[pnum]; - buffer = kmap(pages[pnum]); - retval = file->f_op->write(file, buffer, count[pnum], + off = pga[pnum].off; + buffer = kmap(pga[pnum].pg); + retval = file->f_op->write(file, buffer, + pga[pnum].count, &off); kunmap(pages[pnum]); CDEBUG(D_INODE, "retval %ld\n", retval); } else { - loff_t off = offset[pnum]; - char *buffer = kmap(pages[pnum]); + loff_t off = pga[pnum].off; + char *buffer = kmap(pga[pnum].pg); if (off >= file->f_dentry->d_inode->i_size) { - memset(buffer, 0, count[pnum]); - retval = count[pnum]; + memset(buffer, 0, pga[pnum].count); + retval = pga[pnum].count; } else { retval = file->f_op->read(file, buffer, - count[pnum], &off); + pga[pnum].count, &off); } kunmap(pages[pnum]); - if (retval != count[pnum]) { + if (retval != pga[pnum].count) { filp_close(file, 0); GOTO(out, retval = -EIO); } @@ -1369,23 +1369,40 @@ int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst, * and arrays to handle the request parameters. 
*/ while (index < ((src->o_size + PAGE_SIZE - 1) >> PAGE_SHIFT)) { - obd_size brw_count = PAGE_SIZE; - obd_off brw_offset = (page->index) << PAGE_SHIFT; - obd_flag flagr = 0; - obd_flag flagw = OBD_BRW_CREATE; + struct brw_page pg; + struct io_cb_data *cbd = ll_init_cb(); + + if (!cbd) { + err = -ENOMEM; + EXIT; + break; + } + + pg.pg = page; + pg.count = PAGE_SIZE; + pg.off = (page->index) << PAGE_SHIFT; + pg.flag = 0; page->index = index; - err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &page, - &brw_count, &brw_offset, &flagr, NULL, NULL); + err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg, + ll_sync_io_cb, cbd); if ( err ) { EXIT; break; } + + cbd = ll_init_cb(); + if (!cbd) { + err = -ENOMEM; + EXIT; + break; + } + pg.flag = OBD_BRW_CREATE; CDEBUG(D_INFO, "Read page %ld ...\n", page->index); - err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &page, - &brw_count, &brw_offset, &flagw, NULL, NULL); + err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg, + ll_sync_io_cb, cbd); /* XXX should handle dst->o_size, dst->o_blocks here */ if ( err ) { diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 4ebefc5..3f960a4 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -316,13 +316,16 @@ static void unmap_and_decref_bulk_desc(void *data) static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data) { struct osc_brw_cb_data *cb_data = data; + int err = 0; ENTRY; - if (desc->b_flags & PTL_RPC_FL_INTR) + if (desc->b_flags & PTL_RPC_FL_INTR) { + err = -ERESTARTSYS; CERROR("got signal\n"); + } if (cb_data->callback) - cb_data->callback(cb_data->cb_data); + cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH); OBD_FREE(cb_data->obd_data, cb_data->obd_size); OBD_FREE(cb_data, sizeof(*cb_data)); @@ -336,10 +339,7 @@ static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data) EXIT; } -static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md, - obd_count page_count, struct page **page_array, - 
obd_size *count, obd_off *offset, obd_flag *flags, - brw_callback_t callback, void *data) +static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md, obd_count page_count, struct brw_page *pga, brw_callback_t callback, void *data) { struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn; struct ptlrpc_request *request = NULL; @@ -390,11 +390,11 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md, bulk->b_xid = xid; /* single xid for all pages */ - bulk->b_buf = kmap(page_array[mapped]); - bulk->b_page = page_array[mapped]; + bulk->b_buf = kmap(pga[mapped].pg); + bulk->b_page = pga[mapped].pg; bulk->b_buflen = PAGE_SIZE; - ost_pack_niobuf(&nioptr, offset[mapped], count[mapped], - flags[mapped], bulk->b_xid); + ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count, + pga[mapped].flag, bulk->b_xid); } /* @@ -409,11 +409,9 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md, * * On error, we never do the brw_finish, so we handle all decrefs. */ - if (!callback) - ptlrpc_bulk_addref(desc); rc = ptlrpc_register_bulk(desc); if (rc) - GOTO(out_desc2, rc); + GOTO(out_unmap, rc); request->rq_replen = lustre_msg_size(1, size); rc = ptlrpc_queue_wait(request); @@ -432,16 +430,10 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md, * restart them" and osc_brw callers can know this. */ if (rc) - GOTO(out_desc2, rc); + GOTO(out_unmap, rc); /* Callbacks cause asynchronous handling. */ - if (callback) - GOTO(out_req, rc = 0); - - /* If there's no callback function, sleep here until complete. */ - l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_received(desc)); - if (desc->b_flags & PTL_RPC_FL_INTR) - GOTO(out_desc, rc = -EINTR); + rc = callback(data, 0, CB_PHASE_START); EXIT; out_desc: @@ -451,9 +443,6 @@ out_req: RETURN(rc); /* Clean up on error. 
*/ -out_desc2: - if (!callback) - ptlrpc_bulk_decref(desc); out_unmap: while (mapped-- > 0) kunmap(page_array[mapped]); @@ -463,8 +452,7 @@ out_unmap: static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md, obd_count page_count, - struct page **pagearray, obd_size *count, - obd_off *offset, obd_flag *flags, + struct brw_page *pga, brw_callback_t callback, void *data) { struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn; @@ -514,11 +502,11 @@ static int osc_brw_write(struct lustre_handle *conn, cb_data->obd_size = page_count * sizeof(*local); for (mapped = 0; mapped < page_count; mapped++) { - local[mapped].addr = kmap(pagearray[mapped]); - local[mapped].offset = offset[mapped]; - local[mapped].len = count[mapped]; - ost_pack_niobuf(&nioptr, offset[mapped], count[mapped], - flags[mapped], 0); + local[mapped].addr = kmap(pga[mapped].pg); + local[mapped].offset = pga[mapped].off; + local[mapped].len = pga[mapped].count; + ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count, + pga[mapped].flag, 0); } size[1] = page_count * sizeof(*remote); @@ -550,7 +538,7 @@ static int osc_brw_write(struct lustre_handle *conn, bulk->b_buf = (void *)(unsigned long)local[j].addr; bulk->b_buflen = local[j].len; bulk->b_xid = remote->xid; - bulk->b_page = pagearray[j]; + bulk->b_page = pga[j].pg; } if (desc->b_page_count != page_count) @@ -559,12 +547,7 @@ static int osc_brw_write(struct lustre_handle *conn, /* * One reference is released when brw_finish is complete, the * other here when we finish waiting on it if we don't have a callback. - * - * We don't reference the bulk descriptor again here if there is a - * callback, so we don't need an additional refcount on it. */ - if (!callback) - ptlrpc_bulk_addref(desc); rc = ptlrpc_send_bulk(desc); /* XXX: Mike, same question as in osc_brw_read. */ @@ -572,13 +555,7 @@ static int osc_brw_write(struct lustre_handle *conn, GOTO(out_desc2, rc); /* Callbacks cause asynchronous handling. 
*/ - if (callback) - GOTO(out_req, rc = 0); - - /* If there's no callback function, sleep here until complete. */ - l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_sent(desc)); - if (desc->b_flags & PTL_RPC_FL_INTR) - GOTO(out_desc, rc = -EINTR); + rc = callback(data, 0, CB_PHASE_START); EXIT; out_desc: @@ -603,23 +580,22 @@ out_cb: static int osc_brw(int cmd, struct lustre_handle *conn, struct lov_stripe_md *md, obd_count page_count, - struct page **page_array, obd_size *count, obd_off *offset, - obd_flag *flags, brw_callback_t callback, void *data) + struct brw_page *pagear, brw_callback_t callback, + void *data) { if (cmd & OBD_BRW_WRITE) - return osc_brw_write(conn, md, page_count, page_array, count, - offset, flags, callback, data); + return osc_brw_write(conn, md, page_count, pagear, callback, data); else - return osc_brw_read(conn, md, page_count, page_array, count, - offset, flags, callback, data); + return osc_brw_read(conn, md, page_count, pagear, callback, data); } -static int osc_enqueue(struct lustre_handle *connh, - struct lustre_handle *parent_lock, __u64 *res_id, +static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *md, + struct lustre_handle *parent_lock, __u32 type, void *extentp, int extent_len, __u32 mode, int *flags, void *callback, void *data, int datalen, struct lustre_handle *lockh) { + __u64 res_id = { md->lmd_object_id }; struct obd_device *obddev = class_conn2obd(connh); struct ldlm_extent *extent = extentp; int rc; @@ -632,7 +608,7 @@ static int osc_enqueue(struct lustre_handle *connh, /* Next, search for already existing extent locks that will cover us */ //osc_con2dlmcl(conn, &cl, &connection, &rconn); - rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent, + rc = ldlm_lock_match(obddev->obd_namespace, &res_id, type, extent, sizeof(extent), mode, lockh); if (rc == 1) { /* We already have a lock, and it's referenced */ @@ -648,7 +624,7 @@ static int osc_enqueue(struct lustre_handle *connh, else 
mode2 = LCK_PW; - rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent, + rc = ldlm_lock_match(obddev->obd_namespace, &res_id, type, extent, sizeof(extent), mode2, lockh); if (rc == 1) { int flags; @@ -669,12 +645,12 @@ static int osc_enqueue(struct lustre_handle *connh, } rc = ldlm_cli_enqueue(connh, NULL,obddev->obd_namespace, - parent_lock, res_id, type, extent, sizeof(extent), + parent_lock, &res_id, type, extent, sizeof(extent), mode, flags, ldlm_completion_ast, callback, data, datalen, lockh); return rc; } -static int osc_cancel(struct lustre_handle *oconn, __u32 mode, +static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md, __u32 mode, struct lustre_handle *lockh) { ENTRY; diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 60e8e0a..d830e82a 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -554,7 +554,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) req->rq_replen, req->rq_repmsg->status); spin_lock(&cli->cli_lock); - cli->cli_last_rcvd = req->rq_repmsg->last_rcvd; + cli->cli_last_xid = req->rq_repmsg->last_xid; cli->cli_last_committed = req->rq_repmsg->last_committed; ptlrpc_free_committed(cli); spin_unlock(&cli->cli_lock); diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 86feff0..38399a1 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -85,7 +85,7 @@ int lustre_unpack_msg(struct lustre_msg *m, int len) m->status = NTOH__u32(m->status); m->type = NTOH__u32(m->type); m->bufcount = NTOH__u32(m->bufcount); - m->last_rcvd = NTOH__u64(m->last_rcvd); + m->last_xid = NTOH__u64(m->last_xid); m->last_committed = NTOH__u64(m->last_committed); required_len = size_round(sizeof(*m) + m->bufcount * sizeof(__u32)); diff --git a/lustre/utils/lctl.c b/lustre/utils/lctl.c index 03a32b6..9bf5eaa 100644 --- a/lustre/utils/lctl.c +++ b/lustre/utils/lctl.c @@ -108,8 +108,8 @@ command_t cmdlist[] = { {"detach", jt_obd_detach, 0, "un-name a 
device\n" "usage: detach"}, {"lovconfig", jt_obd_lov_config, 0, - "write lov configuration to a mds device\n" - "usage: lovconfig lov-uuid stripcount stripsize pattern UUID1 [UUID2 ...]"}, + "write lov configuration to an mds device\n" + "usage: lovconfig lov-uuid stripe-count stripe-size offset pattern UUID1 [UUID2 ...]"}, /* Device operations */ {"=== device operations ==", jt_noop, 0, "device operations"}, -- 1.8.3.1