Whamcloud - gitweb
- move to single ptlrpc code paths on the ptlbd client and server which know
authorzab <zab>
Thu, 6 Feb 2003 00:26:29 +0000 (00:26 +0000)
committerzab <zab>
Thu, 6 Feb 2003 00:26:29 +0000 (00:26 +0000)
  to switch between _put and _get for read and write.  in the process, clean
  up the ridiculous callback/refcounting mess that was the first attempt.
- also, pass through to blk_ioctl() when people ask us to flush.

lustre/ptlbd/blk.c
lustre/ptlbd/rpc.c

index 4a79343..fb5466f 100644 (file)
@@ -49,6 +49,7 @@
 #define LOCAL_END_REQUEST
 #include <linux/blk.h>
 #include <linux/blkdev.h>
+#include <linux/blkpg.h>
 #include <linux/devfs_fs_kernel.h>
 
 static int ptlbd_size_size[PTLBD_MAX_MINOR];
@@ -106,6 +107,7 @@ static int ptlbd_ioctl(struct inode *inode, struct file *file,
                 unsigned int cmd, unsigned long arg)
 {
         struct ptlbd_obd *ptlbd;
+        int ret;
 
         if ( ! capable(CAP_SYS_ADMIN) )
                 RETURN(-EPERM);
@@ -114,9 +116,16 @@ static int ptlbd_ioctl(struct inode *inode, struct file *file,
         if ( IS_ERR(ptlbd) )
                 RETURN( PTR_ERR(ptlbd) );
 
-        /* XXX getattr{,64} */
+        switch(cmd) {
+                case BLKFLSBUF:
+                        ret = blk_ioctl(inode->i_rdev, cmd, arg);
+                        break;
+                default:
+                        ret = -EINVAL;
+                        break;
+        }
 
-        RETURN(-EINVAL);
+        RETURN(ret);
 }
 
 static int ptlbd_release(struct inode *inode, struct file *file)
index 76c857b..7899436 100644 (file)
 #include <linux/lprocfs_status.h>
 #include <linux/obd_ptlbd.h>
 
-static __u32 get_next_xid(struct obd_import *imp)
-{
-        unsigned long flags;
-        __u32 xid;
-        spin_lock_irqsave(&imp->imp_lock, flags);
-        xid = ++imp->imp_last_xid;
-        spin_unlock_irqrestore(&imp->imp_lock, flags);
-        return xid;
-}
-
-static int ptlbd_brw_callback(struct obd_brw_set *set, int phase)
-{
-        ENTRY;
-        RETURN(0);
-}
-
-static void decref_bulk_desc(void *data)
-{
-        struct ptlrpc_bulk_desc *desc = data;
-        ENTRY;
-
-        ptlrpc_bulk_decref(desc);
-        EXIT;
-}
-
-/*  this is the callback function which is invoked by the Portals
- *  event handler associated with the bulk_sink queue and bulk_source queue. 
- */
-static void ptlbd_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc)
-{
-        ENTRY;
-
-        LASSERT(desc->bd_brw_set != NULL);
-        LASSERT(desc->bd_brw_set->brw_callback != NULL);
-
-        desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH);
-
-        prepare_work(&desc->bd_queue, decref_bulk_desc, desc);
-        schedule_work(&desc->bd_queue);
-
-        EXIT;
-}
-
-
-int ptlbd_write_put_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, 
-                struct buffer_head *first_bh, unsigned int page_count)
-{
-        struct obd_import *imp = &ptlbd->bd_import;
-        struct ptlbd_op *op;
-        struct ptlbd_niob *niob, *niobs;
-        struct ptlbd_rsp *rsp;
-        struct ptlrpc_request *req;
-        struct ptlrpc_bulk_desc *desc;
-        struct buffer_head *bh;
-        int rc, size[2];
-        struct obd_brw_set *set;
-        ENTRY;
-
-        size[0] = sizeof(struct ptlbd_op);
-        size[1] = page_count * sizeof(struct ptlbd_niob);
-
-        req = ptlrpc_prep_req(imp, cmd, 2, size, NULL);
-        if (!req)
-                GOTO(out, rc = -ENOMEM);
-        /* XXX might not need these */
-        req->rq_request_portal = PTLBD_REQUEST_PORTAL;
-        req->rq_reply_portal = PTLBD_REPLY_PORTAL;
-
-        op = lustre_msg_buf(req->rq_reqmsg, 0);
-        niobs = lustre_msg_buf(req->rq_reqmsg, 1);
-
-        /* XXX pack */
-        op->op_cmd = cmd;
-        op->op_lun = 0;
-        op->op_niob_cnt = page_count;
-        op->op__padding = 0;
-        op->op_block_cnt = page_count;
-
-        desc = ptlrpc_prep_bulk(imp->imp_connection);
-        if ( desc == NULL )
-                GOTO(out_req, rc = -ENOMEM);
-        desc->bd_portal = PTLBD_BULK_PORTAL;
-        desc->bd_ptl_ev_hdlr = ptlbd_ptl_ev_hdlr;
-
-        /* XXX someone needs to free this */
-        set = obd_brw_set_new();
-        if (set == NULL)
-                GOTO(out_desc, rc = -ENOMEM);
-
-        set->brw_callback = ptlbd_brw_callback;
-#if 0
-        xid = get_next_xid(imp);
-#endif
-
-        for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_next, niob++ ) {
-#if 0
-                struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
-                if (bulk == NULL)
-                        GOTO(out_set, rc = -ENOMEM);
-#endif
-
-#if 0
-                niob->n_xid = xid;
-#endif
-                niob->n_block_nr = bh->b_blocknr;
-                niob->n_offset = bh_offset(bh);
-                niob->n_length = bh->b_size;
-
-
-#if 0
-                bulk->bp_xid = xid;
-                bulk->bp_buf = bh->b_data;
-                bulk->bp_page = bh->b_page;
-                bulk->bp_buflen = bh->b_size;
-#endif
-        }
-
-
-        size[0] = sizeof(struct ptlbd_rsp);
-        size[1] = sizeof(struct ptlbd_niob) * page_count;
-        req->rq_replen = lustre_msg_size(2, size);
-
-        /* XXX find out how we're really supposed to manage levels */
-        req->rq_level = imp->imp_level;
-        rc = ptlrpc_queue_wait(req);
-
-        rsp = lustre_msg_buf(req->rq_repmsg, 0);
-
-        niob = lustre_msg_buf(req->rq_repmsg, 1);
-        /* XXX check that op->num matches ours */
-        for ( bh = first_bh ; bh ; bh = bh->b_next, niob++ ) {
-                struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
-                if (bulk == NULL)
-                        GOTO(out_set, rc = -ENOMEM);
-
-                bulk->bp_xid = niob->n_xid;
-                bulk->bp_page = bh->b_page;
-                bulk->bp_buf = bh->b_data;
-                bulk->bp_buflen = bh->b_size;
-        }
-
-        obd_brw_set_add(set, desc);
-        rc = ptlrpc_bulk_put(desc);
-
-        /* if there's an error, no brw_finish called, just like
-         * osc_brw_read */
-
-        GOTO(out_req, rc);
-
-out_set:
-        obd_brw_set_free(set);
-out_desc:
-        ptlrpc_bulk_decref(desc);
-out_req:
-        ptlrpc_req_finished(req);
-out:
-        RETURN(rc);
-}
-
-int ptlbd_read_put_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, 
-                struct buffer_head *first_bh, unsigned int page_count)
+int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, 
+                struct buffer_head *first_bh)
 {
         struct obd_import *imp = &ptlbd->bd_import;
         struct ptlbd_op *op;
@@ -201,20 +41,23 @@ int ptlbd_read_put_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd,
         struct ptlrpc_request *req;
         struct ptlrpc_bulk_desc *desc;
         struct buffer_head *bh;
+        unsigned long flags;
+        unsigned int page_count;
         int rc, rep_size, size[2];
-        struct obd_brw_set *set;
         __u32 xid;
         ENTRY;
 
+        LASSERT(cmd == PTLBD_READ || cmd == PTLBD_WRITE);
+
+        for ( page_count = 0, bh = first_bh ; bh ; bh = bh->b_next )
+                page_count++;
+
         size[0] = sizeof(struct ptlbd_op);
         size[1] = page_count * sizeof(struct ptlbd_niob);
 
         req = ptlrpc_prep_req(imp, cmd, 2, size, NULL);
         if (!req)
-                GOTO(out, rc = -ENOMEM);
-        /* XXX might not need these? */
-        req->rq_request_portal = PTLBD_REQUEST_PORTAL;
-        req->rq_reply_portal = PTLBD_REPLY_PORTAL;
+                RETURN(-ENOMEM);
 
         op = lustre_msg_buf(req->rq_reqmsg, 0);
         niobs = lustre_msg_buf(req->rq_reqmsg, 1);
@@ -230,21 +73,16 @@ int ptlbd_read_put_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd,
         if ( desc == NULL )
                 GOTO(out_req, rc = -ENOMEM);
         desc->bd_portal = PTLBD_BULK_PORTAL;
-        desc->bd_ptl_ev_hdlr = ptlbd_ptl_ev_hdlr;
-
-        /* XXX someone needs to free this */
-        set = obd_brw_set_new();
-        if (set == NULL)
-                GOTO(out_desc, rc = -ENOMEM);
-
-        set->brw_callback = ptlbd_brw_callback;
+        desc->bd_ptl_ev_hdlr = NULL;
 
-        xid = get_next_xid(imp);
+        spin_lock_irqsave(&imp->imp_lock, flags);
+        xid = ++imp->imp_last_xid;
+        spin_unlock_irqrestore(&imp->imp_lock, flags);
 
         for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_next, niob++ ) {
                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
                 if (bulk == NULL)
-                        GOTO(out_set, rc = -ENOMEM);
+                        GOTO(out_req, rc = -ENOMEM);
 
                 niob->n_xid = xid;
                 niob->n_block_nr = bh->b_blocknr;
@@ -257,12 +95,13 @@ int ptlbd_read_put_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd,
                 bulk->bp_buflen = bh->b_size;
         }
 
-        /* XXX put in OBD_FAIL_CHECK for ptlbd? */
-        rc = ptlrpc_register_bulk_put(desc);
-        if (rc)
-                GOTO(out_set, rc);
+        if ( cmd == PTLBD_READ )
+                rc = ptlrpc_register_bulk_put(desc);
+        else
+                rc = ptlrpc_register_bulk_get(desc);
 
-        obd_brw_set_add(set, desc);
+        if (rc)
+                GOTO(out_desc, rc);
 
         rep_size = sizeof(struct ptlbd_rsp);
         req->rq_replen = lustre_msg_size(1, &rep_size);
@@ -271,48 +110,15 @@ int ptlbd_read_put_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd,
         req->rq_level = imp->imp_level;
         rc = ptlrpc_queue_wait(req);
 
-        rsp = lustre_msg_buf(req->rq_repmsg, 0);
-
-        /* if there's an error, no brw_finish called, just like
-         * osc_brw_read */
-
-        GOTO(out_req, rc);
+        if ( rc == 0 ) {
+                rsp = lustre_msg_buf(req->rq_repmsg, 0);
+                /* XXX do stuff */
+        }
 
-out_set:
-        obd_brw_set_free(set);
 out_desc:
         ptlrpc_bulk_decref(desc);
 out_req:
         ptlrpc_req_finished(req);
-out:
-        RETURN(rc);
-}
-
-int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, 
-                struct buffer_head *first_bh)
-{
-        unsigned int page_count = 0;
-        struct buffer_head *bh;
-        int rc;
-        ENTRY;
-
-        for ( page_count = 0, bh = first_bh ; bh ; bh = bh->b_next )
-                page_count++;
-
-        switch (cmd) {
-                case PTLBD_READ:
-                        rc = ptlbd_read_put_req(ptlbd, cmd, 
-                                        first_bh, page_count);
-                        break;
-                case PTLBD_WRITE:
-                        rc = ptlbd_write_put_req(ptlbd, cmd, 
-                                        first_bh, page_count);
-                        break;
-                default:
-                        rc = -EINVAL;
-                        break;
-        };
-
         RETURN(rc);
 }
 
@@ -341,93 +147,30 @@ static struct page * fake_page(int block_nr)
         return pages[block_nr];
 }
 
-static int ptlbd_put_write(struct ptlrpc_request *req)
+int ptlbd_parse_req(struct ptlrpc_request *req)
 {
-        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
         struct ptlbd_op *op;
-        struct ptlbd_niob *reply_niob, *request_niob;
+        struct ptlbd_niob *niob, *niobs;
         struct ptlbd_rsp *rsp;
         struct ptlrpc_bulk_desc *desc;
-        struct ptlrpc_service *srv;
         struct l_wait_info lwi;
-        int size[2];
-        int i, page_count, rc;
-        __u32 xid;
-
-        op = lustre_msg_buf(req->rq_reqmsg, 0);
-        request_niob = lustre_msg_buf(req->rq_reqmsg, 1);
-        page_count = req->rq_reqmsg->buflens[1] / sizeof(struct ptlbd_niob);
-
-        size[0] = sizeof(struct ptlbd_rsp);
-        size[1] = sizeof(struct ptlbd_niob) * page_count;
-        rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
-        if (rc)
-                GOTO(out, rc);
-        reply_niob = lustre_msg_buf(req->rq_repmsg, 1);
-
-        desc = ptlrpc_prep_bulk(req->rq_connection);
-        if (desc == NULL)
-                GOTO(out, rc = -ENOMEM);
-        desc->bd_ptl_ev_hdlr = NULL;
-        desc->bd_portal = PTLBD_BULK_PORTAL;
-        memcpy(&(desc->bd_conn), &conn, sizeof(conn)); /* XXX what? */
-
-        srv = req->rq_obd->u.ptlbd.ptlbd_service;
-        spin_lock(&srv->srv_lock);
-        xid = srv->srv_xid++;                   /* single xid for all pages */
-        spin_unlock(&srv->srv_lock);
-
-        for ( i = 0; i < page_count; i++) {
-                struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
-                if (bulk == NULL)
-                        GOTO(out_desc, rc = -ENOMEM);
-                        
-                reply_niob[i] = request_niob[i];
-                reply_niob[i].n_xid = xid;
-
-                bulk->bp_xid = xid;
-                bulk->bp_page = fake_page(request_niob[i].n_block_nr);
-                bulk->bp_buf = page_address(bulk->bp_page);
-                bulk->bp_buflen = request_niob[i].n_length;
-        }
+        int size[1], wait_flag, i, page_count, rc;
+        ENTRY;
 
-        rc = ptlrpc_register_bulk_put(desc);
+        rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
         if ( rc )
-                GOTO(out_desc, rc);
-
-        rsp = lustre_msg_buf(req->rq_reqmsg, 0);
-        rsp->r_status = 42;
-        rsp->r_error_cnt = 13;
-        ptlrpc_reply(req->rq_svc, req);
-
-        /* this synchronization probably isn't good enough */
-        lwi = LWI_TIMEOUT(obd_timeout * HZ, ptlbd_bulk_timeout, desc);
-        rc = l_wait_event(desc->bd_waitq, desc->bd_flags &PTL_BULK_FL_RCVD, 
-                        &lwi);
-
-out_desc:
-        ptlrpc_free_bulk(desc);
-out:
-        RETURN(rc);
-}
-
-static int ptlbd_put_read(struct ptlrpc_request *req)
-{
-        struct ptlbd_op *op;
-        struct ptlbd_niob *niob, *niobs;
-        struct ptlbd_rsp *rsp;
-        struct ptlrpc_bulk_desc *desc;
-        struct l_wait_info lwi;
-        int size[1];
-        int i, page_count, rc;
+                RETURN(rc);
 
         op = lustre_msg_buf(req->rq_reqmsg, 0);
+        LASSERT(op->op_cmd == PTLBD_READ || op->op_cmd == PTLBD_WRITE);
+
         niobs = lustre_msg_buf(req->rq_reqmsg, 1);
         page_count = req->rq_reqmsg->buflens[1] / sizeof(struct ptlbd_niob);
 
         desc = ptlrpc_prep_bulk(req->rq_connection);
         if (desc == NULL)
                 GOTO(out, rc = -ENOMEM);
+        desc->bd_ptl_ev_hdlr = NULL;
         desc->bd_portal = PTLBD_BULK_PORTAL;
 
         for ( i = 0, niob = niobs ; i < page_count; niob++, i++) {
@@ -444,14 +187,20 @@ static int ptlbd_put_read(struct ptlrpc_request *req)
                 bulk->bp_buflen = niob->n_length;
         }
 
-        rc = ptlrpc_bulk_put(desc);
+        if ( op->op_cmd == PTLBD_READ ) {
+                rc = ptlrpc_bulk_put(desc);
+                wait_flag = PTL_BULK_FL_SENT;
+        } else {
+                rc = ptlrpc_bulk_get(desc);
+                wait_flag = PTL_BULK_FL_RCVD;
+        }
+
         if ( rc )
                 GOTO(out_bulk, rc);
 
         /* this synchronization probably isn't good enough */
         lwi = LWI_TIMEOUT(obd_timeout * HZ, ptlbd_bulk_timeout, desc);
-        rc = l_wait_event(desc->bd_waitq, desc->bd_flags &PTL_BULK_FL_SENT, 
-                        &lwi);
+        rc = l_wait_event(desc->bd_waitq, desc->bd_flags & wait_flag, &lwi);
 
         size[0] = sizeof(struct ptlbd_rsp);
         rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
@@ -469,82 +218,7 @@ static int ptlbd_put_read(struct ptlrpc_request *req)
         ptlrpc_reply(req->rq_svc, req);
 
 out_bulk:
-        ptlrpc_free_bulk(desc);
+        ptlrpc_bulk_decref(desc);
 out:
         RETURN(rc);
 }
-
-
-int ptlbd_parse_req(struct ptlrpc_request *req)
-{
-        struct ptlbd_op *op;
-        int rc;
-        ENTRY;
-
-        rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
-        if ( rc )
-                RETURN(rc);
-
-        op = lustre_msg_buf(req->rq_reqmsg, 0);
-
-        switch(op->op_cmd) {
-                case PTLBD_READ:
-                        ptlbd_put_read(req);
-                        break;
-                case PTLBD_WRITE:
-                        ptlbd_put_write(req);
-                        break;
-                default:
-                        CERROR("fix this %d\n", op->op_cmd);
-                        break;
-        }
-
-        RETURN(0);
-}
-
-
-#if 0
-int ptlbd_bh_req(int cmd, struct ptlbd_state *st, struct buffer_head *first_bh)
-{
-        struct obd_brw_set *set = NULL;
-        struct brw_page *pg = NULL;
-        struct buffer_head *bh;
-        int rc, i, pg_bytes = 0;
-        ENTRY;
-
-        for ( bh = first_bh ; bh ; bh = bh->b_reqnext ) 
-                pg_bytes += sizeof(struct brw_page);
-
-        OBD_ALLOC(pg, pg_bytes);
-        if ( pg == NULL )
-                GOTO(out, rc = -ENOMEM);
-
-        set = obd_brw_set_new();
-        if (set == NULL)
-                GOTO(out, rc = -ENOMEM);
-
-        for ( i = 0, bh = first_bh ; bh ; bh = bh->b_reqnext, i++) {
-                pg[i].pg = bh->b_page;
-                pg[i].off = bh_offset(bh);
-                pg[i].count = bh->b_size;
-                pg[i].flag = 0;
-        }
-
-        set->brw_callback = ll_brw_sync_wait;
-        rc = obd_brw(cmd, /* lsm */NULL, num_pages, pg, set);
-        if ( rc )
-                GOTO(out, rc);
-
-        rc = ll_brw_sync_wait(set, CB_PHASE_START);
-        if (rc)
-                CERROR("error from callback: rc = %d\n", rc);
-
-out:
-        if ( pg != NULL )
-                OBD_FREE(pg, pg_bytes);
-        if ( set != NULL )
-                obd_brw_set_free(set);
-
-        RETURN(rc); 
-}
-#endif