Whamcloud - gitweb
* Split struct niobuf into niobuf_local and niobuf_remote
authorpschwan <pschwan>
Fri, 17 May 2002 16:18:11 +0000 (16:18 +0000)
committerpschwan <pschwan>
Fri, 17 May 2002 16:18:11 +0000 (16:18 +0000)
  - niobuf_remote is offset, length, xid, and flags
  - niobuf_local is all of the above, plus an address and sometimes a page
  - The former is sent across the network, the latter used internally

* Small ldlm fixes brought over from the (now-defunct) ldlm_testing branch
  - SMP deadlock fix
  - comment fix

* Bulk descriptor refactoring
  - You create a bulk descriptor and then n bulk pages that get hooked in
  - Pages sent all at once, optional callback per page
  - Another optional callback when the final ack has been received, although
    Eric tells me that elan doesn't guarantee packet ordering, so this needs
    revisited

* A few key bugfixes in the MDC/MDS/OSC/OST bulk code; these probably bit us if
  we sent it a signal during bulk processing

* A few LOV pieces (mostly in genops.c)
  - A temporary gen_multi_setup/cleanup to get the LOV rolling; it won't remain
    in this form

I've tested these fixes, but not exhaustively.

23 files changed:
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_net.h
lustre/include/linux/obd.h
lustre/include/linux/obd_class.h
lustre/include/linux/obd_ext2.h
lustre/include/linux/obd_ost.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_request.c
lustre/lib/obd_pack.c
lustre/llite/file.c
lustre/llite/namei.c
lustre/mdc/mdc_reint.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/obdclass/genops.c
lustre/obdecho/echo.c
lustre/obdfilter/filter.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/client.c
lustre/ptlrpc/events.c
lustre/ptlrpc/niobuf.c
lustre/tests/llmount.sh

index 3e6478a..a6783e9 100644 (file)
 
 #ifndef __LUSTRE_IDL_H__
 #define __LUSTRE_IDL_H__
-#ifdef __KERNEL__
-#include <linux/ioctl.h>
-#include <asm/types.h>
 
-#include <linux/types.h>
+#ifdef __KERNEL__
+# include <linux/ioctl.h>
+# include <asm/types.h>
+# include <linux/types.h>
 #else
-#define __KERNEL__
-#include <linux/list.h>
-#undef __KERNEL__
-#include <stdint.h>
+# define __KERNEL__
+# include <linux/list.h>
+# undef __KERNEL__
+# include <stdint.h>
 #endif
 /*
  * this file contains all data structures used in Lustre interfaces:
@@ -61,6 +61,21 @@ struct lustre_msg {
         __u32   buflens[0];
 };
 
+struct niobuf_remote {
+        __u64 offset;
+        __u32 len;
+        __u32 xid;
+        __u32 flags;
+};
+
+struct niobuf_local {
+        __u64 addr;
+        __u64 offset;
+        __u32 len;
+        __u32 xid;
+        void *page;
+};
+
 /*
  *   OST requests: OBDO & OBD request records
  */
@@ -186,15 +201,6 @@ struct ll_fid {
         __u32 f_type;
 };
 
-struct niobuf {
-        __u64 addr;
-        __u64 offset;
-        __u32 len;
-        __u32 flags;
-        __u32 xid;
-        void *page;
-};
-
 struct mds_body {
         struct ll_fid  fid1;
         struct ll_fid  fid2;
index bd5f574..cdd4683 100644 (file)
@@ -176,24 +176,33 @@ struct ptlrpc_request {
         struct ptlrpc_client *rq_client;
 };
 
-struct ptlrpc_bulk_desc {
-        int b_flags;
-        struct ptlrpc_connection *b_connection;
-        __u32 b_portal;
+struct ptlrpc_bulk_page {
+        struct ptlrpc_bulk_desc *b_desc;
+        struct list_head b_link;
         char *b_buf;
         int b_buflen;
-        int (*b_cb)(struct ptlrpc_bulk_desc *, void *);
         struct page *b_page;
-        struct obd_conn b_conn;
         __u32 b_xid;
-
-        wait_queue_head_t b_waitq;
+        int (*b_cb)(struct ptlrpc_bulk_page *);
 
         ptl_md_t b_md;
         ptl_handle_md_t b_md_h;
         ptl_handle_me_t b_me_h;
 };
 
+struct ptlrpc_bulk_desc {
+        int b_flags;
+        struct ptlrpc_connection *b_connection;
+        __u32 b_portal;
+        int (*b_cb)(struct ptlrpc_bulk_desc *);
+        struct obd_conn b_conn;
+
+        wait_queue_head_t b_waitq;
+        struct list_head b_page_list;
+        __u32 b_page_count;
+        __u32 b_finished_count;
+};
+
 struct ptlrpc_thread {
         struct list_head t_link;
 
@@ -245,8 +254,7 @@ void ptlrpc_init_connection(void);
 void ptlrpc_cleanup_connection(void);
 
 /* rpc/niobuf.c */
-int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *);
-int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *, int portal);
+int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *);
 int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *);
 int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk);
 int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req);
@@ -272,10 +280,12 @@ void ptlrpc_restart_req(struct ptlrpc_request *req);
 struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
                                        struct ptlrpc_connection *u, int opcode,
                                        int count, int *lengths, char **bufs);
-void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk);
 void ptlrpc_free_req(struct ptlrpc_request *request);
 void ptlrpc_req_finished(struct ptlrpc_request *request);
 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *);
+void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk);
+struct ptlrpc_bulk_page *ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc);
+void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *page);
 int ptlrpc_check_status(struct ptlrpc_request *req, int err);
 
 /* rpc/service.c */
index c98e785..8640d95 100644 (file)
@@ -156,6 +156,11 @@ struct osc_obd {
         struct ptlrpc_connection *osc_conn;
 };
 
+struct lov_obd {
+        int lov_count;
+        struct obd_conn *lov_targets;
+};
+
 /* corresponds to one of the obd's */
 #define MAX_MULTI       16
 struct obd_device {
@@ -186,6 +191,7 @@ struct obd_device {
                 struct echo_obd echo;
                 struct recovd_obd recovd;
                 struct trace_obd trace;
+                struct lov_obd lov;
 #if 0
                 struct raid1_obd raid1;
                 struct snap_obd snap;
@@ -236,11 +242,11 @@ struct obd_ops {
                          obd_id *startid, obd_gr group, void *data);
         int (*o_preprw)(int cmd, struct obd_conn *conn,
                         int objcount, struct obd_ioobj *obj,
-                        int niocount, struct niobuf *nb,
-                        struct niobuf *res);
+                        int niocount, struct niobuf_remote *remote,
+                        struct niobuf_local *local);
         int (*o_commitrw)(int cmd, struct obd_conn *conn,
                           int objcount, struct obd_ioobj *obj,
-                          int niocount, struct niobuf *res);
+                          int niocount, struct niobuf_local *local);
         int (*o_enqueue)(struct obd_conn *conn, struct ldlm_namespace *ns,
                          struct ldlm_handle *parent_lock, __u64 *res_id,
                          __u32 type, struct ldlm_extent *, __u32 mode,
index a428858..8c6b65d 100644 (file)
@@ -308,28 +308,28 @@ static inline int obd_brw(int rw, struct obd_conn *conn, obd_count num_oa,
 
 static inline int obd_preprw(int cmd, struct obd_conn *conn,
                              int objcount, struct obd_ioobj *obj,
-                             int niocount, struct niobuf *nb,
-                             struct niobuf *res)
+                             int niocount, struct niobuf_remote *remote,
+                             struct niobuf_local *local)
 {
         int rc;
         OBD_CHECK_SETUP(conn);
         OBD_CHECK_OP(conn, preprw);
 
-        rc = OBP(conn->oc_dev, preprw)(cmd, conn, objcount, obj, niocount, nb,
-                                       res);
+        rc = OBP(conn->oc_dev, preprw)(cmd, conn, objcount, obj, niocount,
+                                       remote, local);
         RETURN(rc);
 }
 
 static inline int obd_commitrw(int cmd, struct obd_conn *conn,
                                int objcount, struct obd_ioobj *obj,
-                               int niocount, struct niobuf *res)
+                               int niocount, struct niobuf_local *local)
 {
         int rc;
         OBD_CHECK_SETUP(conn);
         OBD_CHECK_OP(conn, commitrw);
 
         rc = OBP(conn->oc_dev, commitrw)(cmd, conn, objcount, obj, niocount,
-                                         res);
+                                         local);
         RETURN(rc);
 }
 
index 84e015e..73b4b0b 100644 (file)
@@ -20,7 +20,7 @@ extern struct file_operations *obd_fso;
 /* ext2_obd.c */
 extern struct obd_ops ext2_obd_ops;
 
-
+#include <linux/fs.h>
 #include <linux/ext2_fs.h>
 
 /* super.c */
index e1c8f92..8ad6e6a 100644 (file)
@@ -33,9 +33,9 @@
 #define LUSTRE_OSC_NAME "osc"
 
 /* ost/ost_pack.c */
-void ost_pack_niobuf(void **tmp, void *addr, __u64 offset, __u32 len, 
-                     __u32 flags, __u32 xid);
-void ost_unpack_niobuf(void **tmp, struct niobuf **nbp);
+void ost_pack_niobuf(void **tmp, __u64 offset, __u32 len, __u32 flags,
+                     __u32 xid);
+void ost_unpack_niobuf(void **tmp, struct niobuf_remote **nbp);
 void ost_pack_ioo(void **tmp, struct obdo *oa, int bufcnt);
 void ost_unpack_ioo(void **tmp, struct obd_ioobj **ioop);
 
index 05bd152..512836d 100644 (file)
@@ -276,7 +276,8 @@ int ldlm_local_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
         return rc;
 }
 
-/* Must be called without the resource lock held. */
+/* Must be called without the resource lock held.  Returns a referenced,
+ * unlocked ldlm_lock. */
 ldlm_error_t ldlm_local_lock_create(struct ldlm_namespace *ns,
                                     struct ldlm_handle *parent_lock_handle,
                                     __u64 *res_id, __u32 type,
index eaee614..ec32a68 100644 (file)
@@ -42,7 +42,6 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
 
         lock = ldlm_handle2object(lockh);
 
-        spin_unlock(&lock->l_lock);
         req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 2, size, bufs);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
index 5b8ad4e..b4f0254 100644 (file)
@@ -52,13 +52,12 @@ void ost_unpack_ioo(void **tmp, struct obd_ioobj **ioop)
         *tmp = c + sizeof(*ioo); 
 }
 
-void ost_pack_niobuf(void **tmp, void *addr, __u64 offset, __u32 len, 
-                     __u32 flags, __u32 xid)
+void ost_pack_niobuf(void **tmp, __u64 offset, __u32 len, __u32 flags,
+                     __u32 xid)
 {
-        struct niobuf *nb = *tmp;
+        struct niobuf_remote *nb = *tmp;
         char *c = *tmp;
 
-        nb->addr = HTON__u64((__u64)(unsigned long)addr); 
         nb->offset = HTON__u64(offset); 
         nb->len = HTON__u32(len); 
         nb->flags = HTON__u32(flags); 
@@ -66,14 +65,13 @@ void ost_pack_niobuf(void **tmp, void *addr, __u64 offset, __u32 len,
         *tmp = c + sizeof(*nb); 
 }
 
-void ost_unpack_niobuf(void **tmp, struct niobuf **nbp)
+void ost_unpack_niobuf(void **tmp, struct niobuf_remote **nbp)
 {
         char *c = *tmp;
-        struct niobuf *nb = *tmp;
+        struct niobuf_remote *nb = *tmp;
 
         *nbp = *tmp;
 
-        nb->addr = NTOH__u64(nb->addr); 
         nb->offset = NTOH__u64(nb->offset); 
         nb->len = NTOH__u32(nb->len); 
         nb->flags = NTOH__u32(nb->flags); 
index 8026454..16d29e3 100644 (file)
@@ -166,18 +166,15 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
                             loff_t *ppos)
 {
         struct inode *inode = filp->f_dentry->d_inode;
-#if 0
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ldlm_extent extent;
         struct ldlm_handle lockh;
         __u64 res_id[RES_NAME_SIZE] = {inode->i_ino};
         int flags = 0;
         ldlm_error_t err;
-#endif
         ssize_t retval;
         ENTRY;
 
-#if 0
         extent.start = *ppos;
         extent.end = *ppos + count;
         CDEBUG(D_INFO, "Locking inode %ld, start %Lu end %Lu\n",
@@ -189,7 +186,6 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
         if (err != ELDLM_OK)
                 CERROR("lock enqueue: err: %d\n", err);
         ldlm_lock_dump((void *)(unsigned long)lockh.addr);
-#endif
 
         CDEBUG(D_INFO, "Reading inode %ld, %d bytes, offset %Ld\n",
                inode->i_ino, count, *ppos);
@@ -201,11 +197,9 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
                 ll_setattr(filp->f_dentry, &attr);
         }
 
-#if 0
         err = obd_cancel(&sbi->ll_conn, LCK_PR, &lockh);
         if (err != ELDLM_OK)
                 CERROR("lock cancel: err: %d\n", err);
-#endif
 
         RETURN(retval);
 }
@@ -217,18 +211,15 @@ static ssize_t
 ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
 {
         struct inode *inode = file->f_dentry->d_inode;
-#if 0
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ldlm_extent extent;
         struct ldlm_handle lockh;
         __u64 res_id[RES_NAME_SIZE] = {inode->i_ino};
         int flags = 0;
         ldlm_error_t err;
-#endif
         ssize_t retval;
         ENTRY;
 
-#if 0
         extent.start = *ppos;
         extent.end = *ppos + count;
         CDEBUG(D_INFO, "Locking inode %ld, start %Lu end %Lu\n",
@@ -240,19 +231,15 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
         if (err != ELDLM_OK)
                 CERROR("lock enqueue: err: %d\n", err);
         ldlm_lock_dump((void *)(unsigned long)lockh.addr);
-#endif
 
         CDEBUG(D_INFO, "Writing inode %ld, %ld bytes, offset %Ld\n",
                inode->i_ino, (long)count, *ppos);
 
         retval = generic_file_write(file, buf, count, ppos);
 
-
-#if 0
         err = obd_cancel(&sbi->ll_conn, LCK_PW, &lockh);
         if (err != ELDLM_OK)
                 CERROR("lock cancel: err: %d\n", err);
-#endif
 
         RETURN(retval);
 }
index 27650f8..e6e1307 100644 (file)
@@ -180,7 +180,7 @@ static struct inode *ll_create_node(struct inode *dir, const char *name,
  out:
         ptlrpc_free_req(request);
         return inode;
-} /* ll_new_inode */
+}
 
 int ll_mdc_unlink(struct inode *dir, struct inode *child,
                   const char *name, int len)
index 842f54a..59b893a 100644 (file)
@@ -104,10 +104,11 @@ int mdc_create(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         level = LUSTRE_CONN_FULL;
  resend:
         rc = mdc_reint(cl, req, level);
-        if (rc == -ERESTARTSYS ) { 
-                struct mds_update_record_hdr *hdr = lustre_msg_buf(req->rq_reqmsg, 0);
+        if (rc == -ERESTARTSYS) {
+                struct mds_update_record_hdr *hdr =
+                        lustre_msg_buf(req->rq_reqmsg, 0);
                 level = LUSTRE_CONN_RECOVD;
-                CERROR("Lost reply: re-create rep.\n"); 
+                CERROR("Lost reply: re-create rep.\n");
                 req->rq_flags = 0;
                 hdr->ur_opcode = NTOH__u32(REINT_RECREATE);
                 goto resend;
index 8c9eb8c..0e906a4 100644 (file)
@@ -181,36 +181,32 @@ int mdc_readpage(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
                  struct ptlrpc_request **request)
 {
         struct ptlrpc_request *req = NULL;
-        struct ptlrpc_bulk_desc *bulk = NULL;
-        struct niobuf niobuf;
+        struct ptlrpc_bulk_desc *desc = NULL;
+        struct ptlrpc_bulk_page *bulk = NULL;
         struct mds_body *body;
-        int rc, size[2] = {sizeof(*body), sizeof(struct niobuf)};
-        char *bufs[2] = {NULL, (char *)&niobuf};
-
-        niobuf.addr = (__u64) (long) addr;
+        int rc, size = sizeof(*body);
+        ENTRY;
 
         CDEBUG(D_INODE, "inode: %ld\n", (long)ino);
 
-        bulk = ptlrpc_prep_bulk(conn);
-        if (bulk == NULL) {
-                CERROR("%s: cannot init bulk desc\n", __FUNCTION__);
-                rc = -ENOMEM;
-                goto out;
-        }
+        desc = ptlrpc_prep_bulk(conn);
+        if (desc == NULL)
+                GOTO(out, rc = -ENOMEM);
 
-        req = ptlrpc_prep_req(cl, conn, MDS_READPAGE, 2, size, bufs);
+        req = ptlrpc_prep_req(cl, conn, MDS_READPAGE, 1, &size, NULL);
         if (!req)
-                GOTO(out, rc = -ENOMEM);
+                GOTO(out2, rc = -ENOMEM);
 
+        bulk = ptlrpc_prep_bulk_page(desc);
         bulk->b_buflen = PAGE_SIZE;
-        bulk->b_buf = (void *)(long)niobuf.addr;
-        bulk->b_portal = MDS_BULK_PORTAL;
+        bulk->b_buf = addr;
         bulk->b_xid = req->rq_reqmsg->xid;
+        desc->b_portal = MDS_BULK_PORTAL;
 
-        rc = ptlrpc_register_bulk(bulk);
+        rc = ptlrpc_register_bulk(desc);
         if (rc) {
                 CERROR("couldn't setup bulk sink: error %d.\n", rc);
-                GOTO(out, rc);
+                GOTO(out2, rc);
         }
 
         body = lustre_msg_buf(req->rq_reqmsg, 0);
@@ -218,21 +214,20 @@ int mdc_readpage(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         body->fid1.f_type = type;
         body->size = offset;
 
-        req->rq_replen = lustre_msg_size(1, size);
+        req->rq_replen = lustre_msg_size(1, &size);
         req->rq_level = LUSTRE_CONN_FULL;
         rc = ptlrpc_queue_wait(req);
         if (rc) {
                 CERROR("error in handling %d\n", rc);
-                ptlrpc_abort_bulk(bulk);
-                GOTO(out, rc);
-        }
+                ptlrpc_abort_bulk(desc);
+        } else
+                mds_unpack_rep_body(req);
 
-        mds_unpack_rep_body(req);
         EXIT;
-
+ out2:
+        ptlrpc_free_bulk(desc);
  out:
         *request = req;
-        ptlrpc_free_bulk(bulk);
         return rc;
 }
 
index d4eef8a..e22bbee 100644 (file)
 #include <linux/module.h>
 #include <linux/lustre_mds.h>
 
-int mds_sendpage(struct ptlrpc_request *req, struct file *file,
-                 __u64 offset, struct niobuf *dst)
+int mds_sendpage(struct ptlrpc_request *req, struct file *file, __u64 offset)
 {
         int rc = 0;
         mm_segment_t oldfs = get_fs();
-        struct ptlrpc_bulk_desc *bulk;
+        struct ptlrpc_bulk_desc *desc;
+        struct ptlrpc_bulk_page *bulk;
         char *buf;
+        ENTRY;
 
-        bulk = ptlrpc_prep_bulk(req->rq_connection);
-        if (bulk == NULL) {
-                rc = -ENOMEM;
-                GOTO(out, rc);
-        }
+        desc = ptlrpc_prep_bulk(req->rq_connection);
+        if (desc == NULL)
+                GOTO(out, rc = -ENOMEM);
 
-        bulk->b_xid = req->rq_reqmsg->xid;
+        bulk = ptlrpc_prep_bulk_page(desc);
+        if (bulk == NULL)
+                GOTO(cleanup_bulk, rc = -ENOMEM);
 
         OBD_ALLOC(buf, PAGE_SIZE);
-        if (!buf) {
-                rc = -ENOMEM;
-                GOTO(cleanup_bulk, rc);
-        }
+        if (buf == NULL)
+                GOTO(cleanup_bulk, rc = -ENOMEM);
 
         set_fs(KERNEL_DS);
         rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
                              (loff_t *)&offset);
         set_fs(oldfs);
 
-        if (rc != PAGE_SIZE) {
-                rc = -EIO;
-                GOTO(cleanup_buf, rc);
-        }
+        if (rc != PAGE_SIZE)
+                GOTO(cleanup_buf, rc = -EIO);
 
+        bulk->b_xid = req->rq_reqmsg->xid;
         bulk->b_buf = buf;
         bulk->b_buflen = PAGE_SIZE;
+        desc->b_portal = MDS_BULK_PORTAL;
 
-        rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
+        rc = ptlrpc_send_bulk(desc);
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
                        OBD_FAIL_MDS_SENDPAGE, rc);
-                PtlMDUnlink(bulk->b_md_h);
-                GOTO(cleanup_buf, rc);
-        }
-        wait_event_interruptible(bulk->b_waitq,
-                                 ptlrpc_check_bulk_sent(bulk));
-
-        if (bulk->b_flags & PTL_RPC_FL_INTR) {
-                rc = -EINTR;
+                ptlrpc_abort_bulk(desc);
                 GOTO(cleanup_buf, rc);
         }
 
@@ -76,7 +68,7 @@ int mds_sendpage(struct ptlrpc_request *req, struct file *file,
  cleanup_buf:
         OBD_FREE(buf, PAGE_SIZE);
  cleanup_bulk:
-        ptlrpc_free_bulk(bulk);
+        ptlrpc_free_bulk(desc);
  out:
         return rc;
 }
@@ -462,7 +454,6 @@ int mds_readpage(struct ptlrpc_request *req)
         struct vfsmount *mnt;
         struct dentry *de;
         struct file *file;
-        struct niobuf *niobuf;
         struct mds_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
@@ -490,17 +481,10 @@ int mds_readpage(struct ptlrpc_request *req)
                 RETURN(0);
         }
 
-        niobuf = lustre_msg_buf(req->rq_reqmsg, 1);
-        if (!niobuf) {
-                req->rq_status = -EINVAL;
-                LBUG();
-                RETURN(0);
-        }
-
         /* to make this asynchronous make sure that the handling function
            doesn't send a reply when this function completes. Instead a
            callback function would send the reply */
-        rc = mds_sendpage(req, file, body->size, niobuf);
+        rc = mds_sendpage(req, file, body->size);
 
         filp_close(file, 0);
         req->rq_status = rc;
index 51336ff..8ef7600 100644 (file)
  *
  */
 
-#include <linux/mm.h>
-#include <linux/pagemap.h>
-#include <linux/fs.h>
-#include <linux/sched.h>
-#include <asm/uaccess.h>
-
 #define DEBUG_SUBSYSTEM S_CLASS
 
-#include <linux/obd_support.h>
 #include <linux/obd_class.h>
 
 extern struct obd_device obd_dev[MAX_OBD_DEVICES];
@@ -33,17 +26,14 @@ int obd_init_obdo_cache(void)
                                                 sizeof(struct obdo),
                                                 0, SLAB_HWCACHE_ALIGN,
                                                 NULL, NULL);
-                if (obdo_cachep == NULL) {
-                        EXIT;
-                        return -ENOMEM;
-                } else {
+                if (obdo_cachep == NULL)
+                        RETURN(-ENOMEM);
+                else
                         CDEBUG(D_CACHE, "allocated cache at %p\n", obdo_cachep);
-                }
         } else {
                 CDEBUG(D_CACHE, "using existing cache at %p\n", obdo_cachep);
         }
-        EXIT;
-        return 0;
+        RETURN(0);
 }
 
 void obd_cleanup_obdo_cache(void)
@@ -75,16 +65,16 @@ struct obd_client *gen_client(const struct obd_conn *conn)
         lh = next = &obddev->obd_gen_clients;
         while ((lh = lh->next) != &obddev->obd_gen_clients) {
                 cli = list_entry(lh, struct obd_client, cli_chain);
-                
+
                 if (cli->cli_id == conn->oc_id)
                         return cli;
         }
 
         return NULL;
-} /* obd_client */
+} /* gen_client */
 
 
-/* a connection defines a context in which preallocation can be managed. */ 
+/* a connection defines a context in which preallocation can be managed. */
 int gen_connect (struct obd_conn *conn)
 {
         struct obd_client * cli;
@@ -105,7 +95,7 @@ int gen_connect (struct obd_conn *conn)
         CDEBUG(D_INFO, "connect: new ID %u\n", cli->cli_id);
         conn->oc_id = cli->cli_id;
         return 0;
-} /* gen_obd_connect */
+} /* gen_connect */
 
 
 int gen_disconnect(struct obd_conn *conn)
@@ -116,7 +106,7 @@ int gen_disconnect(struct obd_conn *conn)
         if (!(cli = gen_client(conn))) {
                 CDEBUG(D_IOCTL, "disconnect: attempting to free "
                        "nonexistent client %u\n", conn->oc_id);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
 
 
@@ -125,58 +115,49 @@ int gen_disconnect(struct obd_conn *conn)
 
         CDEBUG(D_INFO, "disconnect: ID %u\n", conn->oc_id);
 
-        EXIT;
-        return 0;
+        RETURN(0);
 } /* gen_obd_disconnect */
 
-
-/* 
- *   raid1 defines a number of connections to child devices,
- *   used to make calls to these devices.
- *   data holds nothing
- */ 
+/* FIXME: Data is a space- or comma-separated list of device IDs.  This will
+ * have to change. */
 int gen_multi_setup(struct obd_device *obddev, uint32_t len, void *data)
 {
-        int i;
+        int count, rc;
+        char *p;
+        ENTRY;
 
-        for (i = 0 ; i < obddev->obd_multi_count ; i++ ) {
-                int rc;
-                struct obd_conn *ch_conn = &obddev->obd_multi_conn[i];
-                rc  = OBP(ch_conn->oc_dev, connect)(ch_conn);
+        for (p = data, count = 0; p < (char *)data + len; count++) {
+                char *end;
+                int tmp = simple_strtoul(p, &end, 0);
 
-                if ( rc != 0 ) {
-                        int j;
+                if (p == end) {
+                        CERROR("invalid device ID starting at: %s\n", p);
+                        GOTO(err_disconnect, rc = -EINVAL);
+                }
 
-                        for (j = --i; j >= 0; --j) {
-                                ch_conn = &obddev->obd_multi_conn[i];
-                                OBP(ch_conn->oc_dev, disconnect)(ch_conn);
-                        }
-                        return -EINVAL;
+                obddev->obd_multi_conn[count].oc_dev = &obd_dev[tmp];
+                rc = obd_connect(&obddev->obd_multi_conn[count]);
+                if (rc) {
+                        CERROR("cannot connect to device %d: rc = %d\n", tmp,
+                               rc);
+                        GOTO(err_disconnect, rc);
                 }
-        }               
-        return 0;
-}
 
+                CDEBUG(D_INFO, "target OBD %d is of type %s\n", count,
+                       obd_dev[tmp].obd_type->typ_name);
 
-#if 0
-int gen_multi_attach(struct obd_device *obddev, int len, void *data)
-{
-        int i;
-        int count;
-        struct obd_device *rdev = obddev->obd_multi_dev[0];
+                p = end + 1;
+        }
 
-        count = len/sizeof(int);
         obddev->obd_multi_count = count;
-        for (i=0 ; i<count ; i++) {
-                rdev = &obd_dev[*((int *)data + i)];
-                rdev = rdev + 1;
-                CDEBUG(D_INFO, "OBD RAID1: replicator %d is of type %s\n", i,
-                       (rdev + i)->obd_type->typ_name);
-        }
-        return 0;
-}
-#endif
 
+        RETURN(0);
+
+ err_disconnect:
+        for (count--; count >= 0; count--)
+                obd_disconnect(&obddev->obd_multi_conn[count]);
+        return rc;
+}
 
 /*
  *    remove all connections to this device
@@ -187,18 +168,14 @@ int gen_multi_cleanup(struct obd_device *obddev)
 {
         int i;
 
-        for (i = 0 ; i < obddev->obd_multi_count ; i++ ) {
-                struct obd_conn *ch_conn = &obddev->obd_multi_conn[i];
-                int rc;
-                rc  = OBP(ch_conn->oc_dev, disconnect)(ch_conn);
-
-                if ( rc != 0 ) {
+        for (i = 0; i < obddev->obd_multi_count; i++) {
+                int rc = obd_disconnect(&obddev->obd_multi_conn[i]);
+                if (rc)
                         CERROR("disconnect failure %d\n",
-                               ch_conn->oc_dev->obd_minor);
-                }
-        }               
+                               obddev->obd_multi_conn[i].oc_dev->obd_minor);
+        }
         return 0;
-} /* gen_multi_cleanup_device */
+}
 
 
 /*
@@ -254,17 +231,15 @@ int gen_copy_data(struct obd_conn *dst_conn, struct obdo *dst,
         int err = 0;
 
         ENTRY;
-        CDEBUG(D_INFO, "src: ino %Ld blocks %Ld, size %Ld, dst: ino %Ld\n", 
+        CDEBUG(D_INFO, "src: ino %Ld blocks %Ld, size %Ld, dst: ino %Ld\n",
                (unsigned long long)src->o_id, (unsigned long long)src->o_blocks,
                (unsigned long long)src->o_size, (unsigned long long)dst->o_id);
         page = alloc_page(GFP_USER);
-        if ( !page ) {
-                EXIT;
-                return -ENOMEM;
-        }
-        
+        if (page == NULL)
+                RETURN(-ENOMEM);
+
         lck_page(page);
-        
+
         /* XXX with brw vector I/O, we could batch up reads and writes here,
          *     all we need to do is allocate multiple pages to handle the I/Os
          *     and arrays to handle the request parameters.
@@ -276,7 +251,7 @@ int gen_copy_data(struct obd_conn *dst_conn, struct obdo *dst,
                 obd_off          brw_offset = (page->index) << PAGE_SHIFT;
                 obd_flag         flagr = 0;
                 obd_flag         flagw = OBD_BRW_CREATE;
-                
+
                 page->index = index;
                 err = OBP(src_conn->oc_dev, brw)(READ, src_conn, num_oa, &src,
                                                  &num_buf, &page, &brw_count,
@@ -299,7 +274,7 @@ int gen_copy_data(struct obd_conn *dst_conn, struct obdo *dst,
                 }
 
                 CDEBUG(D_INFO, "Wrote page %ld ...\n", page->index);
-                
+
                 index++;
         }
         dst->o_size = src->o_size;
@@ -308,6 +283,5 @@ int gen_copy_data(struct obd_conn *dst_conn, struct obdo *dst,
         UnlockPage(page);
         __free_page(page);
 
-        EXIT;
-        return err;
+        RETURN(err);
 }
index 1b3dc02..52e5a18 100644 (file)
@@ -66,10 +66,10 @@ static int echo_getattr(struct obd_conn *conn, struct obdo *oa)
 }
 
 int echo_preprw(int cmd, struct obd_conn *conn, int objcount,
-                struct obd_ioobj *obj, int niocount, struct niobuf *nb,
-                struct niobuf *res)
+                struct obd_ioobj *obj, int niocount, struct niobuf_remote *nb,
+                struct niobuf_local *res)
 {
-        struct niobuf *r = res;
+        struct niobuf_local *r = res;
         int rc = 0;
         int i;
 
@@ -91,9 +91,7 @@ int echo_preprw(int cmd, struct obd_conn *conn, int objcount,
                                 CERROR("can't get new page %d/%d for id %Ld\n",
                                        j, obj->ioo_bufcnt,
                                        (unsigned long long)obj->ioo_id);
-                                rc = -ENOMEM;
-                                EXIT;
-                                goto preprw_cleanup;
+                                GOTO(preprw_cleanup, rc = -ENOMEM);
                         }
                         echo_pages++;
 
@@ -117,8 +115,7 @@ int echo_preprw(int cmd, struct obd_conn *conn, int objcount,
         }
         CDEBUG(D_PAGE, "%ld pages allocated after prep\n", echo_pages);
 
-        EXIT;
-        return 0;
+        RETURN(0);
 
 preprw_cleanup:
         /* It is possible that we would rather handle errors by  allow
@@ -139,12 +136,11 @@ preprw_cleanup:
 }
 
 int echo_commitrw(int cmd, struct obd_conn *conn, int objcount,
-                  struct obd_ioobj *obj, int niocount, struct niobuf *res)
+                  struct obd_ioobj *obj, int niocount, struct niobuf_local *res)
 {
-        struct niobuf *r = res;
+        struct niobuf_local *r = res;
         int rc = 0;
         int i;
-
         ENTRY;
 
         CDEBUG(D_PAGE, "%s %d obdos with %d IOs\n",
@@ -152,8 +148,7 @@ int echo_commitrw(int cmd, struct obd_conn *conn, int objcount,
 
         if (niocount && !r) {
                 CERROR("NULL res niobuf with niocount %d\n", niocount);
-                EXIT;
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
 
         for (i = 0; i < objcount; i++, obj++) {
@@ -167,9 +162,7 @@ int echo_commitrw(int cmd, struct obd_conn *conn, int objcount,
                                 CERROR("bad page %p, id %Ld (%d), buf %d/%d\n",
                                        page, (unsigned long long)obj->ioo_id, i,
                                        j, obj->ioo_bufcnt);
-                                rc = -EFAULT;
-                                EXIT;
-                                goto commitrw_cleanup;
+                                GOTO(commitrw_cleanup, rc = -EFAULT);
                         }
 
                         free_pages(addr, 0);
@@ -177,8 +170,7 @@ int echo_commitrw(int cmd, struct obd_conn *conn, int objcount,
                 }
         }
         CDEBUG(D_PAGE, "%ld pages remain after commit\n", echo_pages);
-        EXIT;
-        return 0;
+        RETURN(0);
 
 commitrw_cleanup:
         CERROR("cleaning up %ld pages (%d obdos)\n",
index 063466b..24ce273 100644 (file)
  */
 
 #define EXPORT_SYMTAB
-
-#include <linux/version.h>
-#include <linux/module.h>
-#include <linux/fs.h>
-#include <linux/stat.h>
-#include <linux/locks.h>
-#include <linux/ext2_fs.h>
-#include <linux/quotaops.h>
-#include <asm/unistd.h>
-
 #define DEBUG_SUBSYSTEM S_FILTER
 
-#include <linux/obd_class.h>
+#include <linux/module.h>
 #include <linux/obd_ext2.h>
 #include <linux/obd_filter.h>
 
@@ -36,11 +26,11 @@ long filter_memory;
 #define S_SHIFT 12
 static char * obd_type_by_mode[S_IFMT >> S_SHIFT] = {
         [0]                     "",
-        [S_IFREG >> S_SHIFT]    "R", 
+        [S_IFREG >> S_SHIFT]    "R",
         [S_IFDIR >> S_SHIFT]    "D",
         [S_IFCHR >> S_SHIFT]    "C",
-        [S_IFBLK >> S_SHIFT]    "B", 
-        [S_IFIFO >> S_SHIFT]    "F", 
+        [S_IFBLK >> S_SHIFT]    "B",
+        [S_IFIFO >> S_SHIFT]    "F",
         [S_IFSOCK >> S_SHIFT]   "S",
         [S_IFLNK >> S_SHIFT]    "L"
 };
@@ -85,7 +75,7 @@ static int filter_prep(struct obd_device *obddev)
         filter_id(rootid, FILTER_ROOTINO, S_IFDIR);
         file = filp_open(rootid, O_RDWR | O_CREAT, 00755);
         if (IS_ERR(file)) {
-                CERROR("OBD filter: cannot make root directory"); 
+                CERROR("OBD filter: cannot make root directory");
                 GOTO(out, rc = PTR_ERR(file));
         }
         filp_close(file, 0);
@@ -121,7 +111,7 @@ static int filter_prep(struct obd_device *obddev)
                 }
         }
         obddev->u.filter.fo_lastino = lastino;
-        filp_close(file, 0); 
+        filp_close(file, 0);
 
         rc = 0;
  out:
@@ -136,22 +126,22 @@ static void filter_post(struct obd_device *obddev)
         struct obd_run_ctxt saved;
         long rc;
         struct file *file;
-        loff_t off = 0; 
+        loff_t off = 0;
 
         push_ctxt(&saved, &obddev->u.filter.fo_ctxt);
         file = filp_open("D/status", O_RDWR | O_CREAT, 0700);
-        if ( !file || IS_ERR(file)) { 
+        if ( !file || IS_ERR(file)) {
                 CERROR("OBD filter: cannot create status file\n");
                 goto out;
         }
-        rc = file->f_op->write(file, (char *)&obddev->u.filter.fo_lastino, 
+        rc = file->f_op->write(file, (char *)&obddev->u.filter.fo_lastino,
                        sizeof(obddev->u.filter.fo_lastino), &off);
-        if (rc != sizeof(obddev->u.filter.fo_lastino) ) { 
+        if (rc != sizeof(obddev->u.filter.fo_lastino) ) {
                 CERROR("OBD filter: error writing lastino\n");
         }
 
-        rc = filp_close(file, NULL); 
-        if (rc) { 
+        rc = filp_close(file, NULL);
+        if (rc) {
                 CERROR("OBD filter: cannot close status file\n");
         }
  out:
@@ -159,7 +149,7 @@ static void filter_post(struct obd_device *obddev)
 }
 
 
-static __u64 filter_next_id(struct obd_device *obddev) 
+static __u64 filter_next_id(struct obd_device *obddev)
 {
         __u64 id;
         spin_lock(&obddev->u.filter.fo_lock);
@@ -170,7 +160,7 @@ static __u64 filter_next_id(struct obd_device *obddev)
 }
 
 /* how to get files, dentries, inodes from object id's */
-static struct file *filter_obj_open(struct obd_device *obddev, 
+static struct file *filter_obj_open(struct obd_device *obddev,
                                    __u64 id, __u32 type)
 {
         struct obd_run_ctxt saved;
@@ -191,12 +181,12 @@ static struct file *filter_obj_open(struct obd_device *obddev,
                 RETURN(NULL);
         }
 
-        if ( ! (type & S_IFMT) ) { 
-                CERROR("OBD filter_obj_open, no type (%Ld), mode %o!\n", 
+        if ( ! (type & S_IFMT) ) {
+                CERROR("OBD filter_obj_open, no type (%Ld), mode %o!\n",
                        (unsigned long long)id, type);
         }
 
-        filter_id(name, id, type); 
+        filter_id(name, id, type);
         push_ctxt(&saved, &obddev->u.filter.fo_ctxt);
         file = filp_open(name, O_RDONLY | O_LARGEFILE, 0);
         pop_ctxt(&saved);
@@ -213,25 +203,25 @@ static struct file *filter_parent(obd_id id, obd_mode mode)
 
         sprintf(path, "O/%s", obd_type_by_mode[(mode & S_IFMT) >> S_SHIFT]);
 
-        file = filp_open(path, O_RDONLY, 0); 
+        file = filp_open(path, O_RDONLY, 0);
         return file;
 }
 
 
-static struct inode *filter_inode_from_obj(struct obd_device *obddev, 
+static struct inode *filter_inode_from_obj(struct obd_device *obddev,
                                      __u64 id, __u32 type)
 {
         struct file *file;
-        struct inode *inode; 
+        struct inode *inode;
 
         file = filter_obj_open(obddev, id, type);
-        if ( !file ) { 
-                CERROR("filter_inode_from_obdo failed\n"); 
+        if ( !file ) {
+                CERROR("filter_inode_from_obdo failed\n");
                 return NULL;
         }
 
-        inode = iget(file->f_dentry->d_inode->i_sb, 
-                     file->f_dentry->d_inode->i_ino); 
+        inode = iget(file->f_dentry->d_inode->i_sb,
+                     file->f_dentry->d_inode->i_ino);
         filp_close(file, 0);
         return inode;
 }
@@ -318,9 +308,6 @@ static int filter_cleanup(struct obd_device * obddev)
 
         ENTRY;
 
-        if ( !(obddev->obd_flags & OBD_SET_UP) )
-                RETURN(0);
-
         if ( !list_empty(&obddev->obd_gen_clients) ) {
                 CERROR("still has clients!\n");
                 RETURN(-EBUSY);
@@ -333,7 +320,7 @@ static int filter_cleanup(struct obd_device * obddev)
         filter_post(obddev);
 
         unlock_kernel();
-        mntput(obddev->u.filter.fo_vfsmnt); 
+        mntput(obddev->u.filter.fo_vfsmnt);
         obddev->u.filter.fo_sb = 0;
         kfree(obddev->u.filter.fo_fstype);
 
@@ -388,27 +375,23 @@ static inline void filter_from_inode(struct obdo *oa, struct inode *inode)
 static int filter_getattr(struct obd_conn *conn, struct obdo *oa)
 {
         struct inode *inode;
-
         ENTRY;
+
         if ( !gen_client(conn) ) {
                 CDEBUG(D_IOCTL, "fatal: invalid client %u\n", conn->oc_id);
-                EXIT;
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
 
-        if ( !(inode = filter_inode_from_obj(conn->oc_dev, 
-                                              oa->o_id, oa->o_mode)) ) { 
-                EXIT;
-                return -ENOENT;
-        }
+        inode = filter_inode_from_obj(conn->oc_dev, oa->o_id, oa->o_mode);
+        if (inode == NULL)
+                RETURN(-ENOENT);
 
         oa->o_valid &= ~OBD_MD_FLID;
         filter_from_inode(oa, inode);
-        
+
         iput(inode);
-        EXIT;
-        return 0;
-} 
+        RETURN(0);
+}
 
 static int filter_setattr(struct obd_conn *conn, struct obdo *oa)
 {
@@ -416,31 +399,28 @@ static int filter_setattr(struct obd_conn *conn, struct obdo *oa)
         struct iattr iattr;
         int rc;
         struct dentry de;
+        ENTRY;
 
         if (!gen_client(conn)) {
                 CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
 
-        inode = filter_inode_from_obj(conn->oc_dev, oa->o_id, oa->o_mode); 
-        if ( !inode ) { 
-                EXIT;
-                return -ENOENT;
-        }
+        inode = filter_inode_from_obj(conn->oc_dev, oa->o_id, oa->o_mode);
+        if ( !inode )
+                RETURN(-ENOENT);
 
         iattr_from_obdo(&iattr, oa);
         iattr.ia_mode &= ~S_IFMT;
         iattr.ia_mode |= S_IFREG;
         de.d_inode = inode;
-        if ( inode->i_op->setattr ) {
+        if ( inode->i_op->setattr )
                 rc = inode->i_op->setattr(&de, &iattr);
-        } else { 
+        else
                 rc = inode_setattr(inode, &iattr);
-        }
 
         iput(inode);
-        EXIT;
-        return rc;
+        RETURN(rc);
 }
 
 static int filter_open(struct obd_conn *conn, struct obdo *oa)
@@ -491,7 +471,7 @@ static int filter_create (struct obd_conn* conn, struct obdo *oa)
         }
 
         oa->o_id = filter_next_id(conn->oc_dev);
-        if ( !(oa->o_mode && S_IFMT) ) { 
+        if ( !(oa->o_mode && S_IFMT) ) {
                 CERROR("filter obd: no type!\n");
                 return -ENOENT;
         }
@@ -500,15 +480,15 @@ static int filter_create (struct obd_conn* conn, struct obdo *oa)
         push_ctxt(&saved, &obddev->u.filter.fo_ctxt);
         mode = oa->o_mode;
         mode &= ~S_IFMT;
-        mode |= S_IFREG; 
+        mode |= S_IFREG;
         file = filp_open(name, O_RDONLY | O_CREAT, mode);
         pop_ctxt(&saved);
-        if (IS_ERR(file)) { 
+        if (IS_ERR(file)) {
                 CERROR("Error mknod obj %s, err %ld\n", name, PTR_ERR(file));
                 return -ENOENT;
         }
         filp_close(file, 0);
-        
+
         /* Set flags for fields we have set in ext2_new_inode */
         oa->o_valid |= OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS |
                  OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME |
@@ -525,53 +505,48 @@ static int filter_destroy(struct obd_conn *conn, struct obdo *oa)
         struct file *object;
         int rc;
         struct obd_run_ctxt saved;
+        ENTRY;
 
         if (!(cli = gen_client(conn))) {
                 CERROR("invalid client %u\n", conn->oc_id);
-                EXIT;
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
 
         obddev = conn->oc_dev;
         object = filter_obj_open(obddev, oa->o_id, oa->o_mode);
-        if (!object || IS_ERR(object)) { 
-                EXIT;
-                return -ENOENT;
-        }
-        
+        if (!object || IS_ERR(object))
+                RETURN(-ENOENT);
+
         inode = object->f_dentry->d_inode;
         inode->i_nlink = 1;
         inode->i_mode = 010000;
 
         push_ctxt(&saved, &obddev->u.filter.fo_ctxt);
         dir = filter_parent(oa->o_id, oa->o_mode);
-        if (IS_ERR(dir)) {
-                rc = PTR_ERR(dir);
-                EXIT;
-                goto out;
-        }
-        dget(dir->f_dentry); 
+        if (IS_ERR(dir))
+                GOTO(out, rc = PTR_ERR(dir));
+        dget(dir->f_dentry);
         dget(object->f_dentry);
         rc = vfs_unlink(dir->f_dentry->d_inode, object->f_dentry);
 
         filp_close(dir, 0);
         filp_close(object, 0);
+        EXIT;
 out:
         pop_ctxt(&saved);
-        EXIT;
         return rc;
 }
 
-static int filter_truncate(struct obd_conn *conn, struct obdo *oa, obd_size count,
-                         obd_off offset)
+static int filter_truncate(struct obd_conn *conn, struct obdo *oa,
+                           obd_size count, obd_off offset)
 {
         int error;
+        ENTRY;
 
         error = filter_setattr(conn, oa);
         oa->o_valid = OBD_MD_FLBLOCKS | OBD_MD_FLCTIME | OBD_MD_FLMTIME;
 
-        EXIT;
-        return error;
+        RETURN(error);
 }
 
 /* buffer must lie in user memory here */
@@ -581,18 +556,16 @@ static int filter_read(struct obd_conn *conn, struct obdo *oa, char *buf,
         struct file * file;
         unsigned long retval;
         int err;
+        ENTRY;
 
         if (!gen_client(conn)) {
                 CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
-                EXIT;
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
 
-        file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode); 
-        if (!file || IS_ERR(file)) { 
-                EXIT;
-                return -PTR_ERR(file);
-        }
+        file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode);
+        if (!file || IS_ERR(file))
+                RETURN(-PTR_ERR(file));
 
         /* count doubles as retval */
         retval = file->f_op->read(file, buf, *count, (loff_t *)&offset);
@@ -611,25 +584,22 @@ static int filter_read(struct obd_conn *conn, struct obdo *oa, char *buf,
 
 
 /* buffer must lie in user memory here */
-static int filter_write(struct obd_conn *conn, struct obdo *oa, char *buf, 
+static int filter_write(struct obd_conn *conn, struct obdo *oa, char *buf,
                          obd_size *count, obd_off offset)
 {
         int err;
         struct file * file;
         unsigned long retval;
-
         ENTRY;
+
         if (!gen_client(conn)) {
                 CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
-                EXIT;
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
 
-        file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode); 
-        if (!file || IS_ERR(file)) { 
-                EXIT;
-                return -PTR_ERR(file);
-        }
+        file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode);
+        if (!file || IS_ERR(file))
+                RETURN(-PTR_ERR(file));
 
         /* count doubles as retval */
         retval = file->f_op->write(file, buf, *count, (loff_t *)&offset);
@@ -648,13 +618,13 @@ static int filter_write(struct obd_conn *conn, struct obdo *oa, char *buf,
         return err;
 } /* filter_write */
 
-static int filter_pgcache_brw(int rw, struct obd_conn *conn, 
+static int filter_pgcache_brw(int rw, struct obd_conn *conn,
                                obd_count num_oa,
-                               struct obdo **oa, 
-                               obd_count *oa_bufs, 
+                               struct obdo **oa,
+                               obd_count *oa_bufs,
                                struct page **pages,
-                               obd_size *count, 
-                               obd_off *offset, 
+                               obd_size *count,
+                               obd_off *offset,
                                obd_flag *flags)
 {
         struct super_block      *sb;
@@ -664,49 +634,44 @@ static int filter_pgcache_brw(int rw, struct obd_conn *conn,
         unsigned long            retval;
         int                      error;
         struct file *file;
-
         ENTRY;
 
         if (!gen_client(conn)) {
                 CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
-                EXIT;
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
 
         sb = conn->oc_dev->u.filter.fo_sb;
         oldfs = get_fs();
-        set_fs(KERNEL_DS); 
+        set_fs(KERNEL_DS);
 
         pnum = 0; /* pnum indexes buf 0..num_pages */
         for (onum = 0; onum < num_oa; onum++) {
-                int              pg;
-
-                file = filter_obj_open(conn->oc_dev, oa[onum]->o_id, 
-                                       oa[onum]->o_mode); 
-                if (!file || IS_ERR(file)) { 
-                        EXIT;
-                        error = -ENOENT;
-                        goto ERROR;
-                }
+                int pg;
+
+                file = filter_obj_open(conn->oc_dev, oa[onum]->o_id,
+                                       oa[onum]->o_mode);
+                if (!file || IS_ERR(file))
+                        GOTO(ERROR, error = -ENOENT);
 
                 /* count doubles as retval */
                 for (pg = 0; pg < oa_bufs[onum]; pg++) {
                         CDEBUG(D_INODE, "OP %d obdo no/pno: (%d,%d) (%ld,%ld) "
-                               "off count (%Ld,%Ld)\n", 
+                               "off count (%Ld,%Ld)\n",
                                rw, onum, pnum, file->f_dentry->d_inode->i_ino,
                                (unsigned long)offset[pnum] >> PAGE_CACHE_SHIFT,
                                (unsigned long long)offset[pnum],
                                (unsigned long long)count[pnum]);
-                        if (rw == WRITE) { 
-                                loff_t off; 
+                        if (rw == WRITE) {
+                                loff_t off;
                                 char *buffer;
-                                off = offset[pnum]; 
-                                buffer = kmap(pages[pnum]); 
+                                off = offset[pnum];
+                                buffer = kmap(pages[pnum]);
                                 retval = file->f_op->write(file, buffer, count[pnum], &off);
                                 kunmap(pages[pnum]);
-                                CDEBUG(D_INODE, "retval %ld\n", retval); 
-                        } else { 
-                                loff_t off = offset[pnum]; 
+                                CDEBUG(D_INODE, "retval %ld\n", retval);
+                        } else {
+                                loff_t off = offset[pnum];
                                 char *buffer = kmap(pages[pnum]);
 
                                 if (off >= file->f_dentry->d_inode->i_size) {
@@ -714,24 +679,22 @@ static int filter_pgcache_brw(int rw, struct obd_conn *conn,
                                         retval = count[pnum];
                                 } else {
                                         retval = file->f_op->read(file, buffer, count[pnum], &off);
-                                } 
+                                }
                                 kunmap(pages[pnum]);
 
                                 if ( retval != count[pnum] ) {
                                         filp_close(file, 0);
-                                        retval = -EIO;
-                                        EXIT;
-                                        goto ERROR;
+                                        GOTO(ERROR, retval = -EIO);
                                 }
-                                CDEBUG(D_INODE, "retval %ld\n", retval); 
+                                CDEBUG(D_INODE, "retval %ld\n", retval);
                         }
                         pnum++;
                 }
                 /* sizes and blocks are set by generic_file_write */
-                /* ctimes/mtimes will follow with a setattr call */ 
+                /* ctimes/mtimes will follow with a setattr call */
                 filp_close(file, 0);
         }
-        
+
         EXIT;
  ERROR:
         set_fs(oldfs);
@@ -744,17 +707,16 @@ struct inode *ioobj_to_inode(struct obd_conn *conn, struct obd_ioobj *o)
 {
         struct inode *inode = NULL;
         struct super_block *sb = conn->oc_dev->u.ext2.e2_sb;
+        ENTRY;
 
         if (!sb || !sb->s_dev) {
                 CDEBUG(D_SUPER, "fatal: device not initialized.\n");
-                EXIT;
-                return NULL;
+                RETURN(NULL);
         }
 
         if ( !o->ioo_id ) {
                 CDEBUG(D_INODE, "fatal: invalid obdo %lu\n", (long)o->ioo_id);
-                EXIT;
-                return NULL;
+                RETURN(NULL);
         }
 
         inode = filter_inode_from_obj(conn->oc_dev, o->ioo_id, S_IFREG);
@@ -764,21 +726,20 @@ struct inode *ioobj_to_inode(struct obd_conn *conn, struct obd_ioobj *o)
                        "no links" : "NULL");
                 if (inode)
                         iput(inode);
-                EXIT;
-                return NULL;
+                RETURN(NULL);
         }
 
-        return inode;
+        RETURN(inode);
 }
 
 static int filter_preprw(int cmd, struct obd_conn *conn,
                          int objcount, struct obd_ioobj *obj,
-                         int niocount, struct niobuf *nb,
-                         struct niobuf *res)
+                         int niocount, struct niobuf_remote *nb,
+                         struct niobuf_local *res)
 {
         struct obd_ioobj *o = obj;
-        struct niobuf *b = nb;
-        struct niobuf *r = res;
+        struct niobuf_remote *b = nb;
+        struct niobuf_local *r = res;
         int i;
         ENTRY;
 
@@ -813,10 +774,10 @@ static int filter_preprw(int cmd, struct obd_conn *conn,
 
 static int filter_commitrw(int cmd, struct obd_conn *conn,
                            int objcount, struct obd_ioobj *obj,
-                           int niocount, struct niobuf *res)
+                           int niocount, struct niobuf_local *res)
 {
         struct obd_ioobj *o = obj;
-        struct niobuf *r = res;
+        struct niobuf_local *r = res;
         int i;
         ENTRY;
 
@@ -847,20 +808,17 @@ static int filter_statfs (struct obd_conn *conn, struct statfs * statfs)
 {
         struct super_block *sb;
         int err;
-
         ENTRY;
 
         if (!gen_client(conn)) {
                 CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
-                EXIT;
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
 
         sb = conn->oc_dev->u.filter.fo_sb;
 
         err = sb->s_op->statfs(sb, statfs);
-        EXIT;
-        return err;
+        RETURN(err);
 } /* filter_statfs */
 
 
@@ -873,37 +831,34 @@ static int  filter_get_info(struct obd_conn *conn, obd_count keylen,
 
         if (!(cli = gen_client(conn))) {
                 CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
 
         obddev = conn->oc_dev;
-        
+
         if ( keylen == strlen("blocksize") &&
              memcmp(key, "blocksize", keylen) == 0 ) {
                 *vallen = sizeof(int);
                 *val = (void *)obddev->u.filter.fo_sb->s_blocksize;
-                EXIT;
-                return 0;
+                RETURN(0);
         }
 
         if ( keylen == strlen("blocksize_bits") &&
              memcmp(key, "blocksize_bits", keylen) == 0 ){
                 *vallen = sizeof(int);
                 *val = (void *)(int)obddev->u.filter.fo_sb->s_blocksize_bits;
-                EXIT;
-                return 0;
+                RETURN(0);
         }
 
         if ( keylen == strlen("root_ino") &&
              memcmp(key, "root_ino", keylen) == 0 ){
                 *vallen = sizeof(int);
                 *val = (void *)(int) FILTER_ROOTINO;
-                EXIT;
-                return 0;
+                RETURN(0);
         }
-        
+
         CDEBUG(D_IOCTL, "invalid key\n");
-        return -EINVAL;
+        RETURN(-EINVAL);
 }
 
 
@@ -949,7 +904,7 @@ static void __exit obdfilter_exit(void)
 
 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Filtering OBD driver v1.0");
-MODULE_LICENSE("GPL"); 
+MODULE_LICENSE("GPL");
 
 module_init(obdfilter_init);
 module_exit(obdfilter_exit);
index 70a6260..fb0094e 100644 (file)
@@ -348,131 +348,116 @@ static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
         return 0;
 }
 
-int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
-                 struct niobuf *dst, struct niobuf *src)
+static int osc_sendpage(struct ptlrpc_bulk_desc *desc,
+                        struct niobuf_remote *dst, struct niobuf_local *src)
 {
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct ptlrpc_bulk_desc *bulk;
-        int rc;
+        struct ptlrpc_bulk_page *page;
         ENTRY;
 
-        osc_con2cl(conn, &cl, &connection);
-
-        bulk = ptlrpc_prep_bulk(connection);
-        if (bulk == NULL)
+        page = ptlrpc_prep_bulk_page(desc);
+        if (page == NULL)
                 RETURN(-ENOMEM);
 
-        bulk->b_buf = (void *)(unsigned long)src->addr;
-        bulk->b_buflen = src->len;
-        bulk->b_xid = dst->xid;
-        rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
-        if (rc != 0) {
-                CERROR("send_bulk failed: %d\n", rc);
-                ptlrpc_free_bulk(bulk);
-                LBUG();
-                RETURN(rc);
-        }
-        wait_event_interruptible(bulk->b_waitq, ptlrpc_check_bulk_sent(bulk));
-
-        if (bulk->b_flags & PTL_RPC_FL_INTR) {
-                ptlrpc_free_bulk(bulk);
-                RETURN(-EINTR);
-        }
+        page->b_buf = (void *)(unsigned long)src->addr;
+        page->b_buflen = src->len;
+        page->b_xid = dst->xid;
 
-        ptlrpc_free_bulk(bulk);
         RETURN(0);
 }
 
-int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
-                 obd_count *oa_bufs, struct page **buf, obd_size *count,
-                 obd_off *offset, obd_flag *flags)
+static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
+                        struct obdo **oa, obd_count *oa_bufs, struct page **buf,
+                        obd_size *count, obd_off *offset, obd_flag *flags)
 {
         struct ptlrpc_client *cl;
         struct ptlrpc_connection *connection;
         struct ptlrpc_request *request;
         struct ost_body *body;
+        struct list_head *tmp, *next;
         int pages, rc, i, j, size[3] = {sizeof(*body)};
         void *ptr1, *ptr2;
-        struct ptlrpc_bulk_desc **bulk;
+        struct ptlrpc_bulk_desc *desc;;
         ENTRY;
 
         size[1] = num_oa * sizeof(struct obd_ioobj);
         pages = 0;
         for (i = 0; i < num_oa; i++)
                 pages += oa_bufs[i];
-        size[2] = pages * sizeof(struct niobuf);
-
-        OBD_ALLOC(bulk, pages * sizeof(*bulk));
-        if (bulk == NULL)
-                RETURN(-ENOMEM);
+        size[2] = pages * sizeof(struct niobuf_remote);
 
         osc_con2cl(conn, &cl, &connection);
         request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
         if (!request)
-                GOTO(out, rc = -ENOMEM);
+                GOTO(out3, rc = -ENOMEM);
 
         body = lustre_msg_buf(request->rq_reqmsg, 0);
         body->data = OBD_BRW_READ;
 
+        desc = ptlrpc_prep_bulk(connection);
+        if (desc == NULL)
+                GOTO(out2, rc = -ENOMEM);
+        desc->b_portal = OST_BULK_PORTAL;
+
         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
         for (pages = 0, i = 0; i < num_oa; i++) {
                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
-                        bulk[pages] = ptlrpc_prep_bulk(connection);
-                        if (bulk[pages] == NULL)
+                        struct ptlrpc_bulk_page *page;
+                        page = ptlrpc_prep_bulk_page(desc);
+                        if (page == NULL)
                                 GOTO(out, rc = -ENOMEM);
 
                         spin_lock(&connection->c_lock);
-                        bulk[pages]->b_xid = ++connection->c_xid_out;
+                        page->b_xid = ++connection->c_xid_out;
                         spin_unlock(&connection->c_lock);
 
-                        bulk[pages]->b_buf = kmap(buf[pages]);
-                        bulk[pages]->b_buflen = PAGE_SIZE;
-                        bulk[pages]->b_portal = OST_BULK_PORTAL;
-                        ost_pack_niobuf(&ptr2, bulk[pages]->b_buf,
-                                        offset[pages], count[pages],
-                                        flags[pages], bulk[pages]->b_xid);
-
-                        rc = ptlrpc_register_bulk(bulk[pages]);
-                        if (rc)
-                                GOTO(out, rc);
+                        page->b_buf = kmap(buf[pages]);
+                        page->b_buflen = PAGE_SIZE;
+                        ost_pack_niobuf(&ptr2, offset[pages], count[pages],
+                                        flags[pages], page->b_xid);
                 }
         }
 
+        rc = ptlrpc_register_bulk(desc);
+        if (rc)
+                GOTO(out, rc);
+
         request->rq_replen = lustre_msg_size(1, size);
         rc = ptlrpc_queue_wait(request);
+        if (rc)
+                ptlrpc_abort_bulk(desc);
         GOTO(out, rc);
 
  out:
-        /* FIXME: if we've called ptlrpc_wait_bulk but rc != 0, we need to
-         * abort those bulk listeners. */
+        list_for_each_safe(tmp, next, &desc->b_page_list) {
+                struct ptlrpc_bulk_page *page;
+                page = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
 
-        for (pages = 0, i = 0; i < num_oa; i++) {
-                for (j = 0; j < oa_bufs[i]; j++, pages++) {
-                        if (bulk[pages] == NULL)
-                                continue;
-                        kunmap(buf[pages]);
-                        ptlrpc_free_bulk(bulk[pages]);
-                }
+                if (page->b_buf != NULL)
+                        kunmap(page->b_buf);
         }
 
-        OBD_FREE(bulk, pages * sizeof(*bulk));
+        ptlrpc_free_bulk(desc);
+ out2:
         ptlrpc_free_req(request);
+ out3:
         return rc;
 }
 
-int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
-                  obd_count *oa_bufs, struct page **buf, obd_size *count,
-                  obd_off *offset, obd_flag *flags)
+static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
+                         struct obdo **oa, obd_count *oa_bufs,
+                         struct page **buf, obd_size *count, obd_off *offset,
+                         obd_flag *flags)
 {
         struct ptlrpc_client *cl;
         struct ptlrpc_connection *connection;
         struct ptlrpc_request *request;
+        struct ptlrpc_bulk_desc *desc;
         struct obd_ioobj ioo;
         struct ost_body *body;
-        struct niobuf *src;
+        struct niobuf_local *local;
+        struct niobuf_remote *remote;
         long pages;
         int rc, i, j, size[3] = {sizeof(*body)};
         void *ptr1, *ptr2;
@@ -482,16 +467,16 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
         pages = 0;
         for (i = 0; i < num_oa; i++)
                 pages += oa_bufs[i];
-        size[2] = pages * sizeof(*src);
+        size[2] = pages * sizeof(*remote);
 
-        OBD_ALLOC(src, size[2]);
-        if (!src)
+        OBD_ALLOC(local, pages * sizeof(*local));
+        if (local == NULL)
                 RETURN(-ENOMEM);
 
         osc_con2cl(conn, &cl, &connection);
         request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
         if (!request)
-                RETURN(-ENOMEM);
+                GOTO(out3, rc = -ENOMEM);
         body = lustre_msg_buf(request->rq_reqmsg, 0);
         body->data = OBD_BRW_WRITE;
 
@@ -500,50 +485,64 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
         for (pages = 0, i = 0; i < num_oa; i++) {
                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
-                        ost_pack_niobuf(&ptr2, kmap(buf[pages]), offset[pages],
-                                        count[pages], flags[pages], 0);
+                        local[pages].addr = (__u64)(long)kmap(buf[pages]);
+                        local[pages].offset = offset[pages];
+                        local[pages].len = count[pages];
+                        ost_pack_niobuf(&ptr2, offset[pages], count[pages],
+                                        flags[pages], 0);
                 }
         }
-        memcpy(src, lustre_msg_buf(request->rq_reqmsg, 2), size[2]);
 
-        size[1] = pages * sizeof(struct niobuf);
+        size[1] = pages * sizeof(struct niobuf_remote);
         request->rq_replen = lustre_msg_size(2, size);
 
         rc = ptlrpc_queue_wait(request);
         if (rc)
-                GOTO(out, rc);
+                GOTO(out2, rc);
 
         ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
         if (ptr2 == NULL)
-                GOTO(out, rc = -EINVAL);
+                GOTO(out2, rc = -EINVAL);
 
-        if (request->rq_repmsg->buflens[1] != pages * sizeof(struct niobuf)) {
+        if (request->rq_repmsg->buflens[1] !=
+            pages * sizeof(struct niobuf_remote)) {
                 CERROR("buffer length wrong (%d vs. %ld)\n",
                        request->rq_repmsg->buflens[1],
-                       pages * sizeof(struct niobuf));
-                GOTO(out, rc = -EINVAL);
+                       pages * sizeof(struct niobuf_remote));
+                GOTO(out2, rc = -EINVAL);
         }
 
+        desc = ptlrpc_prep_bulk(connection);
+        desc->b_portal = OSC_BULK_PORTAL;
+
         for (pages = 0, i = 0; i < num_oa; i++) {
                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
-                        struct niobuf *dst;
-                        ost_unpack_niobuf(&ptr2, &dst);
-                        osc_sendpage(conn, request, dst, &src[pages]);
+                        ost_unpack_niobuf(&ptr2, &remote);
+                        rc = osc_sendpage(desc, remote, &local[pages]);
+                        if (rc)
+                                GOTO(out, rc);
                 }
         }
-        OBD_FREE(src, size[2]);
+
+        rc = ptlrpc_send_bulk(desc);
+        GOTO(out, rc);
+
  out:
+        ptlrpc_free_bulk(desc);
+ out2:
+        ptlrpc_free_req(request);
         for (pages = 0, i = 0; i < num_oa; i++)
                 for (j = 0; j < oa_bufs[i]; j++, pages++)
                         kunmap(buf[pages]);
+ out3:
+        OBD_FREE(local, pages * sizeof(*local));
 
-        ptlrpc_free_req(request);
-        return 0;
+        return rc;
 }
 
-int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
-              struct obdo **oa, obd_count *oa_bufs, struct page **buf,
-              obd_size *count, obd_off *offset, obd_flag *flags)
+static int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
+                   struct obdo **oa, obd_count *oa_bufs, struct page **buf,
+                   obd_size *count, obd_off *offset, obd_flag *flags)
 {
         if (rw == OBD_BRW_READ)
                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
@@ -553,10 +552,11 @@ int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
                                      offset, flags);
 }
 
-int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns,
-                struct ldlm_handle *parent_lock, __u64 *res_id, __u32 type,
-                struct ldlm_extent *extent, __u32 mode, int *flags, void *data,
-                int datalen, struct ldlm_handle *lockh)
+static int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns,
+                       struct ldlm_handle *parent_lock, __u64 *res_id,
+                       __u32 type, struct ldlm_extent *extent, __u32 mode,
+                       int *flags, void *data, int datalen,
+                       struct ldlm_handle *lockh)
 {
         struct ptlrpc_connection *conn;
         struct ptlrpc_client *cl;
@@ -610,7 +610,8 @@ int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns,
         return rc;
 }
 
-int osc_cancel(struct obd_conn *oconn, __u32 mode, struct ldlm_handle *lockh)
+static int osc_cancel(struct obd_conn *oconn, __u32 mode,
+                      struct ldlm_handle *lockh)
 {
         struct ldlm_lock *lock;
         ENTRY;
index 8d56058..6c9e27d 100644 (file)
@@ -252,10 +252,11 @@ static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
 
 static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
 {
-        struct ptlrpc_bulk_desc *bulk = NULL;
+        struct ptlrpc_bulk_desc *desc;
         struct obd_conn conn;
         void *tmp1, *tmp2, *end2;
-        struct niobuf *nb, *dst, *res = NULL;
+        struct niobuf_remote *remote_nb;
+        struct niobuf_local *local_nb = NULL;
         struct obd_ioobj *ioo;
         struct ost_body *body;
         int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
@@ -266,7 +267,7 @@ static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
-        niocount = req->rq_reqmsg->buflens[2] / sizeof(*nb);
+        niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
         cmd = body->data;
 
         conn.oc_id = body->connid;
@@ -276,75 +277,71 @@ static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
                 ost_unpack_ioo(&tmp1, &ioo);
                 if (tmp2 + ioo->ioo_bufcnt > end2) {
                         LBUG();
-                        rc = -EFAULT;
-                        break;
-                }
-                for (j = 0; j < ioo->ioo_bufcnt; j++) {
-                        ost_unpack_niobuf(&tmp2, &nb);
+                        GOTO(out, rc = -EFAULT);
                 }
+                for (j = 0; j < ioo->ioo_bufcnt; j++)
+                        ost_unpack_niobuf(&tmp2, &remote_nb);
         }
 
         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc)
                 RETURN(rc);
-        OBD_ALLOC(res, sizeof(*res) * niocount);
-        if (res == NULL)
+        OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
+        if (local_nb == NULL)
                 RETURN(-ENOMEM);
 
         /* The unpackers move tmp1 and tmp2, so reset them before using */
         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
         req->rq_status = obd_preprw(cmd, &conn, objcount,
-                                    tmp1, niocount, tmp2, res);
+                                    tmp1, niocount, tmp2, local_nb);
 
         if (req->rq_status)
-                GOTO(out_res, 0);
+                GOTO(out_local, 0);
 
-        for (i = 0; i < niocount; i++) {
-                bulk = ptlrpc_prep_bulk(req->rq_connection);
-                if (bulk == NULL) {
-                        CERROR("cannot alloc bulk desc\n");
-                        GOTO(out_res, rc = -ENOMEM);
-                }
+        desc = ptlrpc_prep_bulk(req->rq_connection);
+        if (desc == NULL)
+                GOTO(out_local, rc = -ENOMEM);
+        desc->b_portal = OST_BULK_PORTAL;
 
-                dst = &(((struct niobuf *)tmp2)[i]);
-                bulk->b_xid = dst->xid;
-                bulk->b_buf = (void *)(unsigned long)res[i].addr;
+        for (i = 0; i < niocount; i++) {
+                struct ptlrpc_bulk_page *bulk;
+                bulk = ptlrpc_prep_bulk_page(desc);
+                if (bulk == NULL)
+                        GOTO(out_bulk, rc = -ENOMEM);
+                remote_nb = &(((struct niobuf_remote *)tmp2)[i]);
+                bulk->b_xid = remote_nb->xid;
+                bulk->b_buf = (void *)(unsigned long)local_nb[i].addr;
                 bulk->b_buflen = PAGE_SIZE;
-                rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL);
-                if (rc)
-                        GOTO(out_bulk, rc);
-                wait_event_interruptible(bulk->b_waitq,
-                                         ptlrpc_check_bulk_sent(bulk));
+        }
 
-                if (bulk->b_flags & PTL_RPC_FL_INTR)
-                        GOTO(out_bulk, 0);
+        rc = ptlrpc_send_bulk(desc);
+        if (rc)
+                GOTO(out_bulk, rc);
 
-                ptlrpc_free_bulk(bulk);
-        }
-        bulk = NULL;
+        ptlrpc_free_bulk(desc);
 
         /* The unpackers move tmp1 and tmp2, so reset them before using */
         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
         req->rq_status = obd_commitrw(cmd, &conn, objcount,
-                                      tmp1, niocount, res);
+                                      tmp1, niocount, local_nb);
 
-        EXIT;
-out_bulk:
-        if (bulk != NULL)
-                ptlrpc_free_bulk(bulk);
-out_res:
-        if (res != NULL)
-                OBD_FREE(res, sizeof(*res) * niocount);
+        RETURN(rc);
 
+ out_bulk:
+        ptlrpc_free_bulk(desc);
+ out_local:
+        if (local_nb != NULL)
+                OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
+ out:
         return 0;
 }
 
 static int ost_commit_page(struct obd_conn *conn, struct page *page)
 {
         struct obd_ioobj obj;
-        struct niobuf buf;
+        struct niobuf_local buf;
         int rc;
         ENTRY;
 
@@ -358,23 +355,31 @@ static int ost_commit_page(struct obd_conn *conn, struct page *page)
         RETURN(rc);
 }
 
-static int ost_brw_write_cb(struct ptlrpc_bulk_desc *bulk, void *data)
+static int ost_brw_write_cb(struct ptlrpc_bulk_page *bulk)
 {
         int rc;
-
         ENTRY;
 
-        rc = ost_commit_page(&bulk->b_conn, bulk->b_page);
+        rc = ost_commit_page(&bulk->b_desc->b_conn, bulk->b_page);
         if (rc)
                 CERROR("ost_commit_page failed: %d\n", rc);
 
         RETURN(rc);
 }
 
+static int ost_brw_write_finished_cb(struct ptlrpc_bulk_desc *desc)
+{
+        ptlrpc_free_bulk(desc);
+
+        return 0;
+}
+
 static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
 {
+        struct ptlrpc_bulk_desc *desc;
         struct obd_conn conn;
-        struct niobuf *nb, *res;
+        struct niobuf_remote *remote_nb;
+        struct niobuf_local *local_nb, *lnb;
         struct obd_ioobj *ioo;
         struct ost_body *body;
         int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
@@ -386,7 +391,7 @@ static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
-        niocount = req->rq_reqmsg->buflens[2] / sizeof(*nb);
+        niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
         cmd = body->data;
 
         conn.oc_id = body->connid;
@@ -398,56 +403,73 @@ static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
                         rc = -EFAULT;
                         break;
                 }
-                for (j = 0; j < ioo->ioo_bufcnt; j++) {
-                        ost_unpack_niobuf((void *)&tmp2, &nb);
-                }
+                for (j = 0; j < ioo->ioo_bufcnt; j++)
+                        ost_unpack_niobuf((void *)&tmp2, &remote_nb);
         }
 
-        size[1] = niocount * sizeof(*nb);
+        size[1] = niocount * sizeof(*remote_nb);
         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc)
-                RETURN(rc);
+                GOTO(fail, rc);
+        remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
 
-        res = lustre_msg_buf(req->rq_repmsg, 1);
+        OBD_ALLOC(local_nb, niocount * sizeof(*local_nb));
+        if (local_nb == NULL)
+                GOTO(fail, rc = -ENOMEM);
 
         /* The unpackers move tmp1 and tmp2, so reset them before using */
         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
         req->rq_status = obd_preprw(cmd, &conn, objcount,
-                                    tmp1, niocount, tmp2, res);
-
+                                    tmp1, niocount, tmp2, local_nb);
         if (req->rq_status)
-                GOTO(out, 0);
+                GOTO(success, 0);
 
-        for (i = 0; i < niocount; i++, res++) {
-                struct ptlrpc_bulk_desc *bulk;
+        desc = ptlrpc_prep_bulk(req->rq_connection);
+        if (desc == NULL)
+                GOTO(fail_preprw, rc = -ENOMEM);
+        desc->b_cb = ost_brw_write_finished_cb;
+        desc->b_portal = OSC_BULK_PORTAL;
+        memcpy(&(desc->b_conn), &conn, sizeof(conn));
+
+        for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
                 struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
+                struct ptlrpc_bulk_page *bulk;
 
-                bulk = ptlrpc_prep_bulk(req->rq_connection);
+                bulk = ptlrpc_prep_bulk_page(desc);
                 if (bulk == NULL)
-                        GOTO(out, rc = -ENOMEM);
+                        GOTO(fail_bulk, rc = -ENOMEM);
 
                 spin_lock(&srv->srv_lock);
                 bulk->b_xid = srv->srv_xid++;
                 spin_unlock(&srv->srv_lock);
 
-                res->xid = HTON__u32(bulk->b_xid);
-
-                bulk->b_buf = (void *)(unsigned long)res->addr;
-                bulk->b_cb = ost_brw_write_cb;
-                bulk->b_page = res->page;
-                memcpy(&(bulk->b_conn), &conn, sizeof(conn));
+                bulk->b_buf = (void *)(unsigned long)lnb->addr;
+                bulk->b_page = lnb->page;
                 bulk->b_buflen = PAGE_SIZE;
-                bulk->b_portal = OSC_BULK_PORTAL;
-                rc = ptlrpc_register_bulk(bulk);
-                if (rc)
-                        GOTO(out, rc);
+                bulk->b_cb = ost_brw_write_cb;
+
+                /* this advances remote_nb */
+                ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
+                                bulk->b_xid);
         }
 
+        rc = ptlrpc_register_bulk(desc);
+        if (rc)
+                GOTO(fail_bulk, rc);
+
         EXIT;
out:
-        /* FIXME: should we return 'rc' here? */
success:
+        OBD_FREE(local_nb, niocount * sizeof(*local_nb));
         return 0;
+
+ fail_bulk:
+        ptlrpc_free_bulk(desc);
+ fail_preprw:
+        OBD_FREE(local_nb, niocount * sizeof(*local_nb));
+        /* FIXME: how do we undo the preprw? */
+ fail:
+        return rc;
 }
 
 static int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
@@ -595,11 +617,9 @@ static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
         err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
         if (err)
                 GOTO(error_disc, err = -EINVAL);
-#if 1
         err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
         if (err)
                 GOTO(error_disc, err = -EINVAL);
-#endif
 
         RETURN(0);
 
index 167e67d..288d43f 100644 (file)
@@ -93,25 +93,62 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *conn)
         if (bulk != NULL) {
                 bulk->b_connection = ptlrpc_connection_addref(conn);
                 init_waitqueue_head(&bulk->b_waitq);
+                INIT_LIST_HEAD(&bulk->b_page_list);
         }
 
         return bulk;
 }
 
+struct ptlrpc_bulk_page *ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc)
+{
+        struct ptlrpc_bulk_page *page;
+
+        OBD_ALLOC(page, sizeof(*page));
+        if (page != NULL) {
+                page->b_desc = desc;
+                ptl_set_inv_handle(&page->b_md_h);
+                ptl_set_inv_handle(&page->b_me_h);
+                list_add(&page->b_link, &desc->b_page_list);
+                desc->b_page_count++;
+        }
+        return page;
+}
+
 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk)
 {
+        struct list_head *tmp, *next;
         ENTRY;
         if (bulk == NULL) {
                 EXIT;
                 return;
         }
 
+        list_for_each_safe(tmp, next, &bulk->b_page_list) {
+                struct ptlrpc_bulk_page *page;
+                page = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
+                ptlrpc_free_bulk_page(page);
+        }
+
         ptlrpc_put_connection(bulk->b_connection);
 
         OBD_FREE(bulk, sizeof(*bulk));
         EXIT;
 }
 
+void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *page)
+{
+        ENTRY;
+        if (page == NULL) {
+                EXIT;
+                return;
+        }
+
+        list_del(&page->b_link);
+        page->b_desc->b_page_count--;
+        OBD_FREE(page, sizeof(*page));
+        EXIT;
+}
+
 struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
                                        struct ptlrpc_connection *conn,
                                        int opcode, int count, int *lengths,
index 5c0930d..a016f35 100644 (file)
@@ -133,15 +133,16 @@ int request_in_callback(ptl_event_t *ev, void *data)
 
 static int bulk_source_callback(ptl_event_t *ev, void *data)
 {
-        struct ptlrpc_bulk_desc *bulk = ev->mem_desc.user_ptr;
+        struct ptlrpc_bulk_page *bulk = ev->mem_desc.user_ptr;
+        struct ptlrpc_bulk_desc *desc = bulk->b_desc;
         ENTRY;
 
         if (ev->type == PTL_EVENT_SENT) {
                 CDEBUG(D_NET, "got SENT event\n");
         } else if (ev->type == PTL_EVENT_ACK) {
                 CDEBUG(D_NET, "got ACK event\n");
-                bulk->b_flags |= PTL_BULK_FL_SENT;
-                wake_up_interruptible(&bulk->b_waitq);
+                desc->b_flags |= PTL_BULK_FL_SENT;
+                wake_up_interruptible(&desc->b_waitq);
         } else {
                 CERROR("Unexpected event type!\n");
                 LBUG();
@@ -152,25 +153,27 @@ static int bulk_source_callback(ptl_event_t *ev, void *data)
 
 static int bulk_sink_callback(ptl_event_t *ev, void *data)
 {
-        struct ptlrpc_bulk_desc *bulk = ev->mem_desc.user_ptr;
+        struct ptlrpc_bulk_page *bulk = ev->mem_desc.user_ptr;
+        struct ptlrpc_bulk_desc *desc = bulk->b_desc;
         ENTRY;
 
         if (ev->type == PTL_EVENT_PUT) {
                 if (bulk->b_buf != ev->mem_desc.start + ev->offset)
                         CERROR("bulkbuf != mem_desc -- why?\n");
-                bulk->b_flags |= PTL_BULK_FL_RCVD;
+                desc->b_finished_count++;
+                if (desc->b_finished_count == desc->b_page_count) {
+                        desc->b_flags |= PTL_BULK_FL_RCVD;
+                        wake_up_interruptible(&desc->b_waitq);
+                        if (desc->b_cb != NULL)
+                                desc->b_cb(desc);
+                }
                 if (bulk->b_cb != NULL)
-                        bulk->b_cb(bulk, data);
-                wake_up_interruptible(&bulk->b_waitq);
+                        bulk->b_cb(bulk);
         } else {
                 CERROR("Unexpected event type!\n");
                 LBUG();
         }
 
-        /* FIXME: This should happen unconditionally */
-        if (bulk->b_cb != NULL)
-                ptlrpc_free_bulk(bulk);
-
         RETURN(1);
 }
 
index 755032d..5fc28f9 100644 (file)
  */
 
 #define EXPORT_SYMTAB
-
 #define DEBUG_SUBSYSTEM S_RPC
 
 #include <linux/lustre_net.h>
 
-extern ptl_handle_eq_t request_out_eq, 
-        reply_in_eq, 
-        reply_out_eq,
-        bulk_source_eq, 
-        bulk_sink_eq;
+extern ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq,
+        bulk_source_eq, bulk_sink_eq;
 static ptl_process_id_t local_id = {PTL_ID_ANY, PTL_ID_ANY};
 
-
-int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk)
+static int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk)
 {
         ENTRY;
 
@@ -108,92 +103,121 @@ static int ptl_send_buf(struct ptlrpc_request *request,
         return rc;
 }
 
-int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *bulk, int portal)
+int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *desc)
 {
         int rc;
+        struct list_head *tmp, *next;
         ptl_process_id_t remote_id;
+        ENTRY;
 
-        bulk->b_md.start = bulk->b_buf;
-        bulk->b_md.length = bulk->b_buflen;
-        bulk->b_md.eventq = bulk_source_eq;
-        bulk->b_md.threshold = 2; /* SENT and ACK events */
-        bulk->b_md.options = PTL_MD_OP_PUT;
-        bulk->b_md.user_ptr = bulk;
-
-        rc = PtlMDBind(bulk->b_connection->c_peer.peer_ni, bulk->b_md,
-                       &bulk->b_md_h);
-        if (rc != 0) {
-                CERROR("PtlMDBind failed: %d\n", rc);
-                LBUG();
-                return rc;
+        list_for_each_safe(tmp, next, &desc->b_page_list) {
+                /* only request an ACK for the last page */
+                int ack = (next == &desc->b_page_list);
+                struct ptlrpc_bulk_page *bulk;
+                bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
+
+                bulk->b_md.start = bulk->b_buf;
+                bulk->b_md.length = bulk->b_buflen;
+                bulk->b_md.eventq = bulk_source_eq;
+                bulk->b_md.threshold = 1 + ack; /* SENT and (if last) ACK */
+                bulk->b_md.options = PTL_MD_OP_PUT;
+                bulk->b_md.user_ptr = bulk;
+
+                rc = PtlMDBind(desc->b_connection->c_peer.peer_ni, bulk->b_md,
+                               &bulk->b_md_h);
+                if (rc != 0) {
+                        CERROR("PtlMDBind failed: %d\n", rc);
+                        LBUG();
+                        RETURN(rc);
+                }
+
+                remote_id.nid = desc->b_connection->c_peer.peer_nid;
+                remote_id.pid = 0;
+
+                CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %d\n",
+                       bulk->b_md.length, desc->b_portal, bulk->b_xid);
+
+                rc = PtlPut(bulk->b_md_h, (ack ? PTL_ACK_REQ : PTL_NOACK_REQ),
+                            remote_id, desc->b_portal, 0, bulk->b_xid, 0, 0);
+                if (rc != PTL_OK) {
+                        CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid,
+                               desc->b_portal, bulk->b_xid, rc);
+                        PtlMDUnlink(bulk->b_md_h);
+                        LBUG();
+                        RETURN(rc);
+                }
         }
 
-        remote_id.nid = bulk->b_connection->c_peer.peer_nid;
-        remote_id.pid = 0;
+        wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
 
-        CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %d\n",
-               bulk->b_md.length, portal, bulk->b_xid);
+        if (desc->b_flags & PTL_RPC_FL_INTR)
+                RETURN(-EINTR);
 
-        rc = PtlPut(bulk->b_md_h, PTL_ACK_REQ, remote_id, portal, 0,
-                    bulk->b_xid, 0, 0);
-        if (rc != PTL_OK) {
-                CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid,
-                       portal, bulk->b_xid, rc);
-                PtlMDUnlink(bulk->b_md_h);
-                LBUG();
-        }
-
-        return rc;
+        RETURN(0);
 }
 
-int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *bulk)
+int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *desc)
 {
+        struct list_head *tmp, *next;
         int rc;
         ENTRY;
 
-        rc = PtlMEAttach(bulk->b_connection->c_peer.peer_ni, bulk->b_portal,
-                         local_id, bulk->b_xid, 0, PTL_UNLINK, PTL_INS_AFTER,
-                         &bulk->b_me_h);
-        if (rc != PTL_OK) {
-                CERROR("PtlMEAttach failed: %d\n", rc);
-                LBUG();
-                GOTO(cleanup, rc);
-        }
-
-        bulk->b_md.start = bulk->b_buf;
-        bulk->b_md.length = bulk->b_buflen;
-        bulk->b_md.threshold = 1;
-        bulk->b_md.options = PTL_MD_OP_PUT;
-        bulk->b_md.user_ptr = bulk;
-        bulk->b_md.eventq = bulk_sink_eq;
-
-        rc = PtlMDAttach(bulk->b_me_h, bulk->b_md, PTL_UNLINK, &bulk->b_md_h);
-        //CERROR("MDAttach (bulk sink): %Lu\n", (__u64)bulk->b_md_h);
-        if (rc != PTL_OK) {
-                CERROR("PtlMDAttach failed: %d\n", rc);
-                LBUG();
-                GOTO(cleanup, rc);
+        list_for_each_safe(tmp, next, &desc->b_page_list) {
+                struct ptlrpc_bulk_page *bulk;
+                bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
+
+                rc = PtlMEAttach(desc->b_connection->c_peer.peer_ni,
+                                 desc->b_portal, local_id, bulk->b_xid, 0,
+                                 PTL_UNLINK, PTL_INS_AFTER, &bulk->b_me_h);
+                if (rc != PTL_OK) {
+                        CERROR("PtlMEAttach failed: %d\n", rc);
+                        LBUG();
+                        GOTO(cleanup, rc);
+                }
+
+                bulk->b_md.start = bulk->b_buf;
+                bulk->b_md.length = bulk->b_buflen;
+                bulk->b_md.threshold = 1;
+                bulk->b_md.options = PTL_MD_OP_PUT;
+                bulk->b_md.user_ptr = bulk;
+                bulk->b_md.eventq = bulk_sink_eq;
+
+                rc = PtlMDAttach(bulk->b_me_h, bulk->b_md, PTL_UNLINK,
+                                 &bulk->b_md_h);
+                if (rc != PTL_OK) {
+                        CERROR("PtlMDAttach failed: %d\n", rc);
+                        LBUG();
+                        GOTO(cleanup, rc);
+                }
+
+                CDEBUG(D_NET, "Setup bulk sink buffer: %u bytes, xid %u, "
+                       "portal %u\n", bulk->b_buflen, bulk->b_xid,
+                       desc->b_portal);
         }
 
-        CDEBUG(D_NET, "Setup bulk sink buffer: %u bytes, xid %u, portal %u\n",
-               bulk->b_buflen, bulk->b_xid, bulk->b_portal);
         RETURN(0);
 
-        // XXX Confirm that this is safe!
  cleanup:
-        PtlMEUnlink(bulk->b_me_h);
+        ptlrpc_abort_bulk(desc);
+
         return rc;
 }
 
-int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk)
+int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc)
 {
-        int rc;
+        struct list_head *tmp, *next;
 
-        rc = PtlMEUnlink(bulk->b_me_h);
-        if (rc != PTL_OK)
-                CERROR("PtlMEUnlink failed: %d\n", rc);
+        list_for_each_safe(tmp, next, &desc->b_page_list) {
+                struct ptlrpc_bulk_page *bulk;
+                bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
 
-        return rc;
+                /* This should be safe: these handles are initialized to be
+                 * invalid in ptlrpc_prep_bulk_page() */
+                PtlMDUnlink(bulk->b_md_h);
+                PtlMEUnlink(bulk->b_me_h);
+        }
+
+        return 0;
 }
 
 int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req)
index f42ddb7..410bb3c 100755 (executable)
@@ -13,10 +13,10 @@ setup_lustre
 echo -n "Hit return to continue..."
 read
 
-new_fs ext2 /tmp/ost 10000
+new_fs ext2 /tmp/ost 10001
 OST=$LOOPDEV
 MDSFS=ext3
-new_fs ${MDSFS} /tmp/mds 10000
+new_fs ${MDSFS} /tmp/mds 10001
 MDS=$LOOPDEV
 
 echo 0xffffffff > /proc/sys/portals/debug