From: zab Date: Thu, 6 Feb 2003 00:26:29 +0000 (+0000) Subject: - move to single ptlrpc code paths on the ptlbd client and server which know X-Git-Tag: v1_7_110~2^11~200 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=7fd0880e89916534ec776a937c0c54f40ca29ba9;p=fs%2Flustre-release.git - move to single ptlrpc code paths on the ptlbd client and server which know to switch between _put and _get for read and write. in the process, clean up the ridiculous callback/refcounting mess that was the first attempt. - also, pass through to blk_ioctl() when people ask us to flush. --- diff --git a/lustre/ptlbd/blk.c b/lustre/ptlbd/blk.c index 4a793436..fb5466f 100644 --- a/lustre/ptlbd/blk.c +++ b/lustre/ptlbd/blk.c @@ -49,6 +49,7 @@ #define LOCAL_END_REQUEST #include #include +#include #include static int ptlbd_size_size[PTLBD_MAX_MINOR]; @@ -106,6 +107,7 @@ static int ptlbd_ioctl(struct inode *inode, struct file *file, unsigned int cmd, unsigned long arg) { struct ptlbd_obd *ptlbd; + int ret; if ( ! capable(CAP_SYS_ADMIN) ) RETURN(-EPERM); @@ -114,9 +116,16 @@ static int ptlbd_ioctl(struct inode *inode, struct file *file, if ( IS_ERR(ptlbd) ) RETURN( PTR_ERR(ptlbd) ); - /* XXX getattr{,64} */ + switch(cmd) { + case BLKFLSBUF: + ret = blk_ioctl(inode->i_rdev, cmd, arg); + break; + default: + ret = -EINVAL; + break; + } - RETURN(-EINVAL); + RETURN(ret); } static int ptlbd_release(struct inode *inode, struct file *file) diff --git a/lustre/ptlbd/rpc.c b/lustre/ptlbd/rpc.c index 76c857b..7899436 100644 --- a/lustre/ptlbd/rpc.c +++ b/lustre/ptlbd/rpc.c @@ -31,168 +31,8 @@ #include #include -static __u32 get_next_xid(struct obd_import *imp) -{ - unsigned long flags; - __u32 xid; - spin_lock_irqsave(&imp->imp_lock, flags); - xid = ++imp->imp_last_xid; - spin_unlock_irqrestore(&imp->imp_lock, flags); - return xid; -} - -static int ptlbd_brw_callback(struct obd_brw_set *set, int phase) -{ - ENTRY; - RETURN(0); -} - -static void decref_bulk_desc(void *data) -{ - struct ptlrpc_bulk_desc *desc = data; - ENTRY; - - ptlrpc_bulk_decref(desc); - EXIT; -} - -/* this is the callback function which is invoked by the Portals - * event handler associated with the bulk_sink queue and bulk_source queue. - */ -static void ptlbd_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc) -{ - ENTRY; - - LASSERT(desc->bd_brw_set != NULL); - LASSERT(desc->bd_brw_set->brw_callback != NULL); - - desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH); - - prepare_work(&desc->bd_queue, decref_bulk_desc, desc); - schedule_work(&desc->bd_queue); - - EXIT; -} - - -int ptlbd_write_put_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, - struct buffer_head *first_bh, unsigned int page_count) -{ - struct obd_import *imp = &ptlbd->bd_import; - struct ptlbd_op *op; - struct ptlbd_niob *niob, *niobs; - struct ptlbd_rsp *rsp; - struct ptlrpc_request *req; - struct ptlrpc_bulk_desc *desc; - struct buffer_head *bh; - int rc, size[2]; - struct obd_brw_set *set; - ENTRY; - - size[0] = sizeof(struct ptlbd_op); - size[1] = page_count * sizeof(struct ptlbd_niob); - - req = ptlrpc_prep_req(imp, cmd, 2, size, NULL); - if (!req) - GOTO(out, rc = -ENOMEM); - /* XXX might not need these */ - req->rq_request_portal = PTLBD_REQUEST_PORTAL; - req->rq_reply_portal = PTLBD_REPLY_PORTAL; - - op = lustre_msg_buf(req->rq_reqmsg, 0); - niobs = lustre_msg_buf(req->rq_reqmsg, 1); - - /* XXX pack */ - op->op_cmd = cmd; - op->op_lun = 0; - op->op_niob_cnt = page_count; - op->op__padding = 0; - op->op_block_cnt = page_count; - - desc = ptlrpc_prep_bulk(imp->imp_connection); - if ( desc == NULL ) - GOTO(out_req, rc = -ENOMEM); - desc->bd_portal = PTLBD_BULK_PORTAL; - desc->bd_ptl_ev_hdlr = ptlbd_ptl_ev_hdlr; - - /* XXX someone needs to free this */ - set = obd_brw_set_new(); - if (set == NULL) - GOTO(out_desc, rc = -ENOMEM); - - set->brw_callback = ptlbd_brw_callback; - -#if 0 - xid = get_next_xid(imp); -#endif - - for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_next, niob++ ) { -#if 0 - struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc); - if (bulk == NULL) - GOTO(out_set, rc = -ENOMEM); -#endif - -#if 0 - niob->n_xid = xid; -#endif - niob->n_block_nr = bh->b_blocknr; - niob->n_offset = bh_offset(bh); - niob->n_length = bh->b_size; - - -#if 0 - bulk->bp_xid = xid; - bulk->bp_buf = bh->b_data; - bulk->bp_page = bh->b_page; - bulk->bp_buflen = bh->b_size; -#endif - } - - - size[0] = sizeof(struct ptlbd_rsp); - size[1] = sizeof(struct ptlbd_niob) * page_count; - req->rq_replen = lustre_msg_size(2, size); - - /* XXX find out how we're really supposed to manage levels */ - req->rq_level = imp->imp_level; - rc = ptlrpc_queue_wait(req); - - rsp = lustre_msg_buf(req->rq_repmsg, 0); - - niob = lustre_msg_buf(req->rq_repmsg, 1); - /* XXX check that op->num matches ours */ - for ( bh = first_bh ; bh ; bh = bh->b_next, niob++ ) { - struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc); - if (bulk == NULL) - GOTO(out_set, rc = -ENOMEM); - - bulk->bp_xid = niob->n_xid; - bulk->bp_page = bh->b_page; - bulk->bp_buf = bh->b_data; - bulk->bp_buflen = bh->b_size; - } - - obd_brw_set_add(set, desc); - rc = ptlrpc_bulk_put(desc); - - /* if there's an error, no brw_finish called, just like - * osc_brw_read */ - - GOTO(out_req, rc); - -out_set: - obd_brw_set_free(set); -out_desc: - ptlrpc_bulk_decref(desc); -out_req: - ptlrpc_req_finished(req); -out: - RETURN(rc); -} - -int ptlbd_read_put_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, - struct buffer_head *first_bh, unsigned int page_count) +int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, + struct buffer_head *first_bh) { struct obd_import *imp = &ptlbd->bd_import; struct ptlbd_op *op; @@ -201,20 +41,23 @@ int ptlbd_read_put_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, struct ptlrpc_request *req; struct ptlrpc_bulk_desc *desc; struct buffer_head *bh; + unsigned long flags; + unsigned int page_count; int rc, rep_size, size[2]; - struct obd_brw_set *set; __u32 xid; ENTRY; + LASSERT(cmd == PTLBD_READ || cmd == PTLBD_WRITE); + + for ( page_count = 0, bh = first_bh ; bh ; bh = bh->b_next ) + page_count++; + size[0] = sizeof(struct ptlbd_op); size[1] = page_count * sizeof(struct ptlbd_niob); req = ptlrpc_prep_req(imp, cmd, 2, size, NULL); if (!req) - GOTO(out, rc = -ENOMEM); - /* XXX might not need these? */ - req->rq_request_portal = PTLBD_REQUEST_PORTAL; - req->rq_reply_portal = PTLBD_REPLY_PORTAL; + RETURN(-ENOMEM); op = lustre_msg_buf(req->rq_reqmsg, 0); niobs = lustre_msg_buf(req->rq_reqmsg, 1); @@ -230,21 +73,16 @@ int ptlbd_read_put_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, if ( desc == NULL ) GOTO(out_req, rc = -ENOMEM); desc->bd_portal = PTLBD_BULK_PORTAL; - desc->bd_ptl_ev_hdlr = ptlbd_ptl_ev_hdlr; - - /* XXX someone needs to free this */ - set = obd_brw_set_new(); - if (set == NULL) - GOTO(out_desc, rc = -ENOMEM); - - set->brw_callback = ptlbd_brw_callback; + desc->bd_ptl_ev_hdlr = NULL; - xid = get_next_xid(imp); + spin_lock_irqsave(&imp->imp_lock, flags); + xid = ++imp->imp_last_xid; + spin_unlock_irqrestore(&imp->imp_lock, flags); for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_next, niob++ ) { struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc); if (bulk == NULL) - GOTO(out_set, rc = -ENOMEM); + GOTO(out_req, rc = -ENOMEM); niob->n_xid = xid; niob->n_block_nr = bh->b_blocknr; @@ -257,12 +95,13 @@ int ptlbd_read_put_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, bulk->bp_buflen = bh->b_size; } - /* XXX put in OBD_FAIL_CHECK for ptlbd? */ - rc = ptlrpc_register_bulk_put(desc); - if (rc) - GOTO(out_set, rc); + if ( cmd == PTLBD_READ ) + rc = ptlrpc_register_bulk_put(desc); + else + rc = ptlrpc_register_bulk_get(desc); - obd_brw_set_add(set, desc); + if (rc) + GOTO(out_desc, rc); rep_size = sizeof(struct ptlbd_rsp); req->rq_replen = lustre_msg_size(1, &rep_size); @@ -271,48 +110,15 @@ int ptlbd_read_put_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, req->rq_level = imp->imp_level; rc = ptlrpc_queue_wait(req); - rsp = lustre_msg_buf(req->rq_repmsg, 0); - - /* if there's an error, no brw_finish called, just like - * osc_brw_read */ - - GOTO(out_req, rc); + if ( rc == 0 ) { + rsp = lustre_msg_buf(req->rq_repmsg, 0); + /* XXX do stuff */ + } -out_set: - obd_brw_set_free(set); out_desc: ptlrpc_bulk_decref(desc); out_req: ptlrpc_req_finished(req); -out: - RETURN(rc); -} - -int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, - struct buffer_head *first_bh) -{ - unsigned int page_count = 0; - struct buffer_head *bh; - int rc; - ENTRY; - - for ( page_count = 0, bh = first_bh ; bh ; bh = bh->b_next ) - page_count++; - - switch (cmd) { - case PTLBD_READ: - rc = ptlbd_read_put_req(ptlbd, cmd, - first_bh, page_count); - break; - case PTLBD_WRITE: - rc = ptlbd_write_put_req(ptlbd, cmd, - first_bh, page_count); - break; - default: - rc = -EINVAL; - break; - }; - RETURN(rc); } @@ -341,93 +147,30 @@ static struct page * fake_page(int block_nr) return pages[block_nr]; } -static int ptlbd_put_write(struct ptlrpc_request *req) +int ptlbd_parse_req(struct ptlrpc_request *req) { - struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg; struct ptlbd_op *op; - struct ptlbd_niob *reply_niob, *request_niob; + struct ptlbd_niob *niob, *niobs; struct ptlbd_rsp *rsp; struct ptlrpc_bulk_desc *desc; - struct ptlrpc_service *srv; struct l_wait_info lwi; - int size[2]; - int i, page_count, rc; - __u32 xid; - - op = lustre_msg_buf(req->rq_reqmsg, 0); - request_niob = lustre_msg_buf(req->rq_reqmsg, 1); - page_count = req->rq_reqmsg->buflens[1] / sizeof(struct ptlbd_niob); - - size[0] = sizeof(struct ptlbd_rsp); - size[1] = sizeof(struct ptlbd_niob) * page_count; - rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg); - if (rc) - GOTO(out, rc); - reply_niob = lustre_msg_buf(req->rq_repmsg, 1); - - desc = ptlrpc_prep_bulk(req->rq_connection); - if (desc == NULL) - GOTO(out, rc = -ENOMEM); - desc->bd_ptl_ev_hdlr = NULL; - desc->bd_portal = PTLBD_BULK_PORTAL; - memcpy(&(desc->bd_conn), &conn, sizeof(conn)); /* XXX what? */ - - srv = req->rq_obd->u.ptlbd.ptlbd_service; - spin_lock(&srv->srv_lock); - xid = srv->srv_xid++; /* single xid for all pages */ - spin_unlock(&srv->srv_lock); - - for ( i = 0; i < page_count; i++) { - struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc); - if (bulk == NULL) - GOTO(out_desc, rc = -ENOMEM); - - reply_niob[i] = request_niob[i]; - reply_niob[i].n_xid = xid; - - bulk->bp_xid = xid; - bulk->bp_page = fake_page(request_niob[i].n_block_nr); - bulk->bp_buf = page_address(bulk->bp_page); - bulk->bp_buflen = request_niob[i].n_length; - } + int size[1], wait_flag, i, page_count, rc; + ENTRY; - rc = ptlrpc_register_bulk_put(desc); + rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen); if ( rc ) - GOTO(out_desc, rc); - - rsp = lustre_msg_buf(req->rq_reqmsg, 0); - rsp->r_status = 42; - rsp->r_error_cnt = 13; - ptlrpc_reply(req->rq_svc, req); - - /* this synchronization probably isn't good enough */ - lwi = LWI_TIMEOUT(obd_timeout * HZ, ptlbd_bulk_timeout, desc); - rc = l_wait_event(desc->bd_waitq, desc->bd_flags &PTL_BULK_FL_RCVD, - &lwi); - -out_desc: - ptlrpc_free_bulk(desc); -out: - RETURN(rc); -} - -static int ptlbd_put_read(struct ptlrpc_request *req) -{ - struct ptlbd_op *op; - struct ptlbd_niob *niob, *niobs; - struct ptlbd_rsp *rsp; - struct ptlrpc_bulk_desc *desc; - struct l_wait_info lwi; - int size[1]; - int i, page_count, rc; + RETURN(rc); op = lustre_msg_buf(req->rq_reqmsg, 0); + LASSERT(op->op_cmd == PTLBD_READ || op->op_cmd == PTLBD_WRITE); + niobs = lustre_msg_buf(req->rq_reqmsg, 1); page_count = req->rq_reqmsg->buflens[1] / sizeof(struct ptlbd_niob); desc = ptlrpc_prep_bulk(req->rq_connection); if (desc == NULL) GOTO(out, rc = -ENOMEM); + desc->bd_ptl_ev_hdlr = NULL; desc->bd_portal = PTLBD_BULK_PORTAL; for ( i = 0, niob = niobs ; i < page_count; niob++, i++) { @@ -444,14 +187,20 @@ static int ptlbd_put_read(struct ptlrpc_request *req) bulk->bp_buflen = niob->n_length; } - rc = ptlrpc_bulk_put(desc); + if ( op->op_cmd == PTLBD_READ ) { + rc = ptlrpc_bulk_put(desc); + wait_flag = PTL_BULK_FL_SENT; + } else { + rc = ptlrpc_bulk_get(desc); + wait_flag = PTL_BULK_FL_RCVD; + } + if ( rc ) GOTO(out_bulk, rc); /* this synchronization probably isn't good enough */ lwi = LWI_TIMEOUT(obd_timeout * HZ, ptlbd_bulk_timeout, desc); - rc = l_wait_event(desc->bd_waitq, desc->bd_flags &PTL_BULK_FL_SENT, - &lwi); + rc = l_wait_event(desc->bd_waitq, desc->bd_flags & wait_flag, &lwi); size[0] = sizeof(struct ptlbd_rsp); rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg); @@ -469,82 +218,7 @@ static int ptlbd_put_read(struct ptlrpc_request *req) ptlrpc_reply(req->rq_svc, req); out_bulk: - ptlrpc_free_bulk(desc); + ptlrpc_bulk_decref(desc); out: RETURN(rc); } - - -int ptlbd_parse_req(struct ptlrpc_request *req) -{ - struct ptlbd_op *op; - int rc; - ENTRY; - - rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen); - if ( rc ) - RETURN(rc); - - op = lustre_msg_buf(req->rq_reqmsg, 0); - - switch(op->op_cmd) { - case PTLBD_READ: - ptlbd_put_read(req); - break; - case PTLBD_WRITE: - ptlbd_put_write(req); - break; - default: - CERROR("fix this %d\n", op->op_cmd); - break; - } - - RETURN(0); -} - - -#if 0 -int ptlbd_bh_req(int cmd, struct ptlbd_state *st, struct buffer_head *first_bh) -{ - struct obd_brw_set *set = NULL; - struct brw_page *pg = NULL; - struct buffer_head *bh; - int rc, i, pg_bytes = 0; - ENTRY; - - for ( bh = first_bh ; bh ; bh = bh->b_reqnext ) - pg_bytes += sizeof(struct brw_page); - - OBD_ALLOC(pg, pg_bytes); - if ( pg == NULL ) - GOTO(out, rc = -ENOMEM); - - set = obd_brw_set_new(); - if (set == NULL) - GOTO(out, rc = -ENOMEM); - - for ( i = 0, bh = first_bh ; bh ; bh = bh->b_reqnext, i++) { - pg[i].pg = bh->b_page; - pg[i].off = bh_offset(bh); - pg[i].count = bh->b_size; - pg[i].flag = 0; - } - - set->brw_callback = ll_brw_sync_wait; - rc = obd_brw(cmd, /* lsm */NULL, num_pages, pg, set); - if ( rc ) - GOTO(out, rc); - - rc = ll_brw_sync_wait(set, CB_PHASE_START); - if (rc) - CERROR("error from callback: rc = %d\n", rc); - -out: - if ( pg != NULL ) - OBD_FREE(pg, pg_bytes); - if ( set != NULL ) - obd_brw_set_free(set); - - RETURN(rc); -} -#endif