#ifndef __LUSTRE_IDL_H__
#define __LUSTRE_IDL_H__
-#ifdef __KERNEL__
-#include <linux/ioctl.h>
-#include <asm/types.h>
-#include <linux/types.h>
+#ifdef __KERNEL__
+# include <linux/ioctl.h>
+# include <asm/types.h>
+# include <linux/types.h>
#else
-#define __KERNEL__
-#include <linux/list.h>
-#undef __KERNEL__
-#include <stdint.h>
+# define __KERNEL__
+# include <linux/list.h>
+# undef __KERNEL__
+# include <stdint.h>
#endif
/*
* this file contains all data structures used in Lustre interfaces:
__u32 buflens[0];
};
+struct niobuf_remote {
+ __u64 offset;
+ __u32 len;
+ __u32 xid;
+ __u32 flags;
+};
+
+struct niobuf_local {
+ __u64 addr;
+ __u64 offset;
+ __u32 len;
+ __u32 xid;
+ void *page;
+};
+
/*
* OST requests: OBDO & OBD request records
*/
__u32 f_type;
};
-struct niobuf {
- __u64 addr;
- __u64 offset;
- __u32 len;
- __u32 flags;
- __u32 xid;
- void *page;
-};
-
struct mds_body {
struct ll_fid fid1;
struct ll_fid fid2;
struct ptlrpc_client *rq_client;
};
-struct ptlrpc_bulk_desc {
- int b_flags;
- struct ptlrpc_connection *b_connection;
- __u32 b_portal;
+struct ptlrpc_bulk_page {
+ struct ptlrpc_bulk_desc *b_desc;
+ struct list_head b_link;
char *b_buf;
int b_buflen;
- int (*b_cb)(struct ptlrpc_bulk_desc *, void *);
struct page *b_page;
- struct obd_conn b_conn;
__u32 b_xid;
-
- wait_queue_head_t b_waitq;
+ int (*b_cb)(struct ptlrpc_bulk_page *);
ptl_md_t b_md;
ptl_handle_md_t b_md_h;
ptl_handle_me_t b_me_h;
};
+struct ptlrpc_bulk_desc {
+ int b_flags;
+ struct ptlrpc_connection *b_connection;
+ __u32 b_portal;
+ int (*b_cb)(struct ptlrpc_bulk_desc *);
+ struct obd_conn b_conn;
+
+ wait_queue_head_t b_waitq;
+ struct list_head b_page_list;
+ __u32 b_page_count;
+ __u32 b_finished_count;
+};
+
struct ptlrpc_thread {
struct list_head t_link;
void ptlrpc_cleanup_connection(void);
/* rpc/niobuf.c */
-int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *);
-int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *, int portal);
+int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *);
int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *);
int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk);
int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req);
struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
struct ptlrpc_connection *u, int opcode,
int count, int *lengths, char **bufs);
-void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk);
void ptlrpc_free_req(struct ptlrpc_request *request);
void ptlrpc_req_finished(struct ptlrpc_request *request);
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *);
+void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk);
+struct ptlrpc_bulk_page *ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc);
+void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *page);
int ptlrpc_check_status(struct ptlrpc_request *req, int err);
/* rpc/service.c */
struct ptlrpc_connection *osc_conn;
};
+struct lov_obd {
+ int lov_count;
+ struct obd_conn *lov_targets;
+};
+
/* corresponds to one of the obd's */
#define MAX_MULTI 16
struct obd_device {
struct echo_obd echo;
struct recovd_obd recovd;
struct trace_obd trace;
+ struct lov_obd lov;
#if 0
struct raid1_obd raid1;
struct snap_obd snap;
obd_id *startid, obd_gr group, void *data);
int (*o_preprw)(int cmd, struct obd_conn *conn,
int objcount, struct obd_ioobj *obj,
- int niocount, struct niobuf *nb,
- struct niobuf *res);
+ int niocount, struct niobuf_remote *remote,
+ struct niobuf_local *local);
int (*o_commitrw)(int cmd, struct obd_conn *conn,
int objcount, struct obd_ioobj *obj,
- int niocount, struct niobuf *res);
+ int niocount, struct niobuf_local *local);
int (*o_enqueue)(struct obd_conn *conn, struct ldlm_namespace *ns,
struct ldlm_handle *parent_lock, __u64 *res_id,
__u32 type, struct ldlm_extent *, __u32 mode,
static inline int obd_preprw(int cmd, struct obd_conn *conn,
int objcount, struct obd_ioobj *obj,
- int niocount, struct niobuf *nb,
- struct niobuf *res)
+ int niocount, struct niobuf_remote *remote,
+ struct niobuf_local *local)
{
int rc;
OBD_CHECK_SETUP(conn);
OBD_CHECK_OP(conn, preprw);
- rc = OBP(conn->oc_dev, preprw)(cmd, conn, objcount, obj, niocount, nb,
- res);
+ rc = OBP(conn->oc_dev, preprw)(cmd, conn, objcount, obj, niocount,
+ remote, local);
RETURN(rc);
}
static inline int obd_commitrw(int cmd, struct obd_conn *conn,
int objcount, struct obd_ioobj *obj,
- int niocount, struct niobuf *res)
+ int niocount, struct niobuf_local *local)
{
int rc;
OBD_CHECK_SETUP(conn);
OBD_CHECK_OP(conn, commitrw);
rc = OBP(conn->oc_dev, commitrw)(cmd, conn, objcount, obj, niocount,
- res);
+ local);
RETURN(rc);
}
/* ext2_obd.c */
extern struct obd_ops ext2_obd_ops;
-
+#include <linux/fs.h>
#include <linux/ext2_fs.h>
/* super.c */
#define LUSTRE_OSC_NAME "osc"
/* ost/ost_pack.c */
-void ost_pack_niobuf(void **tmp, void *addr, __u64 offset, __u32 len,
- __u32 flags, __u32 xid);
-void ost_unpack_niobuf(void **tmp, struct niobuf **nbp);
+void ost_pack_niobuf(void **tmp, __u64 offset, __u32 len, __u32 flags,
+ __u32 xid);
+void ost_unpack_niobuf(void **tmp, struct niobuf_remote **nbp);
void ost_pack_ioo(void **tmp, struct obdo *oa, int bufcnt);
void ost_unpack_ioo(void **tmp, struct obd_ioobj **ioop);
return rc;
}
-/* Must be called without the resource lock held. */
+/* Must be called without the resource lock held. Returns a referenced,
+ * unlocked ldlm_lock. */
ldlm_error_t ldlm_local_lock_create(struct ldlm_namespace *ns,
struct ldlm_handle *parent_lock_handle,
__u64 *res_id, __u32 type,
lock = ldlm_handle2object(lockh);
- spin_unlock(&lock->l_lock);
req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 2, size, bufs);
if (!req)
GOTO(out, rc = -ENOMEM);
*tmp = c + sizeof(*ioo);
}
-void ost_pack_niobuf(void **tmp, void *addr, __u64 offset, __u32 len,
- __u32 flags, __u32 xid)
+void ost_pack_niobuf(void **tmp, __u64 offset, __u32 len, __u32 flags,
+ __u32 xid)
{
- struct niobuf *nb = *tmp;
+ struct niobuf_remote *nb = *tmp;
char *c = *tmp;
- nb->addr = HTON__u64((__u64)(unsigned long)addr);
nb->offset = HTON__u64(offset);
nb->len = HTON__u32(len);
nb->flags = HTON__u32(flags);
*tmp = c + sizeof(*nb);
}
-void ost_unpack_niobuf(void **tmp, struct niobuf **nbp)
+void ost_unpack_niobuf(void **tmp, struct niobuf_remote **nbp)
{
char *c = *tmp;
- struct niobuf *nb = *tmp;
+ struct niobuf_remote *nb = *tmp;
*nbp = *tmp;
- nb->addr = NTOH__u64(nb->addr);
nb->offset = NTOH__u64(nb->offset);
nb->len = NTOH__u32(nb->len);
nb->flags = NTOH__u32(nb->flags);
loff_t *ppos)
{
struct inode *inode = filp->f_dentry->d_inode;
-#if 0
struct ll_sb_info *sbi = ll_i2sbi(inode);
struct ldlm_extent extent;
struct ldlm_handle lockh;
__u64 res_id[RES_NAME_SIZE] = {inode->i_ino};
int flags = 0;
ldlm_error_t err;
-#endif
ssize_t retval;
ENTRY;
-#if 0
extent.start = *ppos;
extent.end = *ppos + count;
CDEBUG(D_INFO, "Locking inode %ld, start %Lu end %Lu\n",
if (err != ELDLM_OK)
CERROR("lock enqueue: err: %d\n", err);
ldlm_lock_dump((void *)(unsigned long)lockh.addr);
-#endif
CDEBUG(D_INFO, "Reading inode %ld, %d bytes, offset %Ld\n",
inode->i_ino, count, *ppos);
ll_setattr(filp->f_dentry, &attr);
}
-#if 0
err = obd_cancel(&sbi->ll_conn, LCK_PR, &lockh);
if (err != ELDLM_OK)
CERROR("lock cancel: err: %d\n", err);
-#endif
RETURN(retval);
}
ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
{
struct inode *inode = file->f_dentry->d_inode;
-#if 0
struct ll_sb_info *sbi = ll_i2sbi(inode);
struct ldlm_extent extent;
struct ldlm_handle lockh;
__u64 res_id[RES_NAME_SIZE] = {inode->i_ino};
int flags = 0;
ldlm_error_t err;
-#endif
ssize_t retval;
ENTRY;
-#if 0
extent.start = *ppos;
extent.end = *ppos + count;
CDEBUG(D_INFO, "Locking inode %ld, start %Lu end %Lu\n",
if (err != ELDLM_OK)
CERROR("lock enqueue: err: %d\n", err);
ldlm_lock_dump((void *)(unsigned long)lockh.addr);
-#endif
CDEBUG(D_INFO, "Writing inode %ld, %ld bytes, offset %Ld\n",
inode->i_ino, (long)count, *ppos);
retval = generic_file_write(file, buf, count, ppos);
-
-#if 0
err = obd_cancel(&sbi->ll_conn, LCK_PW, &lockh);
if (err != ELDLM_OK)
CERROR("lock cancel: err: %d\n", err);
-#endif
RETURN(retval);
}
out:
ptlrpc_free_req(request);
return inode;
-} /* ll_new_inode */
+}
int ll_mdc_unlink(struct inode *dir, struct inode *child,
const char *name, int len)
level = LUSTRE_CONN_FULL;
resend:
rc = mdc_reint(cl, req, level);
- if (rc == -ERESTARTSYS ) {
- struct mds_update_record_hdr *hdr = lustre_msg_buf(req->rq_reqmsg, 0);
+ if (rc == -ERESTARTSYS) {
+ struct mds_update_record_hdr *hdr =
+ lustre_msg_buf(req->rq_reqmsg, 0);
level = LUSTRE_CONN_RECOVD;
- CERROR("Lost reply: re-create rep.\n");
+ CERROR("Lost reply: re-create rep.\n");
req->rq_flags = 0;
hdr->ur_opcode = NTOH__u32(REINT_RECREATE);
goto resend;
struct ptlrpc_request **request)
{
struct ptlrpc_request *req = NULL;
- struct ptlrpc_bulk_desc *bulk = NULL;
- struct niobuf niobuf;
+ struct ptlrpc_bulk_desc *desc = NULL;
+ struct ptlrpc_bulk_page *bulk = NULL;
struct mds_body *body;
- int rc, size[2] = {sizeof(*body), sizeof(struct niobuf)};
- char *bufs[2] = {NULL, (char *)&niobuf};
-
- niobuf.addr = (__u64) (long) addr;
+ int rc, size = sizeof(*body);
+ ENTRY;
CDEBUG(D_INODE, "inode: %ld\n", (long)ino);
- bulk = ptlrpc_prep_bulk(conn);
- if (bulk == NULL) {
- CERROR("%s: cannot init bulk desc\n", __FUNCTION__);
- rc = -ENOMEM;
- goto out;
- }
+ desc = ptlrpc_prep_bulk(conn);
+ if (desc == NULL)
+ GOTO(out, rc = -ENOMEM);
- req = ptlrpc_prep_req(cl, conn, MDS_READPAGE, 2, size, bufs);
+ req = ptlrpc_prep_req(cl, conn, MDS_READPAGE, 1, &size, NULL);
if (!req)
- GOTO(out, rc = -ENOMEM);
+ GOTO(out2, rc = -ENOMEM);
+ bulk = ptlrpc_prep_bulk_page(desc);
bulk->b_buflen = PAGE_SIZE;
- bulk->b_buf = (void *)(long)niobuf.addr;
- bulk->b_portal = MDS_BULK_PORTAL;
+ bulk->b_buf = addr;
bulk->b_xid = req->rq_reqmsg->xid;
+ desc->b_portal = MDS_BULK_PORTAL;
- rc = ptlrpc_register_bulk(bulk);
+ rc = ptlrpc_register_bulk(desc);
if (rc) {
CERROR("couldn't setup bulk sink: error %d.\n", rc);
- GOTO(out, rc);
+ GOTO(out2, rc);
}
body = lustre_msg_buf(req->rq_reqmsg, 0);
body->fid1.f_type = type;
body->size = offset;
- req->rq_replen = lustre_msg_size(1, size);
+ req->rq_replen = lustre_msg_size(1, &size);
req->rq_level = LUSTRE_CONN_FULL;
rc = ptlrpc_queue_wait(req);
if (rc) {
CERROR("error in handling %d\n", rc);
- ptlrpc_abort_bulk(bulk);
- GOTO(out, rc);
- }
+ ptlrpc_abort_bulk(desc);
+ } else
+ mds_unpack_rep_body(req);
- mds_unpack_rep_body(req);
EXIT;
-
+ out2:
+ ptlrpc_free_bulk(desc);
out:
*request = req;
- ptlrpc_free_bulk(bulk);
return rc;
}
#include <linux/module.h>
#include <linux/lustre_mds.h>
-int mds_sendpage(struct ptlrpc_request *req, struct file *file,
- __u64 offset, struct niobuf *dst)
+int mds_sendpage(struct ptlrpc_request *req, struct file *file, __u64 offset)
{
int rc = 0;
mm_segment_t oldfs = get_fs();
- struct ptlrpc_bulk_desc *bulk;
+ struct ptlrpc_bulk_desc *desc;
+ struct ptlrpc_bulk_page *bulk;
char *buf;
+ ENTRY;
- bulk = ptlrpc_prep_bulk(req->rq_connection);
- if (bulk == NULL) {
- rc = -ENOMEM;
- GOTO(out, rc);
- }
+ desc = ptlrpc_prep_bulk(req->rq_connection);
+ if (desc == NULL)
+ GOTO(out, rc = -ENOMEM);
- bulk->b_xid = req->rq_reqmsg->xid;
+ bulk = ptlrpc_prep_bulk_page(desc);
+ if (bulk == NULL)
+ GOTO(cleanup_bulk, rc = -ENOMEM);
OBD_ALLOC(buf, PAGE_SIZE);
- if (!buf) {
- rc = -ENOMEM;
- GOTO(cleanup_bulk, rc);
- }
+ if (buf == NULL)
+ GOTO(cleanup_bulk, rc = -ENOMEM);
set_fs(KERNEL_DS);
rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
(loff_t *)&offset);
set_fs(oldfs);
- if (rc != PAGE_SIZE) {
- rc = -EIO;
- GOTO(cleanup_buf, rc);
- }
+ if (rc != PAGE_SIZE)
+ GOTO(cleanup_buf, rc = -EIO);
+ bulk->b_xid = req->rq_reqmsg->xid;
bulk->b_buf = buf;
bulk->b_buflen = PAGE_SIZE;
+ desc->b_portal = MDS_BULK_PORTAL;
- rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
+ rc = ptlrpc_send_bulk(desc);
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
OBD_FAIL_MDS_SENDPAGE, rc);
- PtlMDUnlink(bulk->b_md_h);
- GOTO(cleanup_buf, rc);
- }
- wait_event_interruptible(bulk->b_waitq,
- ptlrpc_check_bulk_sent(bulk));
-
- if (bulk->b_flags & PTL_RPC_FL_INTR) {
- rc = -EINTR;
+ ptlrpc_abort_bulk(desc);
GOTO(cleanup_buf, rc);
}
cleanup_buf:
OBD_FREE(buf, PAGE_SIZE);
cleanup_bulk:
- ptlrpc_free_bulk(bulk);
+ ptlrpc_free_bulk(desc);
out:
return rc;
}
struct vfsmount *mnt;
struct dentry *de;
struct file *file;
- struct niobuf *niobuf;
struct mds_body *body;
int rc, size = sizeof(*body);
ENTRY;
RETURN(0);
}
- niobuf = lustre_msg_buf(req->rq_reqmsg, 1);
- if (!niobuf) {
- req->rq_status = -EINVAL;
- LBUG();
- RETURN(0);
- }
-
/* to make this asynchronous make sure that the handling function
doesn't send a reply when this function completes. Instead a
callback function would send the reply */
- rc = mds_sendpage(req, file, body->size, niobuf);
+ rc = mds_sendpage(req, file, body->size);
filp_close(file, 0);
req->rq_status = rc;
*
*/
-#include <linux/mm.h>
-#include <linux/pagemap.h>
-#include <linux/fs.h>
-#include <linux/sched.h>
-#include <asm/uaccess.h>
-
#define DEBUG_SUBSYSTEM S_CLASS
-#include <linux/obd_support.h>
#include <linux/obd_class.h>
extern struct obd_device obd_dev[MAX_OBD_DEVICES];
sizeof(struct obdo),
0, SLAB_HWCACHE_ALIGN,
NULL, NULL);
- if (obdo_cachep == NULL) {
- EXIT;
- return -ENOMEM;
- } else {
+ if (obdo_cachep == NULL)
+ RETURN(-ENOMEM);
+ else
CDEBUG(D_CACHE, "allocated cache at %p\n", obdo_cachep);
- }
} else {
CDEBUG(D_CACHE, "using existing cache at %p\n", obdo_cachep);
}
- EXIT;
- return 0;
+ RETURN(0);
}
void obd_cleanup_obdo_cache(void)
lh = next = &obddev->obd_gen_clients;
while ((lh = lh->next) != &obddev->obd_gen_clients) {
cli = list_entry(lh, struct obd_client, cli_chain);
-
+
if (cli->cli_id == conn->oc_id)
return cli;
}
return NULL;
-} /* obd_client */
+} /* gen_client */
-/* a connection defines a context in which preallocation can be managed. */
+/* a connection defines a context in which preallocation can be managed. */
int gen_connect (struct obd_conn *conn)
{
struct obd_client * cli;
CDEBUG(D_INFO, "connect: new ID %u\n", cli->cli_id);
conn->oc_id = cli->cli_id;
return 0;
-} /* gen_obd_connect */
+} /* gen_connect */
int gen_disconnect(struct obd_conn *conn)
if (!(cli = gen_client(conn))) {
CDEBUG(D_IOCTL, "disconnect: attempting to free "
"nonexistent client %u\n", conn->oc_id);
- return -EINVAL;
+ RETURN(-EINVAL);
}
CDEBUG(D_INFO, "disconnect: ID %u\n", conn->oc_id);
- EXIT;
- return 0;
+ RETURN(0);
} /* gen_obd_disconnect */
-
-/*
- * raid1 defines a number of connections to child devices,
- * used to make calls to these devices.
- * data holds nothing
- */
+/* FIXME: Data is a space- or comma-separated list of device IDs. This will
+ * have to change. */
int gen_multi_setup(struct obd_device *obddev, uint32_t len, void *data)
{
- int i;
+ int count, rc;
+ char *p;
+ ENTRY;
- for (i = 0 ; i < obddev->obd_multi_count ; i++ ) {
- int rc;
- struct obd_conn *ch_conn = &obddev->obd_multi_conn[i];
- rc = OBP(ch_conn->oc_dev, connect)(ch_conn);
+ for (p = data, count = 0; p < (char *)data + len; count++) {
+ char *end;
+ int tmp = simple_strtoul(p, &end, 0);
- if ( rc != 0 ) {
- int j;
+ if (p == end) {
+ CERROR("invalid device ID starting at: %s\n", p);
+ GOTO(err_disconnect, rc = -EINVAL);
+ }
- for (j = --i; j >= 0; --j) {
- ch_conn = &obddev->obd_multi_conn[i];
- OBP(ch_conn->oc_dev, disconnect)(ch_conn);
- }
- return -EINVAL;
+ obddev->obd_multi_conn[count].oc_dev = &obd_dev[tmp];
+ rc = obd_connect(&obddev->obd_multi_conn[count]);
+ if (rc) {
+ CERROR("cannot connect to device %d: rc = %d\n", tmp,
+ rc);
+ GOTO(err_disconnect, rc);
}
- }
- return 0;
-}
+ CDEBUG(D_INFO, "target OBD %d is of type %s\n", count,
+ obd_dev[tmp].obd_type->typ_name);
-#if 0
-int gen_multi_attach(struct obd_device *obddev, int len, void *data)
-{
- int i;
- int count;
- struct obd_device *rdev = obddev->obd_multi_dev[0];
+ p = end + 1;
+ }
- count = len/sizeof(int);
obddev->obd_multi_count = count;
- for (i=0 ; i<count ; i++) {
- rdev = &obd_dev[*((int *)data + i)];
- rdev = rdev + 1;
- CDEBUG(D_INFO, "OBD RAID1: replicator %d is of type %s\n", i,
- (rdev + i)->obd_type->typ_name);
- }
- return 0;
-}
-#endif
+ RETURN(0);
+
+ err_disconnect:
+ for (count--; count >= 0; count--)
+ obd_disconnect(&obddev->obd_multi_conn[count]);
+ return rc;
+}
/*
* remove all connections to this device
{
int i;
- for (i = 0 ; i < obddev->obd_multi_count ; i++ ) {
- struct obd_conn *ch_conn = &obddev->obd_multi_conn[i];
- int rc;
- rc = OBP(ch_conn->oc_dev, disconnect)(ch_conn);
-
- if ( rc != 0 ) {
+ for (i = 0; i < obddev->obd_multi_count; i++) {
+ int rc = obd_disconnect(&obddev->obd_multi_conn[i]);
+ if (rc)
CERROR("disconnect failure %d\n",
- ch_conn->oc_dev->obd_minor);
- }
- }
+ obddev->obd_multi_conn[i].oc_dev->obd_minor);
+ }
return 0;
-} /* gen_multi_cleanup_device */
+}
/*
int err = 0;
ENTRY;
- CDEBUG(D_INFO, "src: ino %Ld blocks %Ld, size %Ld, dst: ino %Ld\n",
+ CDEBUG(D_INFO, "src: ino %Ld blocks %Ld, size %Ld, dst: ino %Ld\n",
(unsigned long long)src->o_id, (unsigned long long)src->o_blocks,
(unsigned long long)src->o_size, (unsigned long long)dst->o_id);
page = alloc_page(GFP_USER);
- if ( !page ) {
- EXIT;
- return -ENOMEM;
- }
-
+ if (page == NULL)
+ RETURN(-ENOMEM);
+
lck_page(page);
-
+
/* XXX with brw vector I/O, we could batch up reads and writes here,
* all we need to do is allocate multiple pages to handle the I/Os
* and arrays to handle the request parameters.
obd_off brw_offset = (page->index) << PAGE_SHIFT;
obd_flag flagr = 0;
obd_flag flagw = OBD_BRW_CREATE;
-
+
page->index = index;
err = OBP(src_conn->oc_dev, brw)(READ, src_conn, num_oa, &src,
&num_buf, &page, &brw_count,
}
CDEBUG(D_INFO, "Wrote page %ld ...\n", page->index);
-
+
index++;
}
dst->o_size = src->o_size;
UnlockPage(page);
__free_page(page);
- EXIT;
- return err;
+ RETURN(err);
}
}
int echo_preprw(int cmd, struct obd_conn *conn, int objcount,
- struct obd_ioobj *obj, int niocount, struct niobuf *nb,
- struct niobuf *res)
+ struct obd_ioobj *obj, int niocount, struct niobuf_remote *nb,
+ struct niobuf_local *res)
{
- struct niobuf *r = res;
+ struct niobuf_local *r = res;
int rc = 0;
int i;
CERROR("can't get new page %d/%d for id %Ld\n",
j, obj->ioo_bufcnt,
(unsigned long long)obj->ioo_id);
- rc = -ENOMEM;
- EXIT;
- goto preprw_cleanup;
+ GOTO(preprw_cleanup, rc = -ENOMEM);
}
echo_pages++;
}
CDEBUG(D_PAGE, "%ld pages allocated after prep\n", echo_pages);
- EXIT;
- return 0;
+ RETURN(0);
preprw_cleanup:
/* It is possible that we would rather handle errors by allow
}
int echo_commitrw(int cmd, struct obd_conn *conn, int objcount,
- struct obd_ioobj *obj, int niocount, struct niobuf *res)
+ struct obd_ioobj *obj, int niocount, struct niobuf_local *res)
{
- struct niobuf *r = res;
+ struct niobuf_local *r = res;
int rc = 0;
int i;
-
ENTRY;
CDEBUG(D_PAGE, "%s %d obdos with %d IOs\n",
if (niocount && !r) {
CERROR("NULL res niobuf with niocount %d\n", niocount);
- EXIT;
- return -EINVAL;
+ RETURN(-EINVAL);
}
for (i = 0; i < objcount; i++, obj++) {
CERROR("bad page %p, id %Ld (%d), buf %d/%d\n",
page, (unsigned long long)obj->ioo_id, i,
j, obj->ioo_bufcnt);
- rc = -EFAULT;
- EXIT;
- goto commitrw_cleanup;
+ GOTO(commitrw_cleanup, rc = -EFAULT);
}
free_pages(addr, 0);
}
}
CDEBUG(D_PAGE, "%ld pages remain after commit\n", echo_pages);
- EXIT;
- return 0;
+ RETURN(0);
commitrw_cleanup:
CERROR("cleaning up %ld pages (%d obdos)\n",
*/
#define EXPORT_SYMTAB
-
-#include <linux/version.h>
-#include <linux/module.h>
-#include <linux/fs.h>
-#include <linux/stat.h>
-#include <linux/locks.h>
-#include <linux/ext2_fs.h>
-#include <linux/quotaops.h>
-#include <asm/unistd.h>
-
#define DEBUG_SUBSYSTEM S_FILTER
-#include <linux/obd_class.h>
+#include <linux/module.h>
#include <linux/obd_ext2.h>
#include <linux/obd_filter.h>
#define S_SHIFT 12
static char * obd_type_by_mode[S_IFMT >> S_SHIFT] = {
[0] "",
- [S_IFREG >> S_SHIFT] "R",
+ [S_IFREG >> S_SHIFT] "R",
[S_IFDIR >> S_SHIFT] "D",
[S_IFCHR >> S_SHIFT] "C",
- [S_IFBLK >> S_SHIFT] "B",
- [S_IFIFO >> S_SHIFT] "F",
+ [S_IFBLK >> S_SHIFT] "B",
+ [S_IFIFO >> S_SHIFT] "F",
[S_IFSOCK >> S_SHIFT] "S",
[S_IFLNK >> S_SHIFT] "L"
};
filter_id(rootid, FILTER_ROOTINO, S_IFDIR);
file = filp_open(rootid, O_RDWR | O_CREAT, 00755);
if (IS_ERR(file)) {
- CERROR("OBD filter: cannot make root directory");
+ CERROR("OBD filter: cannot make root directory");
GOTO(out, rc = PTR_ERR(file));
}
filp_close(file, 0);
}
}
obddev->u.filter.fo_lastino = lastino;
- filp_close(file, 0);
+ filp_close(file, 0);
rc = 0;
out:
struct obd_run_ctxt saved;
long rc;
struct file *file;
- loff_t off = 0;
+ loff_t off = 0;
push_ctxt(&saved, &obddev->u.filter.fo_ctxt);
file = filp_open("D/status", O_RDWR | O_CREAT, 0700);
- if ( !file || IS_ERR(file)) {
+ if ( !file || IS_ERR(file)) {
CERROR("OBD filter: cannot create status file\n");
goto out;
}
- rc = file->f_op->write(file, (char *)&obddev->u.filter.fo_lastino,
+ rc = file->f_op->write(file, (char *)&obddev->u.filter.fo_lastino,
sizeof(obddev->u.filter.fo_lastino), &off);
- if (rc != sizeof(obddev->u.filter.fo_lastino) ) {
+ if (rc != sizeof(obddev->u.filter.fo_lastino) ) {
CERROR("OBD filter: error writing lastino\n");
}
- rc = filp_close(file, NULL);
- if (rc) {
+ rc = filp_close(file, NULL);
+ if (rc) {
CERROR("OBD filter: cannot close status file\n");
}
out:
}
-static __u64 filter_next_id(struct obd_device *obddev)
+static __u64 filter_next_id(struct obd_device *obddev)
{
__u64 id;
spin_lock(&obddev->u.filter.fo_lock);
}
/* how to get files, dentries, inodes from object id's */
-static struct file *filter_obj_open(struct obd_device *obddev,
+static struct file *filter_obj_open(struct obd_device *obddev,
__u64 id, __u32 type)
{
struct obd_run_ctxt saved;
RETURN(NULL);
}
- if ( ! (type & S_IFMT) ) {
- CERROR("OBD filter_obj_open, no type (%Ld), mode %o!\n",
+ if ( ! (type & S_IFMT) ) {
+ CERROR("OBD filter_obj_open, no type (%Ld), mode %o!\n",
(unsigned long long)id, type);
}
- filter_id(name, id, type);
+ filter_id(name, id, type);
push_ctxt(&saved, &obddev->u.filter.fo_ctxt);
file = filp_open(name, O_RDONLY | O_LARGEFILE, 0);
pop_ctxt(&saved);
sprintf(path, "O/%s", obd_type_by_mode[(mode & S_IFMT) >> S_SHIFT]);
- file = filp_open(path, O_RDONLY, 0);
+ file = filp_open(path, O_RDONLY, 0);
return file;
}
-static struct inode *filter_inode_from_obj(struct obd_device *obddev,
+static struct inode *filter_inode_from_obj(struct obd_device *obddev,
__u64 id, __u32 type)
{
struct file *file;
- struct inode *inode;
+ struct inode *inode;
file = filter_obj_open(obddev, id, type);
- if ( !file ) {
- CERROR("filter_inode_from_obdo failed\n");
+ if ( !file ) {
+ CERROR("filter_inode_from_obdo failed\n");
return NULL;
}
- inode = iget(file->f_dentry->d_inode->i_sb,
- file->f_dentry->d_inode->i_ino);
+ inode = iget(file->f_dentry->d_inode->i_sb,
+ file->f_dentry->d_inode->i_ino);
filp_close(file, 0);
return inode;
}
ENTRY;
- if ( !(obddev->obd_flags & OBD_SET_UP) )
- RETURN(0);
-
if ( !list_empty(&obddev->obd_gen_clients) ) {
CERROR("still has clients!\n");
RETURN(-EBUSY);
filter_post(obddev);
unlock_kernel();
- mntput(obddev->u.filter.fo_vfsmnt);
+ mntput(obddev->u.filter.fo_vfsmnt);
obddev->u.filter.fo_sb = 0;
kfree(obddev->u.filter.fo_fstype);
static int filter_getattr(struct obd_conn *conn, struct obdo *oa)
{
struct inode *inode;
-
ENTRY;
+
if ( !gen_client(conn) ) {
CDEBUG(D_IOCTL, "fatal: invalid client %u\n", conn->oc_id);
- EXIT;
- return -EINVAL;
+ RETURN(-EINVAL);
}
- if ( !(inode = filter_inode_from_obj(conn->oc_dev,
- oa->o_id, oa->o_mode)) ) {
- EXIT;
- return -ENOENT;
- }
+ inode = filter_inode_from_obj(conn->oc_dev, oa->o_id, oa->o_mode);
+ if (inode == NULL)
+ RETURN(-ENOENT);
oa->o_valid &= ~OBD_MD_FLID;
filter_from_inode(oa, inode);
-
+
iput(inode);
- EXIT;
- return 0;
-}
+ RETURN(0);
+}
static int filter_setattr(struct obd_conn *conn, struct obdo *oa)
{
struct iattr iattr;
int rc;
struct dentry de;
+ ENTRY;
if (!gen_client(conn)) {
CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
- return -EINVAL;
+ RETURN(-EINVAL);
}
- inode = filter_inode_from_obj(conn->oc_dev, oa->o_id, oa->o_mode);
- if ( !inode ) {
- EXIT;
- return -ENOENT;
- }
+ inode = filter_inode_from_obj(conn->oc_dev, oa->o_id, oa->o_mode);
+ if ( !inode )
+ RETURN(-ENOENT);
iattr_from_obdo(&iattr, oa);
iattr.ia_mode &= ~S_IFMT;
iattr.ia_mode |= S_IFREG;
de.d_inode = inode;
- if ( inode->i_op->setattr ) {
+ if ( inode->i_op->setattr )
rc = inode->i_op->setattr(&de, &iattr);
- } else {
+ else
rc = inode_setattr(inode, &iattr);
- }
iput(inode);
- EXIT;
- return rc;
+ RETURN(rc);
}
static int filter_open(struct obd_conn *conn, struct obdo *oa)
}
oa->o_id = filter_next_id(conn->oc_dev);
- if ( !(oa->o_mode && S_IFMT) ) {
+ if ( !(oa->o_mode && S_IFMT) ) {
CERROR("filter obd: no type!\n");
return -ENOENT;
}
push_ctxt(&saved, &obddev->u.filter.fo_ctxt);
mode = oa->o_mode;
mode &= ~S_IFMT;
- mode |= S_IFREG;
+ mode |= S_IFREG;
file = filp_open(name, O_RDONLY | O_CREAT, mode);
pop_ctxt(&saved);
- if (IS_ERR(file)) {
+ if (IS_ERR(file)) {
CERROR("Error mknod obj %s, err %ld\n", name, PTR_ERR(file));
return -ENOENT;
}
filp_close(file, 0);
-
+
/* Set flags for fields we have set in ext2_new_inode */
oa->o_valid |= OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS |
OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME |
struct file *object;
int rc;
struct obd_run_ctxt saved;
+ ENTRY;
if (!(cli = gen_client(conn))) {
CERROR("invalid client %u\n", conn->oc_id);
- EXIT;
- return -EINVAL;
+ RETURN(-EINVAL);
}
obddev = conn->oc_dev;
object = filter_obj_open(obddev, oa->o_id, oa->o_mode);
- if (!object || IS_ERR(object)) {
- EXIT;
- return -ENOENT;
- }
-
+ if (!object || IS_ERR(object))
+ RETURN(-ENOENT);
+
inode = object->f_dentry->d_inode;
inode->i_nlink = 1;
inode->i_mode = 010000;
push_ctxt(&saved, &obddev->u.filter.fo_ctxt);
dir = filter_parent(oa->o_id, oa->o_mode);
- if (IS_ERR(dir)) {
- rc = PTR_ERR(dir);
- EXIT;
- goto out;
- }
- dget(dir->f_dentry);
+ if (IS_ERR(dir))
+ GOTO(out, rc = PTR_ERR(dir));
+ dget(dir->f_dentry);
dget(object->f_dentry);
rc = vfs_unlink(dir->f_dentry->d_inode, object->f_dentry);
filp_close(dir, 0);
filp_close(object, 0);
+ EXIT;
out:
pop_ctxt(&saved);
- EXIT;
return rc;
}
-static int filter_truncate(struct obd_conn *conn, struct obdo *oa, obd_size count,
- obd_off offset)
+static int filter_truncate(struct obd_conn *conn, struct obdo *oa,
+ obd_size count, obd_off offset)
{
int error;
+ ENTRY;
error = filter_setattr(conn, oa);
oa->o_valid = OBD_MD_FLBLOCKS | OBD_MD_FLCTIME | OBD_MD_FLMTIME;
- EXIT;
- return error;
+ RETURN(error);
}
/* buffer must lie in user memory here */
struct file * file;
unsigned long retval;
int err;
+ ENTRY;
if (!gen_client(conn)) {
CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
- EXIT;
- return -EINVAL;
+ RETURN(-EINVAL);
}
- file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode);
- if (!file || IS_ERR(file)) {
- EXIT;
- return -PTR_ERR(file);
- }
+ file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode);
+ if (!file || IS_ERR(file))
+ RETURN(-PTR_ERR(file));
/* count doubles as retval */
retval = file->f_op->read(file, buf, *count, (loff_t *)&offset);
/* buffer must lie in user memory here */
-static int filter_write(struct obd_conn *conn, struct obdo *oa, char *buf,
+static int filter_write(struct obd_conn *conn, struct obdo *oa, char *buf,
obd_size *count, obd_off offset)
{
int err;
struct file * file;
unsigned long retval;
-
ENTRY;
+
if (!gen_client(conn)) {
CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
- EXIT;
- return -EINVAL;
+ RETURN(-EINVAL);
}
- file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode);
- if (!file || IS_ERR(file)) {
- EXIT;
- return -PTR_ERR(file);
- }
+ file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode);
+ if (!file || IS_ERR(file))
+ RETURN(-PTR_ERR(file));
/* count doubles as retval */
retval = file->f_op->write(file, buf, *count, (loff_t *)&offset);
return err;
} /* filter_write */
-static int filter_pgcache_brw(int rw, struct obd_conn *conn,
+static int filter_pgcache_brw(int rw, struct obd_conn *conn,
obd_count num_oa,
- struct obdo **oa,
- obd_count *oa_bufs,
+ struct obdo **oa,
+ obd_count *oa_bufs,
struct page **pages,
- obd_size *count,
- obd_off *offset,
+ obd_size *count,
+ obd_off *offset,
obd_flag *flags)
{
struct super_block *sb;
unsigned long retval;
int error;
struct file *file;
-
ENTRY;
if (!gen_client(conn)) {
CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
- EXIT;
- return -EINVAL;
+ RETURN(-EINVAL);
}
sb = conn->oc_dev->u.filter.fo_sb;
oldfs = get_fs();
- set_fs(KERNEL_DS);
+ set_fs(KERNEL_DS);
pnum = 0; /* pnum indexes buf 0..num_pages */
for (onum = 0; onum < num_oa; onum++) {
- int pg;
-
- file = filter_obj_open(conn->oc_dev, oa[onum]->o_id,
- oa[onum]->o_mode);
- if (!file || IS_ERR(file)) {
- EXIT;
- error = -ENOENT;
- goto ERROR;
- }
+ int pg;
+
+ file = filter_obj_open(conn->oc_dev, oa[onum]->o_id,
+ oa[onum]->o_mode);
+ if (!file || IS_ERR(file))
+ GOTO(ERROR, error = -ENOENT);
/* count doubles as retval */
for (pg = 0; pg < oa_bufs[onum]; pg++) {
CDEBUG(D_INODE, "OP %d obdo no/pno: (%d,%d) (%ld,%ld) "
- "off count (%Ld,%Ld)\n",
+ "off count (%Ld,%Ld)\n",
rw, onum, pnum, file->f_dentry->d_inode->i_ino,
(unsigned long)offset[pnum] >> PAGE_CACHE_SHIFT,
(unsigned long long)offset[pnum],
(unsigned long long)count[pnum]);
- if (rw == WRITE) {
- loff_t off;
+ if (rw == WRITE) {
+ loff_t off;
char *buffer;
- off = offset[pnum];
- buffer = kmap(pages[pnum]);
+ off = offset[pnum];
+ buffer = kmap(pages[pnum]);
retval = file->f_op->write(file, buffer, count[pnum], &off);
kunmap(pages[pnum]);
- CDEBUG(D_INODE, "retval %ld\n", retval);
- } else {
- loff_t off = offset[pnum];
+ CDEBUG(D_INODE, "retval %ld\n", retval);
+ } else {
+ loff_t off = offset[pnum];
char *buffer = kmap(pages[pnum]);
if (off >= file->f_dentry->d_inode->i_size) {
retval = count[pnum];
} else {
retval = file->f_op->read(file, buffer, count[pnum], &off);
- }
+ }
kunmap(pages[pnum]);
if ( retval != count[pnum] ) {
filp_close(file, 0);
- retval = -EIO;
- EXIT;
- goto ERROR;
+ GOTO(ERROR, retval = -EIO);
}
- CDEBUG(D_INODE, "retval %ld\n", retval);
+ CDEBUG(D_INODE, "retval %ld\n", retval);
}
pnum++;
}
/* sizes and blocks are set by generic_file_write */
- /* ctimes/mtimes will follow with a setattr call */
+ /* ctimes/mtimes will follow with a setattr call */
filp_close(file, 0);
}
-
+
EXIT;
ERROR:
set_fs(oldfs);
{
struct inode *inode = NULL;
struct super_block *sb = conn->oc_dev->u.ext2.e2_sb;
+ ENTRY;
if (!sb || !sb->s_dev) {
CDEBUG(D_SUPER, "fatal: device not initialized.\n");
- EXIT;
- return NULL;
+ RETURN(NULL);
}
if ( !o->ioo_id ) {
CDEBUG(D_INODE, "fatal: invalid obdo %lu\n", (long)o->ioo_id);
- EXIT;
- return NULL;
+ RETURN(NULL);
}
inode = filter_inode_from_obj(conn->oc_dev, o->ioo_id, S_IFREG);
"no links" : "NULL");
if (inode)
iput(inode);
- EXIT;
- return NULL;
+ RETURN(NULL);
}
- return inode;
+ RETURN(inode);
}
static int filter_preprw(int cmd, struct obd_conn *conn,
int objcount, struct obd_ioobj *obj,
- int niocount, struct niobuf *nb,
- struct niobuf *res)
+ int niocount, struct niobuf_remote *nb,
+ struct niobuf_local *res)
{
struct obd_ioobj *o = obj;
- struct niobuf *b = nb;
- struct niobuf *r = res;
+ struct niobuf_remote *b = nb;
+ struct niobuf_local *r = res;
int i;
ENTRY;
static int filter_commitrw(int cmd, struct obd_conn *conn,
int objcount, struct obd_ioobj *obj,
- int niocount, struct niobuf *res)
+ int niocount, struct niobuf_local *res)
{
struct obd_ioobj *o = obj;
- struct niobuf *r = res;
+ struct niobuf_local *r = res;
int i;
ENTRY;
{
struct super_block *sb;
int err;
-
ENTRY;
if (!gen_client(conn)) {
CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
- EXIT;
- return -EINVAL;
+ RETURN(-EINVAL);
}
sb = conn->oc_dev->u.filter.fo_sb;
err = sb->s_op->statfs(sb, statfs);
- EXIT;
- return err;
+ RETURN(err);
} /* filter_statfs */
if (!(cli = gen_client(conn))) {
CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
- return -EINVAL;
+ RETURN(-EINVAL);
}
obddev = conn->oc_dev;
-
+
if ( keylen == strlen("blocksize") &&
memcmp(key, "blocksize", keylen) == 0 ) {
*vallen = sizeof(int);
*val = (void *)obddev->u.filter.fo_sb->s_blocksize;
- EXIT;
- return 0;
+ RETURN(0);
}
if ( keylen == strlen("blocksize_bits") &&
memcmp(key, "blocksize_bits", keylen) == 0 ){
*vallen = sizeof(int);
*val = (void *)(int)obddev->u.filter.fo_sb->s_blocksize_bits;
- EXIT;
- return 0;
+ RETURN(0);
}
if ( keylen == strlen("root_ino") &&
memcmp(key, "root_ino", keylen) == 0 ){
*vallen = sizeof(int);
*val = (void *)(int) FILTER_ROOTINO;
- EXIT;
- return 0;
+ RETURN(0);
}
-
+
CDEBUG(D_IOCTL, "invalid key\n");
- return -EINVAL;
+ RETURN(-EINVAL);
}
MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Filtering OBD driver v1.0");
-MODULE_LICENSE("GPL");
+MODULE_LICENSE("GPL");
module_init(obdfilter_init);
module_exit(obdfilter_exit);
return 0;
}
-int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
- struct niobuf *dst, struct niobuf *src)
+static int osc_sendpage(struct ptlrpc_bulk_desc *desc,
+ struct niobuf_remote *dst, struct niobuf_local *src)
{
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct ptlrpc_bulk_desc *bulk;
- int rc;
+ struct ptlrpc_bulk_page *page;
ENTRY;
- osc_con2cl(conn, &cl, &connection);
-
- bulk = ptlrpc_prep_bulk(connection);
- if (bulk == NULL)
+ page = ptlrpc_prep_bulk_page(desc);
+ if (page == NULL)
RETURN(-ENOMEM);
- bulk->b_buf = (void *)(unsigned long)src->addr;
- bulk->b_buflen = src->len;
- bulk->b_xid = dst->xid;
- rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
- if (rc != 0) {
- CERROR("send_bulk failed: %d\n", rc);
- ptlrpc_free_bulk(bulk);
- LBUG();
- RETURN(rc);
- }
- wait_event_interruptible(bulk->b_waitq, ptlrpc_check_bulk_sent(bulk));
-
- if (bulk->b_flags & PTL_RPC_FL_INTR) {
- ptlrpc_free_bulk(bulk);
- RETURN(-EINTR);
- }
+ page->b_buf = (void *)(unsigned long)src->addr;
+ page->b_buflen = src->len;
+ page->b_xid = dst->xid;
- ptlrpc_free_bulk(bulk);
RETURN(0);
}
-int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
- obd_count *oa_bufs, struct page **buf, obd_size *count,
- obd_off *offset, obd_flag *flags)
+static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
+ struct obdo **oa, obd_count *oa_bufs, struct page **buf,
+ obd_size *count, obd_off *offset, obd_flag *flags)
{
struct ptlrpc_client *cl;
struct ptlrpc_connection *connection;
struct ptlrpc_request *request;
struct ost_body *body;
+ struct list_head *tmp, *next;
int pages, rc, i, j, size[3] = {sizeof(*body)};
void *ptr1, *ptr2;
- struct ptlrpc_bulk_desc **bulk;
+ struct ptlrpc_bulk_desc *desc;;
ENTRY;
size[1] = num_oa * sizeof(struct obd_ioobj);
pages = 0;
for (i = 0; i < num_oa; i++)
pages += oa_bufs[i];
- size[2] = pages * sizeof(struct niobuf);
-
- OBD_ALLOC(bulk, pages * sizeof(*bulk));
- if (bulk == NULL)
- RETURN(-ENOMEM);
+ size[2] = pages * sizeof(struct niobuf_remote);
osc_con2cl(conn, &cl, &connection);
request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
if (!request)
- GOTO(out, rc = -ENOMEM);
+ GOTO(out3, rc = -ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
body->data = OBD_BRW_READ;
+ desc = ptlrpc_prep_bulk(connection);
+ if (desc == NULL)
+ GOTO(out2, rc = -ENOMEM);
+ desc->b_portal = OST_BULK_PORTAL;
+
ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
for (pages = 0, i = 0; i < num_oa; i++) {
ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
for (j = 0; j < oa_bufs[i]; j++, pages++) {
- bulk[pages] = ptlrpc_prep_bulk(connection);
- if (bulk[pages] == NULL)
+ struct ptlrpc_bulk_page *page;
+ page = ptlrpc_prep_bulk_page(desc);
+ if (page == NULL)
GOTO(out, rc = -ENOMEM);
spin_lock(&connection->c_lock);
- bulk[pages]->b_xid = ++connection->c_xid_out;
+ page->b_xid = ++connection->c_xid_out;
spin_unlock(&connection->c_lock);
- bulk[pages]->b_buf = kmap(buf[pages]);
- bulk[pages]->b_buflen = PAGE_SIZE;
- bulk[pages]->b_portal = OST_BULK_PORTAL;
- ost_pack_niobuf(&ptr2, bulk[pages]->b_buf,
- offset[pages], count[pages],
- flags[pages], bulk[pages]->b_xid);
-
- rc = ptlrpc_register_bulk(bulk[pages]);
- if (rc)
- GOTO(out, rc);
+ page->b_buf = kmap(buf[pages]);
+ page->b_buflen = PAGE_SIZE;
+ ost_pack_niobuf(&ptr2, offset[pages], count[pages],
+ flags[pages], page->b_xid);
}
}
+ rc = ptlrpc_register_bulk(desc);
+ if (rc)
+ GOTO(out, rc);
+
request->rq_replen = lustre_msg_size(1, size);
rc = ptlrpc_queue_wait(request);
+ if (rc)
+ ptlrpc_abort_bulk(desc);
GOTO(out, rc);
out:
- /* FIXME: if we've called ptlrpc_wait_bulk but rc != 0, we need to
- * abort those bulk listeners. */
+ list_for_each_safe(tmp, next, &desc->b_page_list) {
+ struct ptlrpc_bulk_page *page;
+ page = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
- for (pages = 0, i = 0; i < num_oa; i++) {
- for (j = 0; j < oa_bufs[i]; j++, pages++) {
- if (bulk[pages] == NULL)
- continue;
- kunmap(buf[pages]);
- ptlrpc_free_bulk(bulk[pages]);
- }
+ if (page->b_buf != NULL)
+ kunmap(page->b_buf);
}
- OBD_FREE(bulk, pages * sizeof(*bulk));
+ ptlrpc_free_bulk(desc);
+ out2:
ptlrpc_free_req(request);
+ out3:
return rc;
}
-int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
- obd_count *oa_bufs, struct page **buf, obd_size *count,
- obd_off *offset, obd_flag *flags)
+static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
+ struct obdo **oa, obd_count *oa_bufs,
+ struct page **buf, obd_size *count, obd_off *offset,
+ obd_flag *flags)
{
struct ptlrpc_client *cl;
struct ptlrpc_connection *connection;
struct ptlrpc_request *request;
+ struct ptlrpc_bulk_desc *desc;
struct obd_ioobj ioo;
struct ost_body *body;
- struct niobuf *src;
+ struct niobuf_local *local;
+ struct niobuf_remote *remote;
long pages;
int rc, i, j, size[3] = {sizeof(*body)};
void *ptr1, *ptr2;
pages = 0;
for (i = 0; i < num_oa; i++)
pages += oa_bufs[i];
- size[2] = pages * sizeof(*src);
+ size[2] = pages * sizeof(*remote);
- OBD_ALLOC(src, size[2]);
- if (!src)
+ OBD_ALLOC(local, pages * sizeof(*local));
+ if (local == NULL)
RETURN(-ENOMEM);
osc_con2cl(conn, &cl, &connection);
request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
if (!request)
- RETURN(-ENOMEM);
+ GOTO(out3, rc = -ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
body->data = OBD_BRW_WRITE;
for (pages = 0, i = 0; i < num_oa; i++) {
ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
for (j = 0; j < oa_bufs[i]; j++, pages++) {
- ost_pack_niobuf(&ptr2, kmap(buf[pages]), offset[pages],
- count[pages], flags[pages], 0);
+ local[pages].addr = (__u64)(long)kmap(buf[pages]);
+ local[pages].offset = offset[pages];
+ local[pages].len = count[pages];
+ ost_pack_niobuf(&ptr2, offset[pages], count[pages],
+ flags[pages], 0);
}
}
- memcpy(src, lustre_msg_buf(request->rq_reqmsg, 2), size[2]);
- size[1] = pages * sizeof(struct niobuf);
+ size[1] = pages * sizeof(struct niobuf_remote);
request->rq_replen = lustre_msg_size(2, size);
rc = ptlrpc_queue_wait(request);
if (rc)
- GOTO(out, rc);
+ GOTO(out2, rc);
ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
if (ptr2 == NULL)
- GOTO(out, rc = -EINVAL);
+ GOTO(out2, rc = -EINVAL);
- if (request->rq_repmsg->buflens[1] != pages * sizeof(struct niobuf)) {
+ if (request->rq_repmsg->buflens[1] !=
+ pages * sizeof(struct niobuf_remote)) {
CERROR("buffer length wrong (%d vs. %ld)\n",
request->rq_repmsg->buflens[1],
- pages * sizeof(struct niobuf));
- GOTO(out, rc = -EINVAL);
+ pages * sizeof(struct niobuf_remote));
+ GOTO(out2, rc = -EINVAL);
}
+ desc = ptlrpc_prep_bulk(connection);
+ desc->b_portal = OSC_BULK_PORTAL;
+
for (pages = 0, i = 0; i < num_oa; i++) {
for (j = 0; j < oa_bufs[i]; j++, pages++) {
- struct niobuf *dst;
- ost_unpack_niobuf(&ptr2, &dst);
- osc_sendpage(conn, request, dst, &src[pages]);
+ ost_unpack_niobuf(&ptr2, &remote);
+ rc = osc_sendpage(desc, remote, &local[pages]);
+ if (rc)
+ GOTO(out, rc);
}
}
- OBD_FREE(src, size[2]);
+
+ rc = ptlrpc_send_bulk(desc);
+ GOTO(out, rc);
+
out:
+ ptlrpc_free_bulk(desc);
+ out2:
+ ptlrpc_free_req(request);
for (pages = 0, i = 0; i < num_oa; i++)
for (j = 0; j < oa_bufs[i]; j++, pages++)
kunmap(buf[pages]);
+ out3:
+ OBD_FREE(local, pages * sizeof(*local));
- ptlrpc_free_req(request);
- return 0;
+ return rc;
}
-int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
- struct obdo **oa, obd_count *oa_bufs, struct page **buf,
- obd_size *count, obd_off *offset, obd_flag *flags)
+static int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
+ struct obdo **oa, obd_count *oa_bufs, struct page **buf,
+ obd_size *count, obd_off *offset, obd_flag *flags)
{
if (rw == OBD_BRW_READ)
return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
offset, flags);
}
-int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns,
- struct ldlm_handle *parent_lock, __u64 *res_id, __u32 type,
- struct ldlm_extent *extent, __u32 mode, int *flags, void *data,
- int datalen, struct ldlm_handle *lockh)
+static int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns,
+ struct ldlm_handle *parent_lock, __u64 *res_id,
+ __u32 type, struct ldlm_extent *extent, __u32 mode,
+ int *flags, void *data, int datalen,
+ struct ldlm_handle *lockh)
{
struct ptlrpc_connection *conn;
struct ptlrpc_client *cl;
return rc;
}
-int osc_cancel(struct obd_conn *oconn, __u32 mode, struct ldlm_handle *lockh)
+static int osc_cancel(struct obd_conn *oconn, __u32 mode,
+ struct ldlm_handle *lockh)
{
struct ldlm_lock *lock;
ENTRY;
static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
{
- struct ptlrpc_bulk_desc *bulk = NULL;
+ struct ptlrpc_bulk_desc *desc;
struct obd_conn conn;
void *tmp1, *tmp2, *end2;
- struct niobuf *nb, *dst, *res = NULL;
+ struct niobuf_remote *remote_nb;
+ struct niobuf_local *local_nb = NULL;
struct obd_ioobj *ioo;
struct ost_body *body;
int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
- niocount = req->rq_reqmsg->buflens[2] / sizeof(*nb);
+ niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
cmd = body->data;
conn.oc_id = body->connid;
ost_unpack_ioo(&tmp1, &ioo);
if (tmp2 + ioo->ioo_bufcnt > end2) {
LBUG();
- rc = -EFAULT;
- break;
- }
- for (j = 0; j < ioo->ioo_bufcnt; j++) {
- ost_unpack_niobuf(&tmp2, &nb);
+ GOTO(out, rc = -EFAULT);
}
+ for (j = 0; j < ioo->ioo_bufcnt; j++)
+ ost_unpack_niobuf(&tmp2, &remote_nb);
}
rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(rc);
- OBD_ALLOC(res, sizeof(*res) * niocount);
- if (res == NULL)
+ OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
+ if (local_nb == NULL)
RETURN(-ENOMEM);
/* The unpackers move tmp1 and tmp2, so reset them before using */
tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
req->rq_status = obd_preprw(cmd, &conn, objcount,
- tmp1, niocount, tmp2, res);
+ tmp1, niocount, tmp2, local_nb);
if (req->rq_status)
- GOTO(out_res, 0);
+ GOTO(out_local, 0);
- for (i = 0; i < niocount; i++) {
- bulk = ptlrpc_prep_bulk(req->rq_connection);
- if (bulk == NULL) {
- CERROR("cannot alloc bulk desc\n");
- GOTO(out_res, rc = -ENOMEM);
- }
+ desc = ptlrpc_prep_bulk(req->rq_connection);
+ if (desc == NULL)
+ GOTO(out_local, rc = -ENOMEM);
+ desc->b_portal = OST_BULK_PORTAL;
- dst = &(((struct niobuf *)tmp2)[i]);
- bulk->b_xid = dst->xid;
- bulk->b_buf = (void *)(unsigned long)res[i].addr;
+ for (i = 0; i < niocount; i++) {
+ struct ptlrpc_bulk_page *bulk;
+ bulk = ptlrpc_prep_bulk_page(desc);
+ if (bulk == NULL)
+ GOTO(out_bulk, rc = -ENOMEM);
+ remote_nb = &(((struct niobuf_remote *)tmp2)[i]);
+ bulk->b_xid = remote_nb->xid;
+ bulk->b_buf = (void *)(unsigned long)local_nb[i].addr;
bulk->b_buflen = PAGE_SIZE;
- rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL);
- if (rc)
- GOTO(out_bulk, rc);
- wait_event_interruptible(bulk->b_waitq,
- ptlrpc_check_bulk_sent(bulk));
+ }
- if (bulk->b_flags & PTL_RPC_FL_INTR)
- GOTO(out_bulk, 0);
+ rc = ptlrpc_send_bulk(desc);
+ if (rc)
+ GOTO(out_bulk, rc);
- ptlrpc_free_bulk(bulk);
- }
- bulk = NULL;
+ ptlrpc_free_bulk(desc);
/* The unpackers move tmp1 and tmp2, so reset them before using */
tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
req->rq_status = obd_commitrw(cmd, &conn, objcount,
- tmp1, niocount, res);
+ tmp1, niocount, local_nb);
- EXIT;
-out_bulk:
- if (bulk != NULL)
- ptlrpc_free_bulk(bulk);
-out_res:
- if (res != NULL)
- OBD_FREE(res, sizeof(*res) * niocount);
+ RETURN(rc);
+ out_bulk:
+ ptlrpc_free_bulk(desc);
+ out_local:
+ if (local_nb != NULL)
+ OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
+ out:
return 0;
}
static int ost_commit_page(struct obd_conn *conn, struct page *page)
{
struct obd_ioobj obj;
- struct niobuf buf;
+ struct niobuf_local buf;
int rc;
ENTRY;
RETURN(rc);
}
-static int ost_brw_write_cb(struct ptlrpc_bulk_desc *bulk, void *data)
+static int ost_brw_write_cb(struct ptlrpc_bulk_page *bulk)
{
int rc;
-
ENTRY;
- rc = ost_commit_page(&bulk->b_conn, bulk->b_page);
+ rc = ost_commit_page(&bulk->b_desc->b_conn, bulk->b_page);
if (rc)
CERROR("ost_commit_page failed: %d\n", rc);
RETURN(rc);
}
+static int ost_brw_write_finished_cb(struct ptlrpc_bulk_desc *desc)
+{
+ ptlrpc_free_bulk(desc);
+
+ return 0;
+}
+
static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
{
+ struct ptlrpc_bulk_desc *desc;
struct obd_conn conn;
- struct niobuf *nb, *res;
+ struct niobuf_remote *remote_nb;
+ struct niobuf_local *local_nb, *lnb;
struct obd_ioobj *ioo;
struct ost_body *body;
int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
- niocount = req->rq_reqmsg->buflens[2] / sizeof(*nb);
+ niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
cmd = body->data;
conn.oc_id = body->connid;
rc = -EFAULT;
break;
}
- for (j = 0; j < ioo->ioo_bufcnt; j++) {
- ost_unpack_niobuf((void *)&tmp2, &nb);
- }
+ for (j = 0; j < ioo->ioo_bufcnt; j++)
+ ost_unpack_niobuf((void *)&tmp2, &remote_nb);
}
- size[1] = niocount * sizeof(*nb);
+ size[1] = niocount * sizeof(*remote_nb);
rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
- RETURN(rc);
+ GOTO(fail, rc);
+ remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
- res = lustre_msg_buf(req->rq_repmsg, 1);
+ OBD_ALLOC(local_nb, niocount * sizeof(*local_nb));
+ if (local_nb == NULL)
+ GOTO(fail, rc = -ENOMEM);
/* The unpackers move tmp1 and tmp2, so reset them before using */
tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
req->rq_status = obd_preprw(cmd, &conn, objcount,
- tmp1, niocount, tmp2, res);
-
+ tmp1, niocount, tmp2, local_nb);
if (req->rq_status)
- GOTO(out, 0);
+ GOTO(success, 0);
- for (i = 0; i < niocount; i++, res++) {
- struct ptlrpc_bulk_desc *bulk;
+ desc = ptlrpc_prep_bulk(req->rq_connection);
+ if (desc == NULL)
+ GOTO(fail_preprw, rc = -ENOMEM);
+ desc->b_cb = ost_brw_write_finished_cb;
+ desc->b_portal = OSC_BULK_PORTAL;
+ memcpy(&(desc->b_conn), &conn, sizeof(conn));
+
+ for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
+ struct ptlrpc_bulk_page *bulk;
- bulk = ptlrpc_prep_bulk(req->rq_connection);
+ bulk = ptlrpc_prep_bulk_page(desc);
if (bulk == NULL)
- GOTO(out, rc = -ENOMEM);
+ GOTO(fail_bulk, rc = -ENOMEM);
spin_lock(&srv->srv_lock);
bulk->b_xid = srv->srv_xid++;
spin_unlock(&srv->srv_lock);
- res->xid = HTON__u32(bulk->b_xid);
-
- bulk->b_buf = (void *)(unsigned long)res->addr;
- bulk->b_cb = ost_brw_write_cb;
- bulk->b_page = res->page;
- memcpy(&(bulk->b_conn), &conn, sizeof(conn));
+ bulk->b_buf = (void *)(unsigned long)lnb->addr;
+ bulk->b_page = lnb->page;
bulk->b_buflen = PAGE_SIZE;
- bulk->b_portal = OSC_BULK_PORTAL;
- rc = ptlrpc_register_bulk(bulk);
- if (rc)
- GOTO(out, rc);
+ bulk->b_cb = ost_brw_write_cb;
+
+ /* this advances remote_nb */
+ ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
+ bulk->b_xid);
}
+ rc = ptlrpc_register_bulk(desc);
+ if (rc)
+ GOTO(fail_bulk, rc);
+
EXIT;
- out:
- /* FIXME: should we return 'rc' here? */
+ success:
+ OBD_FREE(local_nb, niocount * sizeof(*local_nb));
return 0;
+
+ fail_bulk:
+ ptlrpc_free_bulk(desc);
+ fail_preprw:
+ OBD_FREE(local_nb, niocount * sizeof(*local_nb));
+ /* FIXME: how do we undo the preprw? */
+ fail:
+ return rc;
}
static int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
if (err)
GOTO(error_disc, err = -EINVAL);
-#if 1
err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
if (err)
GOTO(error_disc, err = -EINVAL);
-#endif
RETURN(0);
if (bulk != NULL) {
bulk->b_connection = ptlrpc_connection_addref(conn);
init_waitqueue_head(&bulk->b_waitq);
+ INIT_LIST_HEAD(&bulk->b_page_list);
}
return bulk;
}
+struct ptlrpc_bulk_page *ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc)
+{
+ struct ptlrpc_bulk_page *page;
+
+ OBD_ALLOC(page, sizeof(*page));
+ if (page != NULL) {
+ page->b_desc = desc;
+ ptl_set_inv_handle(&page->b_md_h);
+ ptl_set_inv_handle(&page->b_me_h);
+ list_add(&page->b_link, &desc->b_page_list);
+ desc->b_page_count++;
+ }
+ return page;
+}
+
void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk)
{
+ struct list_head *tmp, *next;
ENTRY;
if (bulk == NULL) {
EXIT;
return;
}
+ list_for_each_safe(tmp, next, &bulk->b_page_list) {
+ struct ptlrpc_bulk_page *page;
+ page = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
+ ptlrpc_free_bulk_page(page);
+ }
+
ptlrpc_put_connection(bulk->b_connection);
OBD_FREE(bulk, sizeof(*bulk));
EXIT;
}
+void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *page)
+{
+ ENTRY;
+ if (page == NULL) {
+ EXIT;
+ return;
+ }
+
+ list_del(&page->b_link);
+ page->b_desc->b_page_count--;
+ OBD_FREE(page, sizeof(*page));
+ EXIT;
+}
+
struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
struct ptlrpc_connection *conn,
int opcode, int count, int *lengths,
static int bulk_source_callback(ptl_event_t *ev, void *data)
{
- struct ptlrpc_bulk_desc *bulk = ev->mem_desc.user_ptr;
+ struct ptlrpc_bulk_page *bulk = ev->mem_desc.user_ptr;
+ struct ptlrpc_bulk_desc *desc = bulk->b_desc;
ENTRY;
if (ev->type == PTL_EVENT_SENT) {
CDEBUG(D_NET, "got SENT event\n");
} else if (ev->type == PTL_EVENT_ACK) {
CDEBUG(D_NET, "got ACK event\n");
- bulk->b_flags |= PTL_BULK_FL_SENT;
- wake_up_interruptible(&bulk->b_waitq);
+ desc->b_flags |= PTL_BULK_FL_SENT;
+ wake_up_interruptible(&desc->b_waitq);
} else {
CERROR("Unexpected event type!\n");
LBUG();
static int bulk_sink_callback(ptl_event_t *ev, void *data)
{
- struct ptlrpc_bulk_desc *bulk = ev->mem_desc.user_ptr;
+ struct ptlrpc_bulk_page *bulk = ev->mem_desc.user_ptr;
+ struct ptlrpc_bulk_desc *desc = bulk->b_desc;
ENTRY;
if (ev->type == PTL_EVENT_PUT) {
if (bulk->b_buf != ev->mem_desc.start + ev->offset)
CERROR("bulkbuf != mem_desc -- why?\n");
- bulk->b_flags |= PTL_BULK_FL_RCVD;
+ desc->b_finished_count++;
+ if (desc->b_finished_count == desc->b_page_count) {
+ desc->b_flags |= PTL_BULK_FL_RCVD;
+ wake_up_interruptible(&desc->b_waitq);
+ if (desc->b_cb != NULL)
+ desc->b_cb(desc);
+ }
if (bulk->b_cb != NULL)
- bulk->b_cb(bulk, data);
- wake_up_interruptible(&bulk->b_waitq);
+ bulk->b_cb(bulk);
} else {
CERROR("Unexpected event type!\n");
LBUG();
}
- /* FIXME: This should happen unconditionally */
- if (bulk->b_cb != NULL)
- ptlrpc_free_bulk(bulk);
-
RETURN(1);
}
*/
#define EXPORT_SYMTAB
-
#define DEBUG_SUBSYSTEM S_RPC
#include <linux/lustre_net.h>
-extern ptl_handle_eq_t request_out_eq,
- reply_in_eq,
- reply_out_eq,
- bulk_source_eq,
- bulk_sink_eq;
+extern ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq,
+ bulk_source_eq, bulk_sink_eq;
static ptl_process_id_t local_id = {PTL_ID_ANY, PTL_ID_ANY};
-
-int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk)
+static int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk)
{
ENTRY;
return rc;
}
-int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *bulk, int portal)
+int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *desc)
{
int rc;
+ struct list_head *tmp, *next;
ptl_process_id_t remote_id;
+ ENTRY;
- bulk->b_md.start = bulk->b_buf;
- bulk->b_md.length = bulk->b_buflen;
- bulk->b_md.eventq = bulk_source_eq;
- bulk->b_md.threshold = 2; /* SENT and ACK events */
- bulk->b_md.options = PTL_MD_OP_PUT;
- bulk->b_md.user_ptr = bulk;
-
- rc = PtlMDBind(bulk->b_connection->c_peer.peer_ni, bulk->b_md,
- &bulk->b_md_h);
- if (rc != 0) {
- CERROR("PtlMDBind failed: %d\n", rc);
- LBUG();
- return rc;
+ list_for_each_safe(tmp, next, &desc->b_page_list) {
+ /* only request an ACK for the last page */
+ int ack = (next == &desc->b_page_list);
+ struct ptlrpc_bulk_page *bulk;
+ bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
+
+ bulk->b_md.start = bulk->b_buf;
+ bulk->b_md.length = bulk->b_buflen;
+ bulk->b_md.eventq = bulk_source_eq;
+ bulk->b_md.threshold = 1 + ack; /* SENT and (if last) ACK */
+ bulk->b_md.options = PTL_MD_OP_PUT;
+ bulk->b_md.user_ptr = bulk;
+
+ rc = PtlMDBind(desc->b_connection->c_peer.peer_ni, bulk->b_md,
+ &bulk->b_md_h);
+ if (rc != 0) {
+ CERROR("PtlMDBind failed: %d\n", rc);
+ LBUG();
+ RETURN(rc);
+ }
+
+ remote_id.nid = desc->b_connection->c_peer.peer_nid;
+ remote_id.pid = 0;
+
+ CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %d\n",
+ bulk->b_md.length, desc->b_portal, bulk->b_xid);
+
+ rc = PtlPut(bulk->b_md_h, (ack ? PTL_ACK_REQ : PTL_NOACK_REQ),
+ remote_id, desc->b_portal, 0, bulk->b_xid, 0, 0);
+ if (rc != PTL_OK) {
+ CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid,
+ desc->b_portal, bulk->b_xid, rc);
+ PtlMDUnlink(bulk->b_md_h);
+ LBUG();
+ RETURN(rc);
+ }
}
- remote_id.nid = bulk->b_connection->c_peer.peer_nid;
- remote_id.pid = 0;
+ wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
- CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %d\n",
- bulk->b_md.length, portal, bulk->b_xid);
+ if (desc->b_flags & PTL_RPC_FL_INTR)
+ RETURN(-EINTR);
- rc = PtlPut(bulk->b_md_h, PTL_ACK_REQ, remote_id, portal, 0,
- bulk->b_xid, 0, 0);
- if (rc != PTL_OK) {
- CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid,
- portal, bulk->b_xid, rc);
- PtlMDUnlink(bulk->b_md_h);
- LBUG();
- }
-
- return rc;
+ RETURN(0);
}
-int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *bulk)
+int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *desc)
{
+ struct list_head *tmp, *next;
int rc;
ENTRY;
- rc = PtlMEAttach(bulk->b_connection->c_peer.peer_ni, bulk->b_portal,
- local_id, bulk->b_xid, 0, PTL_UNLINK, PTL_INS_AFTER,
- &bulk->b_me_h);
- if (rc != PTL_OK) {
- CERROR("PtlMEAttach failed: %d\n", rc);
- LBUG();
- GOTO(cleanup, rc);
- }
-
- bulk->b_md.start = bulk->b_buf;
- bulk->b_md.length = bulk->b_buflen;
- bulk->b_md.threshold = 1;
- bulk->b_md.options = PTL_MD_OP_PUT;
- bulk->b_md.user_ptr = bulk;
- bulk->b_md.eventq = bulk_sink_eq;
-
- rc = PtlMDAttach(bulk->b_me_h, bulk->b_md, PTL_UNLINK, &bulk->b_md_h);
- //CERROR("MDAttach (bulk sink): %Lu\n", (__u64)bulk->b_md_h);
- if (rc != PTL_OK) {
- CERROR("PtlMDAttach failed: %d\n", rc);
- LBUG();
- GOTO(cleanup, rc);
+ list_for_each_safe(tmp, next, &desc->b_page_list) {
+ struct ptlrpc_bulk_page *bulk;
+ bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
+
+ rc = PtlMEAttach(desc->b_connection->c_peer.peer_ni,
+ desc->b_portal, local_id, bulk->b_xid, 0,
+ PTL_UNLINK, PTL_INS_AFTER, &bulk->b_me_h);
+ if (rc != PTL_OK) {
+ CERROR("PtlMEAttach failed: %d\n", rc);
+ LBUG();
+ GOTO(cleanup, rc);
+ }
+
+ bulk->b_md.start = bulk->b_buf;
+ bulk->b_md.length = bulk->b_buflen;
+ bulk->b_md.threshold = 1;
+ bulk->b_md.options = PTL_MD_OP_PUT;
+ bulk->b_md.user_ptr = bulk;
+ bulk->b_md.eventq = bulk_sink_eq;
+
+ rc = PtlMDAttach(bulk->b_me_h, bulk->b_md, PTL_UNLINK,
+ &bulk->b_md_h);
+ if (rc != PTL_OK) {
+ CERROR("PtlMDAttach failed: %d\n", rc);
+ LBUG();
+ GOTO(cleanup, rc);
+ }
+
+ CDEBUG(D_NET, "Setup bulk sink buffer: %u bytes, xid %u, "
+ "portal %u\n", bulk->b_buflen, bulk->b_xid,
+ desc->b_portal);
}
- CDEBUG(D_NET, "Setup bulk sink buffer: %u bytes, xid %u, portal %u\n",
- bulk->b_buflen, bulk->b_xid, bulk->b_portal);
RETURN(0);
- // XXX Confirm that this is safe!
cleanup:
- PtlMEUnlink(bulk->b_me_h);
+ ptlrpc_abort_bulk(desc);
+
return rc;
}
-int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk)
+int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc)
{
- int rc;
+ struct list_head *tmp, *next;
- rc = PtlMEUnlink(bulk->b_me_h);
- if (rc != PTL_OK)
- CERROR("PtlMEUnlink failed: %d\n", rc);
+ list_for_each_safe(tmp, next, &desc->b_page_list) {
+ struct ptlrpc_bulk_page *bulk;
+ bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
- return rc;
+ /* This should be safe: these handles are initialized to be
+ * invalid in ptlrpc_prep_bulk_page() */
+ PtlMDUnlink(bulk->b_md_h);
+ PtlMEUnlink(bulk->b_me_h);
+ }
+
+ return 0;
}
int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req)
echo -n "Hit return to continue..."
read
-new_fs ext2 /tmp/ost 10000
+new_fs ext2 /tmp/ost 10001
OST=$LOOPDEV
MDSFS=ext3
-new_fs ${MDSFS} /tmp/mds 10000
+new_fs ${MDSFS} /tmp/mds 10001
MDS=$LOOPDEV
echo 0xffffffff > /proc/sys/portals/debug