Whamcloud - gitweb
b=2936
[fs/lustre-release.git] / lustre / ptlbd / rpc.c
index d3e5083..5af3249 100644 (file)
 #include <linux/lprocfs_status.h>
 #include <linux/obd_ptlbd.h>
 
-#define RSP_OK       0
-#define RSP_NOTOK   -1
-#define RQ_OK        0
-
-int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, 
-                struct request *blkreq)
+int ptlbd_send_rw_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, 
+                   struct buffer_head *first_bh)
 {
-        struct buffer_head *first_bh = blkreq->bh;
-        struct obd_import *imp = &ptlbd->bd_import;
+        struct obd_import *imp = ptlbd->bd_import;
         struct ptlbd_op *op;
         struct ptlbd_niob *niob, *niobs;
         struct ptlbd_rsp *rsp;
@@ -49,12 +44,11 @@ int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd,
         struct buffer_head *bh;
         unsigned int page_count;
         int rc, rep_size, size[2];
-        __u32 xid;
         ENTRY;
 
         LASSERT(cmd == PTLBD_READ || cmd == PTLBD_WRITE);
 
-        for ( page_count = 0, bh = first_bh ; bh ; bh = bh->b_next )
+        for ( page_count = 0, bh = first_bh ; bh ; bh = bh->b_reqnext )
                 page_count++;
 
         size[0] = sizeof(struct ptlbd_op);
@@ -62,10 +56,10 @@ int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd,
 
         req = ptlrpc_prep_req(imp, cmd, 2, size, NULL);
         if (!req)
-                RETURN(-ENOMEM);
+                RETURN(rc = 1);                  /* need to return error cnt */
 
-        op = lustre_msg_buf(req->rq_reqmsg, 0);
-        niobs = lustre_msg_buf(req->rq_reqmsg, 1);
+        op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
+        niobs = lustre_msg_buf(req->rq_reqmsg, 1, size[1]);
 
         /* XXX pack */
         op->op_cmd = cmd;
@@ -74,77 +68,104 @@ int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd,
         op->op__padding = 0;
         op->op_block_cnt = page_count;
 
-        desc = ptlrpc_prep_bulk(imp->imp_connection);
+        if (cmd == PTLBD_READ) 
+                desc = ptlrpc_prep_bulk_imp (req, page_count,
+                                             BULK_PUT_SINK, PTLBD_BULK_PORTAL);
+        else
+                desc = ptlrpc_prep_bulk_imp (req, page_count,
+                                             BULK_GET_SOURCE, PTLBD_BULK_PORTAL);
         if ( desc == NULL )
-                GOTO(out_req, rc = -ENOMEM);
-        desc->bd_portal = PTLBD_BULK_PORTAL;
-        desc->bd_ptl_ev_hdlr = NULL;
-
-        xid = ptlrpc_next_xid();
-
-        for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_next, niob++ ) {
-                struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
-                if (bulk == NULL)
-                        GOTO(out_req, rc = -ENOMEM);
+                GOTO(out, rc = 1);              /* need to return error cnt */
+        /* NB req now owns desc, and frees it when she frees herself */
+        
+        for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_reqnext, niob++ ) {
+                ptlrpc_prep_bulk_page(desc, bh->b_page,
+                                      bh_offset (bh) & (PAGE_SIZE - 1),
+                                      bh->b_size);
 
-                niob->n_xid = xid;
                 niob->n_block_nr = bh->b_blocknr;
                 niob->n_offset = bh_offset(bh);
                 niob->n_length = bh->b_size;
-
-                bulk->bp_xid = xid;
-                bulk->bp_buf = bh->b_data;
-                bulk->bp_page = bh->b_page;
-                bulk->bp_buflen = bh->b_size;
         }
 
-        if ( cmd == PTLBD_READ )
-                rc = ptlrpc_register_bulk_put(desc);
-        else
-                rc = ptlrpc_register_bulk_get(desc);
-
-        if (rc)
-                GOTO(out_desc, rc);
-
         rep_size = sizeof(struct ptlbd_rsp);
         req->rq_replen = lustre_msg_size(1, &rep_size);
 
         /* XXX find out how we're really supposed to manage levels */
-        req->rq_level = imp->imp_level;
+        req->rq_send_state = imp->imp_state;
         rc = ptlrpc_queue_wait(req);
 
-        if ( rc != 0 ) {
-                blkreq->errors++;
-                GOTO(out_desc, rc);
+        if ( rc != 0 )
+                GOTO(out, rc = 1);              /* need to return error count */
+
+        rsp = lustre_swab_repbuf(req, 0, sizeof (*rsp),
+                                 lustre_swab_ptlbd_rsp);
+        if (rsp == NULL) {
+                CERROR ("can't unpack response\n");
+                GOTO (out, rc = 1);             /* need to return error count */
         }
-        rsp = lustre_msg_buf(req->rq_repmsg, 0);
-        if (rsp->r_status != RSP_OK) {
-                blkreq->errors += rsp->r_error_cnt;
+        else if (rsp->r_status != 0) {
+                rc = rsp->r_error_cnt;
         }
 
-out_desc:
-        ptlrpc_bulk_decref(desc);
-out_req:
+out:
         ptlrpc_req_finished(req);
         RETURN(rc);
 }
 
-static int ptlbd_bulk_timeout(void *data)
+
+int ptlbd_send_flush_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd)
 {
-/*        struct ptlrpc_bulk_desc *desc = data;*/
+        struct obd_import *imp = ptlbd->bd_import;
+        struct ptlbd_op *op;
+        struct ptlbd_rsp *rsp;
+        struct ptlrpc_request *req;
+        int rc, rep_size, size[1];
         ENTRY;
 
-        CERROR("ugh, timed out\n");
+        LASSERT(cmd == PTLBD_FLUSH);
+
+        size[0] = sizeof(struct ptlbd_op);
+
+        req = ptlrpc_prep_req(imp, cmd, 1, size, NULL);
+        if (!req)
+                RETURN(-ENOMEM); 
+
+        op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
+
+        /* XXX pack */
+        op->op_cmd = cmd;
+        op->op_lun = 0;
+        op->op_niob_cnt = 0;
+        op->op__padding = 0;
+        op->op_block_cnt = 0;
+
+        rep_size = sizeof(struct ptlbd_rsp);
+        req->rq_replen = lustre_msg_size(1, &rep_size);
+
+        /* XXX find out how we're really supposed to manage levels */
+        req->rq_send_state = imp->imp_state;
+
+        rc = ptlrpc_queue_wait(req);
+        if ( rc != 0 )
+                GOTO(out_req, rc = 1);
+        rsp = lustre_swab_repbuf(req, 0, sizeof (*rsp),
+                                 lustre_swab_ptlbd_rsp);
+        if (rsp->r_status != 0)
+                rc = rsp->r_status;
 
-        RETURN(1);
+out_req:
+        ptlrpc_req_finished(req);
+        RETURN(rc);
 }
 
+
 int ptlbd_do_filp(struct file *filp, int op, struct ptlbd_niob *niobs, 
                 int page_count, struct list_head *page_list)
 {
         mm_segment_t old_fs;
         struct list_head *pos;
-        int status = RSP_OK;
+        int status = 0;
         ENTRY;
 
         old_fs = get_fs();
@@ -155,118 +176,211 @@ int ptlbd_do_filp(struct file *filp, int op, struct ptlbd_niob *niobs,
                 struct page *page = list_entry(pos, struct page, list);
                 loff_t offset = (niobs->n_block_nr << PAGE_SHIFT) + 
                         niobs->n_offset;
-                if ( op == PTLBD_READ ) {
-                        if ((ret = filp->f_op->read(filp, page_address(page), 
-                             niobs->n_length, &offset)) != niobs->n_length)
-                                status = ret;
-                                goto out;             
-                } else {
-                        if ((ret = filp->f_op->write(filp, page_address(page), 
-                             niobs->n_length, &offset)) != niobs->n_length)
-                                status = ret;
-                                goto out;             
-                }               
-
+                if ( op == PTLBD_READ )
+                        ret = filp->f_op->read(filp, page_address(page), 
+                             niobs->n_length, &offset);
+                else 
+                        ret = filp->f_op->write(filp, page_address(page), 
+                             niobs->n_length, &offset);
+                if (ret != niobs->n_length) {
+                        status = ret;
+                        break;
+                }
                 niobs++;
         }
-out:
         set_fs(old_fs);
         RETURN(status);
 }
 
-int ptlbd_parse_req(struct ptlrpc_request *req)
+
+int ptlbd_srv_rw_req(ptlbd_cmd_t cmd, __u16 index, 
+                     struct ptlrpc_request *req, int swab)
 {
-        struct ptlbd_op *op;
         struct ptlbd_niob *niob, *niobs;
         struct ptlbd_rsp *rsp;
-        struct ptlrpc_bulk_desc *desc;
-        struct file *filp = req->rq_obd->u.ptlbd.filp;
+        struct ptlrpc_bulk_desc *desc = NULL;
+        struct file *filp = req->rq_export->exp_obd->u.ptlbd.filp;
         struct l_wait_info lwi;
-        int size[1], wait_flag, i, page_count, rc, error_cnt = 0, 
-            status = RSP_OK;
+        int size[1], i, page_count, rc = 0, error_cnt = 0;
         struct list_head *pos, *n;
+        struct page *page;
         LIST_HEAD(tmp_pages);
         ENTRY;
 
-        rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
+        niobs = lustre_swab_reqbuf (req, 1, sizeof (*niobs),
+                                    lustre_swab_ptlbd_niob);
+        if (niobs == NULL)
+                GOTO (out, rc = -EFAULT);
+
+        size[0] = sizeof(struct ptlbd_rsp);
+        rc = lustre_pack_reply(req, 1, size, NULL);
         if ( rc )
-                RETURN(rc);
+                GOTO(out, rc);
 
-        op = lustre_msg_buf(req->rq_reqmsg, 0);
-        LASSERT(op->op_cmd == PTLBD_READ || op->op_cmd == PTLBD_WRITE);
+        rsp = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rsp));
+        if ( rsp == NULL )
+                GOTO (out, rc = -EFAULT);
 
-        niobs = lustre_msg_buf(req->rq_reqmsg, 1);
+        /* FIXME: assumes each niobuf fits in 1 page */
         page_count = req->rq_reqmsg->buflens[1] / sizeof(struct ptlbd_niob);
-
-        desc = ptlrpc_prep_bulk(req->rq_connection);
-        if (desc == NULL)
-                GOTO(out, rc = -ENOMEM);
-        desc->bd_ptl_ev_hdlr = NULL;
+        if (swab) {                             /* swab remaining niobs */
+                for (i = 1; i < page_count; i++)
+                        lustre_swab_ptlbd_niob(&niobs[i]);
+        }
+        if (req->rq_export == NULL) {
+                error_cnt++;
+                GOTO(out_reply, rc = -EFAULT);
+        }
+        
+        if (cmd == PTLBD_READ)
+                desc = ptlrpc_prep_bulk_exp (req, page_count, 
+                                             BULK_PUT_SOURCE, PTLBD_BULK_PORTAL);
+        else
+                desc = ptlrpc_prep_bulk_exp (req, page_count,
+                                             BULK_GET_SINK, PTLBD_BULK_PORTAL);
+        if (desc == NULL) {
+                error_cnt++;
+                GOTO(out_reply, rc = -ENOMEM);
+        }
         desc->bd_portal = PTLBD_BULK_PORTAL;
+        LASSERT (page_count > 0);
 
         for ( i = 0, niob = niobs ; i < page_count; niob++, i++) {
-                struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
-                if (bulk == NULL)
-                        GOTO(out_bulk, rc = -ENOMEM);
-
-                bulk->bp_page = alloc_page(GFP_KERNEL);
-                if (bulk->bp_page == NULL)
-                        GOTO(out_bulk, rc = -ENOMEM);
-                list_add(&bulk->bp_page->list, &tmp_pages);
-
-                bulk->bp_xid = niob->n_xid;
-                bulk->bp_buf = page_address(bulk->bp_page);
-                bulk->bp_buflen = niob->n_length;
+                page = alloc_page(GFP_KERNEL);
+                if (page == NULL) {
+                        error_cnt++;
+                        GOTO(out_reply, rc = -ENOMEM);
+                }
+                list_add_tail(&page->list, &tmp_pages);
+
+                ptlrpc_prep_bulk_page(desc, page,
+                                      niob->n_offset & (PAGE_SIZE - 1),
+                                      niob->n_length);
         }
 
-        if ( op->op_cmd == PTLBD_READ ) {
-                if ((status = ptlbd_do_filp(filp, PTLBD_READ, niobs, 
-                                          page_count, &tmp_pages)) < 0) {
+        if ( cmd == PTLBD_READ ) {
+                rc = ptlbd_do_filp(filp, PTLBD_READ, niobs, 
+                                   page_count, &tmp_pages);
+                if (rc < 0) {
                         error_cnt++;
+                        GOTO(out_reply, rc);
                 }
-                rc = ptlrpc_bulk_put(desc);
-                wait_flag = PTL_BULK_FL_SENT;
-        } else {
-                rc = ptlrpc_bulk_get(desc);
-                wait_flag = PTL_BULK_FL_RCVD;
         }
+        rc = ptlrpc_start_bulk_transfer(desc);
 
-        if ( rc )
-                GOTO(out_bulk, rc);
-
-        /* this synchronization probably isn't good enough */
-        lwi = LWI_TIMEOUT(obd_timeout * HZ, ptlbd_bulk_timeout, desc);
-        rc = l_wait_event(desc->bd_waitq, desc->bd_flags & wait_flag, &lwi);
+        if ( rc ) {
+                error_cnt++;
+                GOTO(out_reply, rc);
+        }
 
-        size[0] = sizeof(struct ptlbd_rsp);
-        rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
-        if ( rc )
-                GOTO(out, rc);
+        lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, desc);
+        rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
+        if (rc != 0) {
+                LASSERT(rc == -ETIMEDOUT);
+                ptlrpc_abort_bulk(desc);
+                error_cnt++;
+                GOTO(out_reply, rc);
+        }
 
-        rsp = lustre_msg_buf(req->rq_repmsg, 0);
-        if ( rsp == NULL )
-                GOTO(out, rc = -EINVAL);
+        /* XXX do some error handling */
+        LASSERT(desc->bd_success && desc->bd_nob_transferred == desc->bd_nob);
         
-        if ( op->op_cmd == PTLBD_WRITE ) {
-                if ((status = ptlbd_do_filp(filp, PTLBD_WRITE, niobs, 
+        if ( cmd == PTLBD_WRITE ) {
+                if ((rc = ptlbd_do_filp(filp, PTLBD_WRITE, niobs, 
                                            page_count, &tmp_pages)) < 0) {
                         error_cnt++;
                 }
         }
 
+out_reply:
         rsp->r_error_cnt = error_cnt;
-        rsp->r_status = status;                         /* I/O status */
-        req->rq_status = RQ_OK ; /* XXX */              /* ptlbd req status */
+        rsp->r_status = rc;  
+        req->rq_status = rc; 
 
-        ptlrpc_reply(req->rq_svc, req);
+        ptlrpc_reply(req);
 
-out_bulk:
         list_for_each_safe(pos, n, &tmp_pages) {
                 struct page *page = list_entry(pos, struct page, list);
                 list_del(&page->list);
                 __free_page(page);
         }
-        ptlrpc_bulk_decref(desc);
+        if (desc)
+                ptlrpc_free_bulk(desc);
 out:
         RETURN(rc);
 }
+
+
+int ptlbd_srv_flush_req(ptlbd_cmd_t cmd, __u16 index, 
+                        struct ptlrpc_request *req)
+{
+        struct ptlbd_rsp *rsp;
+        struct file *filp = req->rq_export->exp_obd->u.ptlbd.filp;
+        int size[1], rc, status;
+        ENTRY;
+
+        size[0] = sizeof(struct ptlbd_rsp);
+        rc = lustre_pack_reply(req, 1, size, NULL);
+        if ( rc )
+                RETURN(rc);
+
+        rsp = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rsp));
+        if ( rsp == NULL )
+                RETURN(-EINVAL);
+
+        if (! (filp) && (filp->f_op) && (filp->f_op->fsync) &&
+              (filp->f_dentry))
+                GOTO(out_reply, status = -EINVAL);
+
+        status = filp->f_op->fsync(filp, filp->f_dentry, 1);
+
+out_reply:
+        rsp->r_error_cnt = 0;
+        rsp->r_status = status;
+        req->rq_status = 0;
+
+        ptlrpc_reply(req);
+        RETURN(0);
+}
+
+
+int ptlbd_handle(struct ptlrpc_request *req)
+{
+        struct ptlbd_op *op;
+        int swab;
+        int rc;
+        ENTRY;
+
+        swab = lustre_msg_swabbed (req->rq_reqmsg);
+
+        if (req->rq_reqmsg->opc == PTLBD_CONNECT) {
+                rc = target_handle_connect(req, ptlbd_handle);
+                target_send_reply(req, rc, OBD_FAIL_PTLRPC);
+                RETURN(0);
+        }
+        if (req->rq_reqmsg->opc == PTLBD_DISCONNECT) {
+                rc = target_handle_disconnect(req);
+                target_send_reply(req, rc, OBD_FAIL_PTLRPC);
+                RETURN(0);
+        }
+        op = lustre_swab_reqbuf (req, 0, sizeof (*op),
+                                 lustre_swab_ptlbd_op);
+        if (op == NULL)
+                RETURN(-EFAULT);
+
+        switch (op->op_cmd) {
+                case PTLBD_READ:
+                case PTLBD_WRITE:
+                        rc = ptlbd_srv_rw_req(op->op_cmd, op->op_lun, req, 
+                                              swab);
+                        break;
+
+                case PTLBD_FLUSH:
+                        rc = ptlbd_srv_flush_req(op->op_cmd, op->op_lun, req);
+                        break;
+                default:
+                        rc = -EINVAL;
+        }
+
+        RETURN(rc);
+}