hours, we hope, but update at your own (extreme) risk.
- Adds more unfinished LOV functionality
- Changes MDS EAs to contain LOV data
- Adds new brw callbacks
- Fixes to work with the Portals tip
__u32 buflens[0];
};
-struct niobuf_remote {
- __u64 offset;
- __u32 len;
- __u32 xid;
- __u32 flags;
-};
-
-struct niobuf_local {
- __u64 offset;
- __u32 len;
- __u32 xid;
- __u32 flags;
- void *addr;
- struct page *page;
- void *target_private;
- struct dentry *dentry;
-};
-
#define N_LOCAL_TEMP_PAGE 0x00000001
/*
typedef uint32_t obd_flag;
typedef uint32_t obd_count;
-#define OBD_FL_INLINEDATA (0x00000001UL)
-#define OBD_FL_OBDMDEXISTS (0x00000002UL)
+#define OBD_FL_INLINEDATA (0x00000001)
+#define OBD_FL_OBDMDEXISTS (0x00000002)
#define OBD_INLINESZ 60
#define OBD_OBDMDSZ 60
#define OBD_MD_FLNOTOBD (~(OBD_MD_FLOBDMD | OBD_MD_FLOBDFLG | OBD_MD_FLBLOCKS |\
OBD_MD_LINKNAME))
+struct obd_ioobj {
+ obd_id ioo_id;
+ obd_gr ioo_gr;
+ __u32 ioo_type;
+ __u32 ioo_bufcnt;
+};
+
+struct niobuf_remote {
+ __u64 offset;
+ __u32 len;
+ __u32 xid;
+ __u32 flags;
+};
+
+struct niobuf_local {
+ __u64 offset;
+ __u32 len;
+ __u32 xid;
+ __u32 flags;
+ void *addr;
+ struct page *page;
+ void *target_private;
+ struct dentry *dentry;
+};
+
/* request structure for OST's */
#define OST_REQ_HAS_OA1 0x1
struct obdo oa;
};
-struct obd_ioobj {
- obd_id ioo_id;
- obd_gr ioo_gr;
- __u32 ioo_type;
- __u32 ioo_bufcnt;
-};
-
/*
* MDS REQ RECORDS
*/
struct mds_body {
struct ll_fid fid1;
struct ll_fid fid2;
- __u64 objid;
__u64 size;
+ __u64 extra;
__u32 valid;
__u32 mode;
__u32 uid;
__u32 cr_gid;
__u64 cr_time;
__u32 cr_mode;
- /* overloaded: id for create, tgtlen for symlink, rdev for mknod */
- __u64 cr_id;
+ __u64 cr_rdev;
};
struct mds_rec_link {
__u32 fd_flags;
};
+struct ll_inode_md {
+ struct mds_body *body;
+ struct obdo *obdo;
+};
+
#define LL_IOC_GETFLAGS _IOR ('f', 151, long)
#define LL_IOC_SETFLAGS _IOW ('f', 152, long)
#define LL_IOC_CLRFLAGS _IOW ('f', 153, long)
#define LL_INLINESZ 60
struct ll_inode_info {
int lli_flags;
- __u64 lli_objid;
+ struct obdo *lli_obdo;
+ char *lli_symlink_name;
char lli_inline[LL_INLINESZ];
};
int ur_tgtlen;
char *ur_tgt;
struct iattr ur_iattr;
- __u64 ur_id;
+ __u64 ur_rdev;
__u32 ur_mode;
__u32 ur_uid;
__u32 ur_gid;
int type, __u64 offset, char *addr, struct ptlrpc_request **);
int mdc_create(struct ptlrpc_client *, struct ptlrpc_connection *,
struct inode *dir, const char *name, int namelen,
- const char *tgt, int tgtlen,
- int mode, __u64 id, __u32 uid, __u32 gid, __u64 time,
+ const char *tgt, int tgtlen, int mode, __u32 uid, __u32 gid,
+ __u64 time, __u64 rdev, struct obdo *obdo,
struct ptlrpc_request **);
int mdc_unlink(struct ptlrpc_client *, struct ptlrpc_connection *,
struct inode *dir, struct inode *child, const char *name,
int (* fs_commit)(struct inode *inode, void *handle);
int (* fs_setattr)(struct dentry *dentry, void *handle,
struct iattr *iattr);
- int (* fs_set_objid)(struct inode *inode, void *handle, obd_id id);
- int (* fs_get_objid)(struct inode *inode, obd_id *id);
+ int (* fs_set_obdo)(struct inode *inode, void *handle,
+ struct obdo *obdo);
+ int (* fs_get_obdo)(struct inode *inode, struct obdo *obdo);
ssize_t (* fs_readpage)(struct file *file, char *buf, size_t count,
loff_t *offset);
void (* fs_delete_inode)(struct inode *inode);
return mds->mds_fsops->fs_setattr(dentry, handle, iattr);
}
-static inline int mds_fs_set_objid(struct mds_obd *mds, struct inode *inode,
- void *handle, __u64 id)
+static inline int mds_fs_set_obdo(struct mds_obd *mds, struct inode *inode,
+ void *handle, struct obdo *obdo)
{
- return mds->mds_fsops->fs_set_objid(inode, handle, id);
+ return mds->mds_fsops->fs_set_obdo(inode, handle, obdo);
}
-static inline int mds_fs_get_objid(struct mds_obd *mds, struct inode *inode,
- __u64 *id)
+static inline int mds_fs_get_obdo(struct mds_obd *mds, struct inode *inode,
+ struct obdo *obdo)
{
- return mds->mds_fsops->fs_get_objid(inode, id);
+ return mds->mds_fsops->fs_get_obdo(inode, obdo);
}
static inline ssize_t mds_fs_readpage(struct mds_obd *mds, struct file *file,
struct ptlrpc_connection *b_connection;
struct ptlrpc_client *b_client;
__u32 b_portal;
- int (*b_cb)(struct ptlrpc_bulk_desc *);
struct obd_conn b_conn;
+ void (*b_cb)(struct ptlrpc_bulk_desc *, void *);
+ void *b_cb_data;
wait_queue_head_t b_waitq;
struct list_head b_page_list;
__u32 b_page_count;
- __u32 b_finished_count;
+ atomic_t b_finished_count;
void *b_desc_private;
};
struct ptlrpc_request *req);
};
+typedef void (*bulk_callback_t)(struct ptlrpc_bulk_desc *, void *);
+
typedef int (*svc_handler_t)(struct obd_device *obddev,
struct ptlrpc_service *svc,
struct ptlrpc_request *req);
void ptlrpc_cleanup_connection(void);
/* rpc/niobuf.c */
+int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk);
int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *);
int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *);
int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk);
struct ptlrpc_connection *osc_conn;
};
+typedef __u8 uuid_t[37];
+
+#define MAX_MULTI 16
struct lov_obd {
+ __u32 lov_default_count;
+ __u32 lov_default_pattern;
+ __u32 lov_default_size;
+ uuid_t lov_service_uuids[MAX_MULTI];
+
+#if 0
int lov_count;
struct obd_conn *lov_targets;
+#endif
};
/* corresponds to one of the obd's */
-#define MAX_MULTI 16
struct obd_device {
struct obd_type *obd_type;
char *obd_name;
obd_size *count, obd_off offset);
int (*o_brw)(int rw, struct obd_conn *conn, obd_count num_oa,
struct obdo **oa, obd_count *oa_bufs, struct page **buf,
- obd_size *count, obd_off *offset, obd_flag *flags);
+ obd_size *count, obd_off *offset, obd_flag *flags,
+ void *);
int (*o_punch)(struct obd_conn *conn, struct obdo *tgt, obd_size count,
obd_off offset);
int (*o_sync)(struct obd_conn *conn, struct obdo *tgt, obd_size count,
int objcount, struct obd_ioobj *obj,
int niocount, struct niobuf_local *local,
void *desc_private);
+
int (*o_enqueue)(struct obd_conn *conn, struct ldlm_namespace *ns,
struct ldlm_handle *parent_lock, __u64 *res_id,
__u32 type, struct ldlm_extent *, __u32 mode,
static inline int obd_brw(int rw, struct obd_conn *conn, obd_count num_oa,
struct obdo **oa, obd_count *oa_bufs,
struct page **buf, obd_size *count, obd_off *offset,
- obd_flag *flags)
+ obd_flag *flags, void *callback)
{
int rc;
OBD_CHECK_SETUP(conn);
OBD_CHECK_OP(conn,brw);
rc = OBP(conn->oc_dev, brw)(rw, conn, num_oa, oa, oa_bufs, buf,
- count, offset, flags);
+ count, offset, flags, callback);
RETURN(rc);
}
struct obdo *oa = NULL;
oa = kmem_cache_alloc(obdo_cachep, SLAB_KERNEL);
+ if (oa == NULL)
+ LBUG();
memset(oa, 0, sizeof (*oa));
return oa;
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ */
+
+#ifndef _OBD_LOV_H__
+#define _OBD_LOV_H__
+
+#ifdef __KERNEL__
+
+#define OBD_LOV_DEVICENAME "lov"
+
+struct lov_object_id { /* per-child structure */
+ __u64 l_object_id;
+ __u32 l_device_id;
+};
+
+struct lov_md {
+ __u64 lmd_object_id; /* lov object id */
+ __u64 lmd_stripe_count;
+ __u32 lmd_stripe_size;
+ __u32 lmd_stripe_pattern; /* per-lov object stripe pattern */
+ struct lov_object_id lmd_objects[0];
+};
+
+#endif
+#endif
mds_pack_fid(&b->fid1);
mds_pack_fid(&b->fid2);
- b->objid = HTON__u64(b->objid);
+ b->extra = HTON__u64(b->extra);
b->size = HTON__u64(b->size);
b->valid = HTON__u32(b->valid);
b->mode = HTON__u32(b->mode);
/* packing of MDS records */
void mds_create_pack(struct mds_rec_create *rec, struct inode *inode,
- __u32 mode, __u64 id, __u32 uid, __u32 gid, __u64 time)
+ __u32 mode, __u64 rdev, __u32 uid, __u32 gid, __u64 time)
{
/* XXX do something about time, uid, gid */
rec->cr_opcode = HTON__u32(REINT_CREATE);
ll_inode2fid(&rec->cr_fid, inode);
rec->cr_mode = HTON__u32(mode);
- rec->cr_id = HTON__u64(id);
+ rec->cr_rdev = HTON__u64(rdev);
rec->cr_uid = HTON__u32(uid);
rec->cr_gid = HTON__u32(gid);
rec->cr_time = HTON__u64(time);
mds_unpack_fid(&b->fid1);
mds_unpack_fid(&b->fid2);
- b->objid = NTOH__u64(b->objid);
+ b->extra = NTOH__u64(b->extra);
b->size = NTOH__u64(b->size);
b->valid = NTOH__u32(b->valid);
b->mode = NTOH__u32(b->mode);
struct mds_rec_create *rec = lustre_msg_buf(req->rq_reqmsg, 0);
ENTRY;
- if (req->rq_reqmsg->bufcount != 3 ||
+ if (req->rq_reqmsg->bufcount < 2 ||
req->rq_reqmsg->buflens[0] != sizeof(*rec))
RETURN(-EFAULT);
r->ur_fid1 = &rec->cr_fid;
r->ur_mode = NTOH__u32(rec->cr_mode);
- r->ur_id = NTOH__u64(rec->cr_id);
+ r->ur_rdev = NTOH__u64(rec->cr_rdev);
r->ur_uid = NTOH__u32(rec->cr_uid);
r->ur_gid = NTOH__u32(rec->cr_gid);
r->ur_time = NTOH__u64(rec->cr_time);
r->ur_name = lustre_msg_buf(req->rq_reqmsg, 1);
r->ur_namelen = req->rq_reqmsg->buflens[1];
- r->ur_tgt = lustre_msg_buf(req->rq_reqmsg, 2);
- r->ur_tgtlen = req->rq_reqmsg->buflens[2];
+ if (S_ISLNK(r->ur_mode)) {
+ r->ur_tgt = lustre_msg_buf(req->rq_reqmsg, 2);
+ r->ur_tgtlen = req->rq_reqmsg->buflens[2];
+ }
RETURN(0);
}
int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc);
extern int ll_setattr(struct dentry *de, struct iattr *attr);
-extern inline struct obdo * ll_oa_from_inode(struct inode *inode,
- unsigned long valid);
static int ll_file_open(struct inode *inode, struct file *file)
{
CERROR("mdc_open didn't assign fd_mdshandle\n");
/* XXX handle this how, abort or is it non-fatal? */
}
+ if (!fd->fd_mdshandle)
+ CERROR("mdc_open didn't assign fd_mdshandle\n");
- oa = ll_oa_from_inode(inode, (OBD_MD_FLMODE | OBD_MD_FLID));
+ oa = ll_i2info(inode)->lli_obdo;
if (oa == NULL) {
LBUG();
GOTO(out_mdc, rc = -ENOMEM);
}
rc = obd_open(ll_i2obdconn(inode), oa);
- obdo_free(oa);
if (rc) {
GOTO(out_mdc, rc = -abs(rc));
}
GOTO(out, rc = -EINVAL);
}
- oa = ll_oa_from_inode(inode, (OBD_MD_FLMODE | OBD_MD_FLID));
+ oa = ll_i2info(inode)->lli_obdo;
if (oa == NULL) {
LBUG();
GOTO(out_fd, rc = -ENOENT);
}
rc = obd_close(ll_i2obdconn(inode), oa);
- obdo_free(oa);
- if (rc) {
+ if (rc)
GOTO(out_fd, abs(rc));
- }
if (file->f_mode & FMODE_WRITE) {
struct iattr attr;
struct ptlrpc_request *request = NULL;
struct inode * inode = NULL;
struct ll_sb_info *sbi = ll_i2sbi(dir);
- int err;
- int type;
+ struct ll_inode_md md;
+ int err, type;
ino_t ino;
-
+
ENTRY;
if (dentry->d_name.len > EXT2_NAME_LEN)
RETURN(ERR_PTR(-ENAMETOOLONG));
RETURN(ERR_PTR(-abs(err)));
}
- inode = iget4(dir->i_sb, ino, ll_find_inode,
- lustre_msg_buf(request->rq_repmsg, 0));
+ if (S_ISREG(type)) {
+ if (request->rq_repmsg->bufcount < 2 ||
+ request->rq_repmsg->buflens[1] != sizeof(struct obdo))
+ LBUG();
+
+ md.obdo = lustre_msg_buf(request->rq_repmsg, 1);
+ } else
+ md.obdo = NULL;
+
+ md.body = lustre_msg_buf(request->rq_repmsg, 0);
+
+ inode = iget4(dir->i_sb, ino, ll_find_inode, &md);
ptlrpc_free_req(request);
if (!inode)
static struct inode *ll_create_node(struct inode *dir, const char *name,
int namelen, const char *tgt, int tgtlen,
- int mode, __u64 id)
+ int mode, __u64 extra, struct obdo *obdo)
{
struct inode *inode;
struct ptlrpc_request *request = NULL;
int err;
time_t time = CURRENT_TIME;
struct ll_sb_info *sbi = ll_i2sbi(dir);
+ struct ll_inode_md md;
ENTRY;
err = mdc_create(&sbi->ll_mds_client, sbi->ll_mds_conn, dir, name,
- namelen, tgt, tgtlen, mode, id, current->fsuid,
- current->fsgid, time, &request);
+ namelen, tgt, tgtlen, mode, current->fsuid,
+ current->fsgid, time, extra, obdo, &request);
if (err) {
inode = ERR_PTR(err);
GOTO(out, err);
body = lustre_msg_buf(request->rq_repmsg, 0);
body->valid = (__u32)OBD_MD_FLNOTOBD;
- body->objid = id;
body->nlink = 1;
body->atime = body->ctime = body->mtime = time;
body->uid = current->fsuid;
body->gid = current->fsgid;
body->mode = mode;
- CDEBUG(D_INODE, "-- new_inode: objid %lld, ino %d, mode %o\n",
- (unsigned long long)body->objid, body->ino, body->mode);
- inode = iget4(dir->i_sb, body->ino, ll_find_inode, body);
+ md.body = body;
+ md.obdo = obdo;
+
+ inode = iget4(dir->i_sb, body->ino, ll_find_inode, &md);
if (IS_ERR(inode)) {
CERROR("new_inode -fatal: %ld\n", PTR_ERR(inode));
inode = ERR_PTR(-EIO);
{
int err, rc;
struct obdo oa;
- struct inode * inode;
+ struct inode *inode;
memset(&oa, 0, sizeof(oa));
oa.o_valid = OBD_MD_FLMODE;
mode = mode | S_IFREG;
CDEBUG(D_DENTRY, "name %s mode %o o_id %lld\n",
dentry->d_name.name, mode, (unsigned long long)oa.o_id);
- inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len,
- NULL, 0, mode, oa.o_id);
+ inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len,
+ NULL, 0, mode, 0, &oa);
if (IS_ERR(inode)) {
rc = PTR_ERR(inode);
} /* ll_create */
-static int ll_mknod (struct inode * dir, struct dentry *dentry, int mode, int rdev)
+static int ll_mknod (struct inode * dir, struct dentry *dentry, int mode,
+ int rdev)
{
struct inode * inode = ll_create_node(dir, dentry->d_name.name,
dentry->d_name.len, NULL, 0,
- mode, 0);
+ mode, rdev, NULL);
int err = PTR_ERR(inode);
if (!IS_ERR(inode)) {
init_special_inode(inode, mode, rdev);
inode = ll_create_node(dir, dentry->d_name.name,
dentry->d_name.len, symname, l,
- S_IFLNK | S_IRWXUGO, 0);
+ S_IFLNK | S_IRWXUGO, 0, NULL);
err = PTR_ERR(inode);
if (IS_ERR(inode))
return err;
inode = ll_create_node (dir, dentry->d_name.name,
dentry->d_name.len, NULL, 0,
- S_IFDIR | mode, 0);
+ S_IFDIR | mode, 0, NULL);
err = PTR_ERR(inode);
if (IS_ERR(inode))
goto out_dir;
#include <linux/lustre_lite.h>
#include <linux/lustre_lib.h>
-inline struct obdo * ll_oa_from_inode(struct inode *inode, unsigned long valid)
-{
- struct ll_inode_info *oinfo = ll_i2info(inode);
- struct obdo *oa = obdo_alloc();
- if ( !oa ) {
- CERROR("no memory to allocate obdo!\n");
- return NULL;
- }
- oa->o_valid = valid;
-
- if ( valid & OBD_MD_FLID )
- oa->o_id = oinfo->lli_objid;
- if ( valid & OBD_MD_FLATIME )
- oa->o_atime = inode->i_atime;
- if ( valid & OBD_MD_FLMTIME )
- oa->o_mtime = inode->i_mtime;
- if ( valid & OBD_MD_FLCTIME )
- oa->o_ctime = inode->i_ctime;
- if ( valid & OBD_MD_FLSIZE )
- oa->o_size = inode->i_size;
- if ( valid & OBD_MD_FLBLOCKS ) /* allocation of space */
- oa->o_blocks = inode->i_blocks;
- if ( valid & OBD_MD_FLBLKSZ )
- oa->o_blksize = inode->i_blksize;
- if ( valid & OBD_MD_FLMODE )
- oa->o_mode = inode->i_mode;
- if ( valid & OBD_MD_FLUID )
- oa->o_uid = inode->i_uid;
- if ( valid & OBD_MD_FLGID )
- oa->o_gid = inode->i_gid;
- if ( valid & OBD_MD_FLFLAGS )
- oa->o_flags = inode->i_flags;
- if ( valid & OBD_MD_FLNLINK )
- oa->o_nlink = inode->i_nlink;
- if ( valid & OBD_MD_FLGENER )
- oa->o_generation = inode->i_generation;
-
- CDEBUG(D_INFO, "src inode %ld, dst obdo %ld valid 0x%08lx\n",
- inode->i_ino, (long)oa->o_id, valid);
-#if 0
- /* this will transfer metadata for the logical object to
- the oa: that metadata could contain the constituent objects
- */
- if (ll_has_inline(inode)) {
- CDEBUG(D_INODE, "copying inline data from inode to obdo\n");
- memcpy(oa->o_inline, oinfo->lli_inline, OBD_INLINESZ);
- oa->o_obdflags |= OBD_FL_INLINEDATA;
- oa->o_valid |= OBD_MD_FLINLINE;
- }
-#endif
- return oa;
-} /* ll_oa_from_inode */
-
/* SYNCHRONOUS I/O to object storage for an inode */
static int ll_brw(int rw, struct inode *inode, struct page *page, int create)
{
int err;
ENTRY;
- oa = ll_oa_from_inode(inode, OBD_MD_FLNOTOBD);
- if (!oa)
- RETURN(-ENOMEM);
-
+ oa = ll_i2info(inode)->lli_obdo;
err = obd_brw(rw, ll_i2obdconn(inode), num_obdo, &oa, &bufs_per_obdo,
- &page, &count, &offset, &flags);
-
- obdo_free(oa);
+ &page, &count, &offset, &flags, NULL);
RETURN(err);
} /* ll_brw */
struct iattr iattr;
ENTRY;
- oa = ll_oa_from_inode(inode, OBD_MD_FLNOTOBD);
- if (! oa )
- RETURN(-ENOMEM);
+ oa = ll_i2info(inode)->lli_obdo;
SetPageUptodate(page);
from, to, (unsigned long long)count);
err = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode), num_obdo, &oa,
- &bufs_per_obdo, &page, &count, &offset, &flags);
+ &bufs_per_obdo, &page, &count, &offset, &flags, NULL);
kunmap(page);
if ((iattr.ia_size = offset + to) > inode->i_size) {
#endif
}
- obdo_free(oa);
RETURN(err);
} /* ll_commit_write */
int err;
ENTRY;
- oa = ll_oa_from_inode(inode, OBD_MD_FLNOTOBD);
- if ( !oa ) {
- CERROR("no memory to allocate obdo!\n");
- return;
- }
+ oa = ll_i2info(inode)->lli_obdo;
CDEBUG(D_INFO, "calling punch for %ld (%Lu bytes at 0)\n",
(long)oa->o_id, (unsigned long long)oa->o_size);
err = obd_punch(ll_i2obdconn(inode), oa, oa->o_size, 0);
- obdo_free(oa);
if (err) {
CERROR("obd_truncate fails (%d)\n", err);
flags[i] = OBD_BRW_CREATE;
}
- oa = ll_oa_from_inode(inode, OBD_MD_FLNOTOBD);
+ oa = ll_i2info(inode)->lli_obdo;
if (!oa)
GOTO(out, rc = -ENOMEM);
-
rc = obd_brw(rw, ll_i2obdconn(inode), num_obdo, &oa, &bufs_per_obdo,
- iobuf->maplist, count, offset, flags);
- if (rc == 0)
+ iobuf->maplist, count, offset, flags, NULL);
+ if (rc == 0)
rc = bufs_per_obdo * PAGE_SIZE;
out:
- obdo_free(oa);
- OBD_FREE(flags, sizeof(obd_flag) * bufs_per_obdo);
- OBD_FREE(offset, sizeof(obd_off) * bufs_per_obdo);
- OBD_FREE(count, sizeof(obd_count) * bufs_per_obdo);
+ if (flags)
+ OBD_FREE(flags, sizeof(obd_flag) * bufs_per_obdo);
+ if (count)
+ OBD_FREE(count, sizeof(obd_count) * bufs_per_obdo);
+ if (offset)
+ OBD_FREE(offset, sizeof(obd_off) * bufs_per_obdo);
RETURN(rc);
}
__u64 last_committed, last_rcvd;
__u32 last_xid;
struct ptlrpc_request *request = NULL;
+ struct ll_inode_md md;
ENTRY;
MOD_INC_USE_COUNT;
GOTO(out_mdc, sb = NULL);
}
- root = iget4(sb, sbi->ll_rootino, NULL,
- lustre_msg_buf(request->rq_repmsg, 0));
+ md.body = lustre_msg_buf(request->rq_repmsg, 0);
+ md.obdo = NULL;
+ root = iget4(sb, sbi->ll_rootino, NULL, &md);
+
if (root) {
sb->s_root = d_alloc_root(root);
} else {
EXIT;
} /* ll_put_super */
-
-extern inline struct obdo * ll_oa_from_inode(struct inode *inode, int valid);
+static void ll_clear_inode(struct inode *inode)
+{
+ if (atomic_read(&inode->i_count) == 0) {
+ struct obdo *oa = ll_i2info(inode)->lli_obdo;
+ if (oa) {
+ obdo_free(oa);
+ ll_i2info(inode)->lli_obdo = NULL;
+ }
+ if (ll_i2info(inode)->lli_symlink_name) {
+ OBD_FREE(ll_i2info(inode)->lli_symlink_name,
+ strlen(ll_i2info(inode)->lli_symlink_name)+ 1);
+ ll_i2info(inode)->lli_symlink_name = NULL;
+ }
+ }
+}
static void ll_delete_inode(struct inode *inode)
{
- if (S_ISREG(inode->i_mode)) {
- int err;
- struct obdo *oa = ll_oa_from_inode(inode, OBD_MD_FLNOTOBD);
+ if (S_ISREG(inode->i_mode)) {
+ int err;
+ struct obdo *oa;
+ oa = ll_i2info(inode)->lli_obdo;
if (!oa) {
CERROR("no memory\n");
RETURN(err);
}
-static void inline ll_to_inode(struct inode *dst, struct mds_body *body)
+static void inline ll_to_inode(struct inode *dst, struct ll_inode_md *md)
{
+ struct mds_body *body = md->body;
struct ll_inode_info *ii = ll_i2info(dst);
/* core attributes first */
dst->i_generation = body->generation;
/* this will become more elaborate for striping etc */
- if (body->valid & OBD_MD_FLOBJID)
- ii->lli_objid = body->objid;
+ if (md->obdo != NULL) {
+ ii->lli_obdo = obdo_alloc();
+ memcpy(ii->lli_obdo, md->obdo, sizeof(*md->obdo));
+ }
#if 0
if (obdo_has_inline(oa)) {
static inline void ll_read_inode2(struct inode *inode, void *opaque)
{
- struct mds_body *body = opaque;
+ struct ll_inode_md *md = opaque;
ENTRY;
- ll_to_inode(inode, body);
+ ll_to_inode(inode, md);
/* OIDEBUG(inode); */
struct super_operations ll_super_operations =
{
read_inode2: ll_read_inode2,
+ clear_inode: ll_clear_inode,
delete_inode: ll_delete_inode,
put_super: ll_put_super,
statfs: ll_statfs
--- /dev/null
+Makefile
+Makefile.in
DEFS :=
-#MODULE = lov
-#modulefs_DATA = lov.o
-#EXTRA_PROGRAMS = lov
+MODULE = lov
+modulefs_DATA = lov.o
+EXTRA_PROGRAMS = lov
-#lov_SOURCES = lov_obd.c
+lov_SOURCES = lov_obd.c
include $(top_srcdir)/Rules
#include <linux/module.h>
#include <linux/obd_class.h>
+#include <linux/obd_lov.h>
extern struct obd_device obd_dev[MAX_OBD_DEVICES];
static int lov_create(struct obd_conn *conn, struct obdo *oa)
{
- int rc, i;
+ int rc, retval, i, offset;
+ struct obdo tmp;
+ struct lov_md md;
ENTRY;
if (!gen_client(conn))
RETURN(-EINVAL);
- for (i = 0; i < conn->oc_dev->obd_multi_count; i++)
- rc = obd_create(&conn->oc_dev->obd_multi_conn[i], oa);
+ md.lmd_object_id = oa->o_id;
+ md.lmd_stripe_count = conn->oc_dev->obd_multi_count;
+
+ memset(oa->o_inline, 0, sizeof(oa->o_inline));
+ offset = sizeof(md);
+ for (i = 0; i < md.lmd_stripe_count; i++) {
+ struct lov_object_id lov_id;
+ rc = obd_create(&conn->oc_dev->obd_multi_conn[i], &tmp);
+ if (i == 0) {
+ memcpy(oa, &tmp, sizeof(tmp));
+ retval = rc;
+ } else if (retval != rc)
+ CERROR("return codes didn't match (%d, %d)\n",
+ retval, rc);
+ lov_id = (struct lov_object_id *)(oa->o_inline + offset);
+ lov_id->l_device_id = i;
+ lov_id->l_object_id = tmp.o_id;
+ offset += sizeof(*lov_id);
+ }
+ memcpy(oa->o_inline, &md, sizeof(md));
return rc;
}
-static int filter_destroy(struct obd_conn *conn, struct obdo *oa)
+static int lov_destroy(struct obd_conn *conn, struct obdo *oa)
{
-#if 0
- struct obd_device * obddev;
- struct obd_client * cli;
- struct inode * inode;
- struct file *dir;
- struct file *object;
- int rc;
- struct obd_run_ctxt saved;
+ int rc, retval, i, offset;
+ struct obdo tmp;
+ struct lov_md *md;
+ struct lov_object_id lov_id;
+ ENTRY;
- if (!(cli = gen_client(conn))) {
- CERROR("invalid client %u\n", conn->oc_id);
- EXIT;
- return -EINVAL;
- }
+ if (!gen_client(conn))
+ RETURN(-EINVAL);
- obddev = conn->oc_dev;
- object = filter_obj_open(obddev, oa->o_id, oa->o_mode);
- if (!object || IS_ERR(object)) {
- EXIT;
- return -ENOENT;
- }
+ md = (struct lov_md *)oa->o_inline;
- inode = object->f_dentry->d_inode;
- inode->i_nlink = 1;
- inode->i_mode = 010000;
+ memcpy(&tmp, oa, sizeof(tmp));
- push_ctxt(&saved, &obddev->u.filter.fo_ctxt);
- dir = filter_parent(oa->o_id, oa->o_mode);
- if (IS_ERR(dir)) {
- rc = PTR_ERR(dir);
- EXIT;
- goto out;
+ offset = sizeof(md);
+ for (i = 0; i < md->lmd_stripe_count; i++) {
+ struct lov_object_id *lov_id;
+ lov_id = (struct lov_object_id *)(oa->o_inline + offset);
+
+ tmp.o_id = lov_id->l_object_id;
+
+ rc = obd_destroy(&conn->oc_dev->obd_multi_conn[i], &tmp);
+ if (i == 0)
+ retval = rc;
+ else if (retval != rc)
+ CERROR("return codes didn't match (%d, %d)\n",
+ retval, rc);
+ offset += sizeof(*lov_id);
}
- dget(dir->f_dentry);
- dget(object->f_dentry);
- rc = vfs_unlink(dir->f_dentry->d_inode, object->f_dentry);
-
- filp_close(dir, 0);
- filp_close(object, 0);
-out:
- pop_ctxt(&saved);
- EXIT;
+
return rc;
-#endif
- return 0;
}
/* FIXME: maybe we'll just make one node the authoritative attribute node, then
RETURN(rc);
}
+struct lov_callback_data {
+ atomic_t count;
+ wait_queue_head waitq;
+};
+
+static void lov_read_callback(struct ptlrpc_bulk_desc *desc, void *data)
+{
+ struct lov_callback_data *cb_data = data;
+
+ if (atomic_dec_and_test(&cb_data->count))
+ wake_up(&cb_data->waitq);
+}
+
+static int lov_read_check_status(struct lov_callback_data *cb_data)
+{
+ ENTRY;
+ if (sigismember(&(current->pending.signal), SIGKILL) ||
+ sigismember(&(current->pending.signal), SIGTERM) ||
+ sigismember(&(current->pending.signal), SIGINT)) {
+ cb_data->flags |= PTL_RPC_FL_INTR;
+ RETURN(1);
+ }
+ if (atomic_read(&cb_data->count) == 0)
+ RETURN(1);
+ RETURN(0);
+}
+
/* buffer must lie in user memory here */
-static int lov_read(struct obd_conn *conn, struct obdo *oa, char *buf,
- obd_size *count, obd_off offset)
+static int lov_brw(int cmd, struct obd_conn *conn, obd_count num_oa,
+ struct obdo **oa,
+ obd_count *oa_bufs, struct page **buf,
+ obd_size *count, obd_off *offset, obd_flag *flags,
+ bulk_callback_t callback, void *data)
{
- int rc, i;
+ int rc, i, page_array_offset = 0;
obd_off off = offset;
obd_size retval = 0;
+ struct lov_callback_data *cb_data;
ENTRY;
+ if (num_oa != 1)
+ LBUG();
+
if (!gen_client(conn))
RETURN(-EINVAL);
+ OBD_ALLOC(cb_data, sizeof(*cb_data));
+ if (cb_data == NULL) {
+ LBUG();
+ RETURN(-ENOMEM);
+ }
+ INIT_WAITQUEUE_HEAD(&cb_data->waitq);
+ atomic_set(&cb_data->count, 0);
+
+ for (i = 0; i < oa_bufs[0]; i++) {
+ struct page *current_page = buf[i];
+
+ struct lov_md *md = (struct lov_md *)oa[i]->inline;
+ int bufcount = oa_bufs[i];
+ // md->lmd_stripe_count
+
+ for (k = page_array_offset; k < bufcount + page_array_offset;
+ k++) {
+
+ }
+ page_array_offset += bufcount;
+
+
while (off < offset + count) {
int stripe, conn;
obd_size size, tmp;
if (size > *count)
size = *count;
-
conn = stripe % conn->oc_dev->obd_multi_count;
tmp = size;
- rc = obd_read(&conn->oc_dev->obd_multi_conn[conn], oa, buf,
- &size, off);
+ atomic_inc(&cb_data->count);
+ rc = obd_brw(cmd, &conn->oc_dev->obd_multi_conn[conn],
+ num_oa, oa, buf,
+ &size, off, lov_read_callback, cb_data);
if (rc == 0)
retval += size;
else {
buf += size;
}
+ wait_event_interruptible(&cb_data->waitq,
+ lov_read_check_status(cb_data));
+ if (cb_data->flags & PTL_RPC_FL_INTR)
+ rc = -EINTR;
+
+ /* FIXME: The error handling here sucks */
*count = retval;
+ OBD_FREE(cb_data, sizeof(*cb_data));
RETURN(rc);
}
+static void lov_write_finished(struct ptlrpc_bulk_desc *desc, void *data)
+{
+
+}
/* buffer must lie in user memory here */
static int filter_write(struct obd_conn *conn, struct obdo *oa, char *buf,
obd_size *count, obd_off offset)
{
int err;
- struct file * file;
+ struct file *file;
unsigned long retval;
ENTRY;
rc = obd_cancel(&conn->oc_dev->obd_multi_conn[0], oa);
RETURN(rc);
}
+#endif
struct obd_ops lov_obd_ops = {
o_setup: gen_multi_setup,
o_cleanup: gen_multi_cleanup,
o_create: lov_create,
- o_destroy: lov_destroy,
+ o_connect: lov_connect,
+ o_disconnect: lov_disconnect,
o_getattr: lov_getattr,
o_setattr: lov_setattr,
o_open: lov_open,
o_close: lov_close,
- o_connect: lov_connect,
- o_disconnect: lov_disconnect,
+#if 0
+ o_destroy: lov_destroy,
o_brw: lov_pgcache_brw,
o_punch: lov_punch,
o_enqueue: lov_enqueue,
o_cancel: lov_cancel
+#endif
};
int mdc_create(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
struct inode *dir, const char *name, int namelen,
- const char *tgt, int tgtlen, int mode, __u64 id, __u32 uid,
- __u32 gid, __u64 time, struct ptlrpc_request **request)
+ const char *tgt, int tgtlen, int mode, __u32 uid,
+ __u32 gid, __u64 time, __u64 rdev, struct obdo *obdo,
+ struct ptlrpc_request **request)
{
struct mds_rec_create *rec;
struct ptlrpc_request *req;
- int rc, size[3] = {sizeof(*rec), namelen + 1, tgtlen + 1};
- char *tmp;
- int level;
+ int rc, size[3] = {sizeof(*rec), namelen + 1, 0};
+ char *tmp, *bufs[3] = {NULL, NULL, NULL};
+ int level, bufcount = 2;
ENTRY;
- req = ptlrpc_prep_req(cl, conn, MDS_REINT, 3, size, NULL);
+ if (S_ISREG(mode)) {
+ size[2] = sizeof(*obdo);
+ bufs[2] = (char *)obdo;
+ bufcount = 3;
+ } else if (S_ISLNK(mode)) {
+ size[2] = tgtlen + 1;
+ bufcount = 3;
+ }
+
+ req = ptlrpc_prep_req(cl, conn, MDS_REINT, bufcount, size, bufs);
if (!req)
RETURN(-ENOMEM);
rec = lustre_msg_buf(req->rq_reqmsg, 0);
- mds_create_pack(rec, dir, mode, id, uid, gid, time);
+ mds_create_pack(rec, dir, mode, rdev, uid, gid, time);
tmp = lustre_msg_buf(req->rq_reqmsg, 1);
LOGL0(name, namelen, tmp);
- if (tgt) {
+ if (S_ISREG(mode)) {
+ tmp = lustre_msg_buf(req->rq_reqmsg, 2);
+ memcpy(tmp, obdo, sizeof(*obdo));
+ } else if (S_ISLNK(mode)) {
tmp = lustre_msg_buf(req->rq_reqmsg, 2);
LOGL0(tgt, tgtlen, tmp);
}
ll_ino2fid(&body->fid1, ino, 0, type);
body->valid = valid;
- if (valid & OBD_MD_LINKNAME) {
+ if (S_ISREG(type)) {
+ bufcount = 2;
+ size[1] = sizeof(struct obdo);
+ } else if (valid & OBD_MD_LINKNAME) {
bufcount = 2;
size[1] = ea_size;
}
body = lustre_msg_buf(req->rq_reqmsg, 0);
ll_ino2fid(&body->fid1, ino, 0, type);
body->flags = HTON__u32(flags);
- body->objid = cookie;
+ body->extra = cookie;
req->rq_replen = lustre_msg_size(1, &size);
if (!rc) {
mds_unpack_rep_body(req);
body = lustre_msg_buf(req->rq_repmsg, 0);
- *fh = body->objid;
+ *fh = body->extra;
}
EXIT;
body = lustre_msg_buf(req->rq_reqmsg, 0);
ll_ino2fid(&body->fid1, ino, 0, type);
- body->objid = fh;
+ body->extra = fh;
req->rq_level = LUSTRE_CONN_FULL;
req->rq_replen = lustre_msg_size(0, NULL);
err = mdc_create(&cl, conn, &inode,
"foofile", strlen("foofile"),
NULL, 0, 0100707, 47114711,
- 11, 47, 0, &request);
+ 11, 47, 0, NULL, &request);
CERROR("-- done err %d\n", err);
GOTO(out, err);
DEFS:=
MODULE = mds
-modulefs_DATA = mds.o mds_ext2.o mds_ext3.o mds_extN.o
-EXTRA_PROGRAMS = mds mds_ext2 mds_ext3 mds_extN
+modulefs_DATA = mds.o mds_extN.o # mds_ext2.o mds_ext3.o
+EXTRA_PROGRAMS = mds mds_extN # mds_ext2 mds_ext3
LINX=mds_updates.c simple.c
mds_updates.c:
#include <linux/module.h>
#include <linux/lustre_mds.h>
-static
-int mds_sendpage(struct ptlrpc_request *req, struct file *file, __u64 offset)
+static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
+ __u64 offset)
{
int rc = 0;
mm_segment_t oldfs = get_fs();
desc->b_portal = MDS_BULK_PORTAL;
rc = ptlrpc_send_bulk(desc);
+ if (rc)
+ GOTO(cleanup_buf, rc);
+
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
OBD_FAIL_MDS_SENDPAGE, rc);
GOTO(cleanup_buf, rc);
}
+ wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
+ if (desc->b_flags & PTL_RPC_FL_INTR)
+ GOTO(cleanup_buf, rc = -EINTR);
+
EXIT;
cleanup_buf:
OBD_FREE(buf, PAGE_SIZE);
RETURN(0);
}
-static
-int mds_getattr(struct ptlrpc_request *req)
+static int mds_getattr(struct ptlrpc_request *req)
{
struct dentry *de;
struct inode *inode;
struct mds_body *body;
struct mds_obd *mds = &req->rq_obd->u.mds;
- int rc, size[2] = {sizeof(*body)}, count = 1;
+ int rc, size[2] = {sizeof(*body)}, bufcount = 1;
ENTRY;
body = lustre_msg_buf(req->rq_reqmsg, 0);
req->rq_status = -ENOENT;
RETURN(0);
}
+
inode = de->d_inode;
- if (body->valid & OBD_MD_LINKNAME) {
- count = 2;
+ if (S_ISREG(body->fid1.f_type)) {
+ bufcount = 2;
+ size[1] = sizeof(struct obdo);
+ } else if (body->valid & OBD_MD_LINKNAME) {
+ bufcount = 2;
size[1] = inode->i_size;
}
- rc = lustre_pack_msg(count, size, NULL, &req->rq_replen,
+ rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
&req->rq_repmsg);
if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
CERROR("mds: out of memory\n");
body->size = inode->i_size;
body->mode = inode->i_mode;
body->nlink = inode->i_nlink;
+ body->valid = ~0; /* FIXME: should be more selective */
+
if (S_ISREG(inode->i_mode)) {
- rc = mds_fs_get_objid(mds, inode, &body->objid);
+ rc = mds_fs_get_obdo(mds, inode,
+ lustre_msg_buf(req->rq_repmsg, 1));
if (rc < 0) {
req->rq_status = rc;
- CERROR("readlink failed: %d\n", rc);
+ CERROR("mds_fs_get_objid failed: %d\n", rc);
GOTO(out, 0);
}
}
- body->valid = ~0; /* FIXME: should be more selective */
out:
l_dput(de);
RETURN(0);
list_for_each(tmp, &mci->mci_open_head) {
struct mds_file_data *fd;
fd = list_entry(tmp, struct mds_file_data, mfd_list);
- if (body->objid == fd->mfd_clientfd &&
+ if (body->extra == fd->mfd_clientfd &&
body->fid1.id == fd->mfd_file->f_dentry->d_inode->i_ino) {
CERROR("Re opening %Ld\n", body->fid1.id);
RETURN(0);
file->private_data = mfd;
mfd->mfd_file = file;
- mfd->mfd_clientfd = body->objid;
+ mfd->mfd_clientfd = body->extra;
list_add(&mfd->mfd_list, &mci->mci_open_head);
body = lustre_msg_buf(req->rq_repmsg, 0);
- body->objid = (__u64) (unsigned long)file;
+ body->extra = (__u64) (unsigned long)file;
RETURN(0);
}
RETURN(0);
}
- file = (struct file *)(unsigned long)body->objid;
+ file = (struct file *)(unsigned long)body->extra;
if (!file->f_dentry)
LBUG();
mfd = (struct mds_file_data *)file->private_data;
#include <linux/extN_xattr.h>
#include <linux/lustre_mds.h>
#include <linux/module.h>
+#include <linux/obd_lov.h>
static struct mds_fs_operations mds_extN_fs_ops;
static kmem_cache_t *jcb_cache;
};
struct mds_objid {
- __u16 mo_magic;
- __u8 mo_unused;
- __u8 mo_ost;
- __u64 mo_id;
+ __u64 mo_magic;
+ struct lov_md mo_lov_md;
};
#define EXTN_XATTR_INDEX_LUSTRE 5
return inode_setattr(inode, iattr);
}
-static int mds_extN_set_objid(struct inode *inode, void *handle, obd_id id)
+static int mds_extN_set_obdo(struct inode *inode, void *handle,
+ struct obdo *obdo)
{
- struct mds_objid data;
+ struct mds_objid *data = (struct mds_objid *)obdo->o_inline;
int rc;
- data.mo_magic = cpu_to_le16(XATTR_MDS_MO_MAGIC);
- data.mo_unused = 0;
- data.mo_ost = 0; /* FIXME: store OST index here */
- data.mo_id = cpu_to_le64(id);
+ data->mo_magic = cpu_to_le64(XATTR_MDS_MO_MAGIC);
lock_kernel();
down(&inode->i_sem);
- if (id == 0) {
+ if (obdo == NULL)
rc = extN_xattr_set(handle, inode, EXTN_XATTR_INDEX_LUSTRE,
XATTR_LUSTRE_MDS_OBJID, NULL, 0, 0);
- } else {
+ else
rc = extN_xattr_set(handle, inode, EXTN_XATTR_INDEX_LUSTRE,
- XATTR_LUSTRE_MDS_OBJID, &data,
- sizeof(struct mds_objid), XATTR_CREATE);
- }
+ XATTR_LUSTRE_MDS_OBJID, obdo->o_inline,
+ OBD_INLINESZ, XATTR_CREATE);
up(&inode->i_sem);
unlock_kernel();
if (rc)
CERROR("error adding objectid %Ld to inode %ld\n",
- (unsigned long long)id, inode->i_ino);
+ (unsigned long long)obdo->o_id, inode->i_ino);
return rc;
}
-static int mds_extN_get_objid(struct inode *inode, obd_id *id)
+static int mds_extN_get_obdo(struct inode *inode, struct obdo *obdo)
{
+ char *buf;
struct mds_objid data;
- int rc;
+ struct lov_object_id *lov_ids;
+ int rc, size;
lock_kernel();
down(&inode->i_sem);
rc = extN_xattr_get(inode, EXTN_XATTR_INDEX_LUSTRE,
XATTR_LUSTRE_MDS_OBJID, &data,
sizeof(struct mds_objid));
+ size = sizeof(struct mds_objid) + data.mo_lov_md.lmd_stripe_count *
+ sizeof(*lov_ids);
+ OBD_ALLOC(buf, size);
+ if (buf == NULL)
+ LBUG();
+ rc = extN_xattr_get(inode, EXTN_XATTR_INDEX_LUSTRE,
+ XATTR_LUSTRE_MDS_OBJID, buf, size);
up(&inode->i_sem);
unlock_kernel();
+ if (size > OBD_INLINESZ)
+ LBUG();
+
if (rc < 0) {
CERROR("error getting EA %s from MDS inode %ld: rc = %d\n",
XATTR_LUSTRE_MDS_OBJID, inode->i_ino, rc);
- *id = 0;
- } else if (data.mo_magic != cpu_to_le16(XATTR_MDS_MO_MAGIC)) {
- CERROR("MDS object id %Ld has bad magic %x\n",
- (unsigned long long)le64_to_cpu(data.mo_id),
- le16_to_cpu(data.mo_magic));
+ obdo->o_id = 0;
+ } else if (data.mo_magic != cpu_to_le64(XATTR_MDS_MO_MAGIC)) {
+ CERROR("MDS object id %Ld has bad magic %Lx\n",
+ (unsigned long long)obdo->o_id,
+ (unsigned long long)le64_to_cpu(data.mo_magic));
rc = -EINVAL;
} else {
- *id = le64_to_cpu(data.mo_id);
- /* FIXME: will actually use data.mo_ost at some point */
- if (data.mo_ost)
- CERROR("MDS objid %Ld with ost index %d!\n",
- *id, data.mo_ost);
+ /* This field is byteswapped because it appears in the
+ * catalogue. All others are opaque to the MDS */
+ obdo->o_id = le64_to_cpu(data.mo_lov_md.lmd_object_id);
+ memcpy(obdo->o_inline, buf + sizeof(data), size);
}
+#warning FIXME: pass this buffer to caller for transmission when size exceeds OBD_INLINESZ
+ OBD_FREE(buf, size);
return rc;
}
EXIT;
return;
}
- if (mds_extN_set_objid(inode, handle, 0))
- CERROR("error clearing objid on %ld\n", inode->i_ino);
+ if (mds_extN_set_obdo(inode, handle, NULL))
+ CERROR("error clearing obdo on %ld\n", inode->i_ino);
if (mds_extN_fs_ops.cl_delete_inode)
mds_extN_fs_ops.cl_delete_inode(inode);
fs_start: mds_extN_start,
fs_commit: mds_extN_commit,
fs_setattr: mds_extN_setattr,
- fs_set_objid: mds_extN_set_objid,
- fs_get_objid: mds_extN_get_objid,
+ fs_set_obdo: mds_extN_set_obdo,
+ fs_get_obdo: mds_extN_get_obdo,
fs_readpage: mds_extN_readpage,
fs_delete_inode: mds_extN_delete_inode,
cl_delete_inode: clear_inode,
// XXX - add transaction sequence numbers
#define EXPORT_SYMTAB
-
-#include <linux/version.h>
-#include <linux/module.h>
-#include <linux/fs.h>
-#include <linux/stat.h>
-#include <linux/locks.h>
-#include <linux/quotaops.h>
-#include <asm/unistd.h>
-#include <asm/uaccess.h>
-
#define DEBUG_SUBSYSTEM S_MDS
#include <linux/obd_support.h>
case S_IFBLK:
case S_IFIFO:
case S_IFSOCK: {
- int rdev = rec->ur_id;
+ int rdev = rec->ur_rdev;
handle = mds_fs_start(mds, dir, MDS_FSOP_MKNOD);
if (!handle)
GOTO(out_create_dchild, PTR_ERR(handle));
CDEBUG(D_INODE, "created ino %ld\n", dchild->d_inode->i_ino);
if (type == S_IFREG) {
- rc = mds_fs_set_objid(mds, inode, handle, rec->ur_id);
+ struct obdo *obdo;
+ obdo = lustre_msg_buf(req->rq_reqmsg, 2);
+ rc = mds_fs_set_obdo(mds, inode, handle, obdo);
if (rc)
CERROR("error %d setting objid for %ld\n",
rc, inode->i_ino);
handle = mds_fs_start(mds, de_tgtdir->d_inode, MDS_FSOP_RENAME);
if (!handle)
GOTO(out_rename_denew, rc = PTR_ERR(handle));
- rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new);
+ rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new,
+ NULL);
if (!rc)
rc = mds_update_last_rcvd(mds, handle, req);
}
err = obd_brw(rw, conns, num, obdos, oa_bufs, bufs,
- counts, offsets, flags);
+ counts, offsets, flags, NULL);
EXIT;
brw_cleanup:
page->index = index;
err = OBP(src_conn->oc_dev, brw)(READ, src_conn, num_oa, &src,
&num_buf, &page, &brw_count,
- &brw_offset, &flagr);
+ &brw_offset, &flagr, NULL);
if ( err ) {
EXIT;
err = OBP(dst_conn->oc_dev, brw)(WRITE, dst_conn, num_oa, &dst,
&num_buf, &page, &brw_count,
- &brw_offset, &flagw);
+ &brw_offset, &flagw, NULL);
/* XXX should handle dst->o_size, dst->o_blocks here */
if ( err ) {
inode->i_mode = S_IFREG;
push_ctxt(&saved, &obddev->u.filter.fo_ctxt);
+
rc = vfs_unlink(dir_dentry->d_inode, object_dentry);
pop_ctxt(&saved);
CDEBUG(D_INODE, "put child %p, count = %d\n", object_dentry,
static int filter_pgcache_brw(int rw, struct obd_conn *conn, obd_count num_oa,
struct obdo **oa, obd_count *oa_bufs,
struct page **pages, obd_size *count,
- obd_off *offset, obd_flag *flags)
+ obd_off *offset, obd_flag *flags, void *callback)
{
struct obd_run_ctxt saved;
struct super_block *sb;
obdfs_from_inode(oa, inode);
err = obd_brw(rw, IID(inode), num_obdo, &oa, &bufs_per_obdo,
- &page, &count, &offset, &flags);
+ &page, &count, &offset, &flags, NULL);
//if ( !err )
// obdfs_to_inode(inode, oa); /* copy o_blocks to i_blocks */
from, to, (unsigned long long)count);
err = obd_brw(WRITE, IID(inode), num_obdo, &oa, &bufs_per_obdo,
- &page, &count, &offset, &flags);
+ &page, &count, &offset, &flags, NULL);
if ( !err ) {
SetPageUptodate(page);
set_page_clean(page);
ENTRY;
osc_con2cl(conn, &cl, &connection);
- request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size, NULL);
+ request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size,
+ NULL);
if (!request)
RETURN(-ENOMEM);
return 0;
}
-static int osc_sendpage(struct ptlrpc_bulk_desc *desc,
- struct niobuf_remote *dst, struct niobuf_local *src)
+struct osc_brw_cb_data {
+ struct page **buf;
+ struct ptlrpc_request *req;
+ bulk_callback_t callback;
+ void *cb_data;
+};
+
+static void brw_read_finish(struct ptlrpc_bulk_desc *desc, void *data)
{
- struct ptlrpc_bulk_page *page;
- ENTRY;
+ struct osc_brw_cb_data *cb_data = data;
- page = ptlrpc_prep_bulk_page(desc);
- if (page == NULL)
- RETURN(-ENOMEM);
+ if (desc->b_flags & PTL_RPC_FL_INTR)
+ CERROR("got signal\n");
- page->b_buf = (void *)(unsigned long)src->addr;
- page->b_buflen = src->len;
- page->b_xid = dst->xid;
+ (cb_data->callback)(desc, cb_data->cb_data);
- RETURN(0);
+ ptlrpc_free_bulk(desc);
+ ptlrpc_free_req(cb_data->req);
+
+ OBD_FREE(cb_data, sizeof(*cb_data));
}
static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
struct obdo **oa, obd_count *oa_bufs, struct page **buf,
- obd_size *count, obd_off *offset, obd_flag *flags)
+ obd_size *count, obd_off *offset, obd_flag *flags,
+ bulk_callback_t callback)
{
struct ptlrpc_client *cl;
struct ptlrpc_connection *connection;
ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
for (pages = 0, i = 0; i < num_oa; i++) {
ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
+ /* FIXME: this inner loop is wrong for multiple OAs */
for (j = 0; j < oa_bufs[i]; j++, pages++) {
struct ptlrpc_bulk_page *bulk;
bulk = ptlrpc_prep_bulk_page(desc);
return rc;
}
+static void brw_write_finish(struct ptlrpc_bulk_desc *desc, void *data)
+{
+ struct osc_brw_cb_data *cb_data = data;
+ int i;
+
+ if (desc->b_flags & PTL_RPC_FL_INTR)
+ CERROR("got signal\n");
+
+ for (i = 0; i < desc->b_page_count; i++)
+ kunmap(cb_data->buf[i]);
+
+ (cb_data->callback)(desc, cb_data->cb_data);
+
+ ptlrpc_free_bulk(desc);
+ ptlrpc_free_req(cb_data->req);
+
+ OBD_FREE(cb_data, sizeof(*cb_data));
+}
+
static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
struct obdo **oa, obd_count *oa_bufs,
- struct page **pagearray, obd_size *count, obd_off *offset,
- obd_flag *flags)
+ struct page **pagearray, obd_size *count,
+ obd_off *offset, obd_flag *flags,
+ bulk_callback_t callback)
{
struct ptlrpc_client *cl;
struct ptlrpc_connection *connection;
struct ost_body *body;
struct niobuf_local *local;
struct niobuf_remote *remote;
+ struct osc_brw_cb_data *cb_data;
long pages;
int rc, i, j, size[3] = {sizeof(*body)};
void *ptr1, *ptr2;
desc = ptlrpc_prep_bulk(connection);
desc->b_portal = OSC_BULK_PORTAL;
+ if (callback) {
+ desc->b_cb = brw_write_finish;
+ OBD_ALLOC(cb_data, sizeof(*cb_data));
+ cb_data->buf = pagearray;
+ cb_data->callback = callback;
+ desc->b_cb_data = cb_data;
+ }
for (pages = 0, i = 0; i < num_oa; i++) {
for (j = 0; j < oa_bufs[i]; j++, pages++) {
+ struct ptlrpc_bulk_page *page;
+
ost_unpack_niobuf(&ptr2, &remote);
- rc = osc_sendpage(desc, remote, &local[pages]);
- if (rc)
- GOTO(out3, rc);
+
+ page = ptlrpc_prep_bulk_page(desc);
+ if (page == NULL)
+ GOTO(out3, rc = -ENOMEM);
+
+ page->b_buf = (void *)(unsigned long)local[pages].addr;
+ page->b_buflen = local[pages].len;
+ page->b_xid = remote->xid;
}
}
+ if (desc->b_page_count != pages)
+ LBUG();
+
rc = ptlrpc_send_bulk(desc);
+ if (rc)
+ GOTO(out3, rc);
+ if (callback)
+ GOTO(out, rc);
+
+ /* If there's no callback function, sleep here until complete. */
+ wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
+ if (desc->b_flags & PTL_RPC_FL_INTR)
+ rc = -EINTR;
+
GOTO(out3, rc);
out3:
static int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
struct obdo **oa, obd_count *oa_bufs, struct page **buf,
- obd_size *count, obd_off *offset, obd_flag *flags)
+ obd_size *count, obd_off *offset, obd_flag *flags,
+ void *callback)
{
+ if (num_oa != 1)
+ LBUG();
+
if (rw == OBD_BRW_READ)
return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
- offset, flags);
+ offset, flags, (bulk_callback_t)callback);
else
return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
- offset, flags);
+ offset, flags, (bulk_callback_t)callback);
}
static int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns,
RETURN(rc);
}
-static int ost_brw_write_finished_cb(struct ptlrpc_bulk_desc *desc)
+static void ost_brw_write_finished_cb(struct ptlrpc_bulk_desc *desc, void *data)
{
ptlrpc_free_bulk(desc);
-
- return 0;
}
static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
/*
* Free the packet when it has gone out
*/
-static int request_out_callback(ptl_event_t *ev, void *data)
+static int request_out_callback(ptl_event_t *ev)
{
ENTRY;
/*
* Free the packet when it has gone out
*/
-static int reply_out_callback(ptl_event_t *ev, void *data)
+static int reply_out_callback(ptl_event_t *ev)
{
ENTRY;
/*
* Wake up the thread waiting for the reply once it comes in.
*/
-static int reply_in_callback(ptl_event_t *ev, void *data)
+static int reply_in_callback(ptl_event_t *ev)
{
struct ptlrpc_request *rpc = ev->mem_desc.user_ptr;
ENTRY;
RETURN(1);
}
-int request_in_callback(ptl_event_t *ev, void *data)
+int request_in_callback(ptl_event_t *ev)
{
- struct ptlrpc_service *service = data;
+ struct ptlrpc_service *service = ev->mem_desc.user_ptr;
int index;
if (ev->rlength != ev->mlength)
return 0;
}
-static int bulk_source_callback(ptl_event_t *ev, void *data)
+static int bulk_source_callback(ptl_event_t *ev)
{
struct ptlrpc_bulk_page *bulk = ev->mem_desc.user_ptr;
struct ptlrpc_bulk_desc *desc = bulk->b_desc;
CDEBUG(D_NET, "got SENT event\n");
} else if (ev->type == PTL_EVENT_ACK) {
CDEBUG(D_NET, "got ACK event\n");
- desc->b_flags |= PTL_BULK_FL_SENT;
- wake_up_interruptible(&desc->b_waitq);
+ if (bulk->b_cb != NULL)
+ bulk->b_cb(bulk);
+ if (atomic_dec_and_test(&desc->b_finished_count)) {
+ desc->b_flags |= PTL_BULK_FL_SENT;
+ wake_up_interruptible(&desc->b_waitq);
+ if (desc->b_cb != NULL)
+ desc->b_cb(desc, desc->b_cb_data);
+ }
} else {
CERROR("Unexpected event type!\n");
LBUG();
RETURN(1);
}
-static int bulk_sink_callback(ptl_event_t *ev, void *data)
+static int bulk_sink_callback(ptl_event_t *ev)
{
struct ptlrpc_bulk_page *bulk = ev->mem_desc.user_ptr;
struct ptlrpc_bulk_desc *desc = bulk->b_desc;
if (ev->type == PTL_EVENT_PUT) {
if (bulk->b_buf != ev->mem_desc.start + ev->offset)
CERROR("bulkbuf != mem_desc -- why?\n");
- desc->b_finished_count++;
if (bulk->b_cb != NULL)
bulk->b_cb(bulk);
- if (desc->b_finished_count == desc->b_page_count) {
+ if (atomic_dec_and_test(&desc->b_finished_count)) {
desc->b_flags |= PTL_BULK_FL_RCVD;
wake_up_interruptible(&desc->b_waitq);
if (desc->b_cb != NULL)
- desc->b_cb(desc);
+ desc->b_cb(desc, desc->b_cb_data);
}
} else {
CERROR("Unexpected event type!\n");
else
ni = *socknal_nip;
- rc = PtlEQAlloc(ni, 128, request_out_callback, NULL, &request_out_eq);
+ rc = PtlEQAlloc(ni, 128, request_out_callback, &request_out_eq);
if (rc != PTL_OK)
CERROR("PtlEQAlloc failed: %d\n", rc);
- rc = PtlEQAlloc(ni, 128, reply_out_callback, NULL, &reply_out_eq);
+ rc = PtlEQAlloc(ni, 128, reply_out_callback, &reply_out_eq);
if (rc != PTL_OK)
CERROR("PtlEQAlloc failed: %d\n", rc);
- rc = PtlEQAlloc(ni, 128, reply_in_callback, NULL, &reply_in_eq);
+ rc = PtlEQAlloc(ni, 128, reply_in_callback, &reply_in_eq);
if (rc != PTL_OK)
CERROR("PtlEQAlloc failed: %d\n", rc);
- rc = PtlEQAlloc(ni, 128, bulk_source_callback, NULL, &bulk_source_eq);
+ rc = PtlEQAlloc(ni, 128, bulk_source_callback, &bulk_source_eq);
if (rc != PTL_OK)
CERROR("PtlEQAlloc failed: %d\n", rc);
- rc = PtlEQAlloc(ni, 128, bulk_sink_callback, NULL, &bulk_sink_eq);
+ rc = PtlEQAlloc(ni, 128, bulk_sink_callback, &bulk_sink_eq);
if (rc != PTL_OK)
CERROR("PtlEQAlloc failed: %d\n", rc);
bulk_source_eq, bulk_sink_eq;
static ptl_process_id_t local_id = {PTL_ID_ANY, PTL_ID_ANY};
-static int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk)
+int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk)
{
ENTRY;
ptl_process_id_t remote_id;
ENTRY;
+ atomic_set(&desc->b_finished_count, desc->b_page_count);
+
list_for_each_safe(tmp, next, &desc->b_page_list) {
/* only request an ACK for the last page */
int ack = (next == &desc->b_page_list);
rc = PtlPut(bulk->b_md_h, (ack ? PTL_ACK_REQ : PTL_NOACK_REQ),
remote_id, desc->b_portal, 0, bulk->b_xid, 0, 0);
if (rc != PTL_OK) {
- CERROR("PtlPut(%Lu, %d, %d) failed: %d\n", remote_id.nid,
- desc->b_portal, bulk->b_xid, rc);
+ CERROR("PtlPut(%Lu, %d, %d) failed: %d\n",
+ remote_id.nid, desc->b_portal, bulk->b_xid, rc);
PtlMDUnlink(bulk->b_md_h);
LBUG();
RETURN(rc);
}
}
- wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
-
- if (desc->b_flags & PTL_RPC_FL_INTR)
- RETURN(-EINTR);
-
RETURN(0);
}
int ptl_send_rpc(struct ptlrpc_request *request)
{
- ptl_process_id_t local_id;
int rc;
char *repbuf;
RETURN(ENOMEM);
}
- local_id.nid = PTL_ID_ANY;
- local_id.pid = PTL_ID_ANY;
-
down(&request->rq_client->cli_rpc_sem);
rc = PtlMEAttach(request->rq_connection->c_peer.peer_ni,
#include <linux/lustre_net.h>
-extern int request_in_callback(ptl_event_t *ev, void *data);
+extern int request_in_callback(ptl_event_t *ev);
extern int ptl_handled_rpc(struct ptlrpc_service *service, void *start);
static int ptlrpc_check_event(struct ptlrpc_service *svc,
service->srv_ring_length = RPC_RING_LENGTH;
rc = PtlEQAlloc(service->srv_self.peer_ni, 128, request_in_callback,
- service, &(service->srv_eq_h));
+ &(service->srv_eq_h));
if (rc != PTL_OK) {
CERROR("PtlEQAlloc failed: %d\n", rc);