From: pschwan Date: Thu, 13 Jun 2002 19:25:10 +0000 (+0000) Subject: WARNING: This commit breaks everything. It will be back in shape within 12 X-Git-Tag: v1_7_100~5541 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=f6e2f83978f715293b3f78fd91971027eb033b36 WARNING: This commit breaks everything. It will be back in shape within 12 hours, we hope, but update at your own (extreme) risk. - Adds more unfinished LOV functionality - Changes MDS EAs to contain LOV data - Adds new brw callbacks - Fixes to work with the Portals tip --- diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index a1e329e..41216ad 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -61,24 +61,6 @@ struct lustre_msg { __u32 buflens[0]; }; -struct niobuf_remote { - __u64 offset; - __u32 len; - __u32 xid; - __u32 flags; -}; - -struct niobuf_local { - __u64 offset; - __u32 len; - __u32 xid; - __u32 flags; - void *addr; - struct page *page; - void *target_private; - struct dentry *dentry; -}; - #define N_LOCAL_TEMP_PAGE 0x00000001 /* @@ -114,8 +96,8 @@ typedef uint32_t obd_rdev; typedef uint32_t obd_flag; typedef uint32_t obd_count; -#define OBD_FL_INLINEDATA (0x00000001UL) -#define OBD_FL_OBDMDEXISTS (0x00000002UL) +#define OBD_FL_INLINEDATA (0x00000001) +#define OBD_FL_OBDMDEXISTS (0x00000002) #define OBD_INLINESZ 60 #define OBD_OBDMDSZ 60 @@ -166,6 +148,31 @@ struct obdo { #define OBD_MD_FLNOTOBD (~(OBD_MD_FLOBDMD | OBD_MD_FLOBDFLG | OBD_MD_FLBLOCKS |\ OBD_MD_LINKNAME)) +struct obd_ioobj { + obd_id ioo_id; + obd_gr ioo_gr; + __u32 ioo_type; + __u32 ioo_bufcnt; +}; + +struct niobuf_remote { + __u64 offset; + __u32 len; + __u32 xid; + __u32 flags; +}; + +struct niobuf_local { + __u64 offset; + __u32 len; + __u32 xid; + __u32 flags; + void *addr; + struct page *page; + void *target_private; + struct dentry *dentry; +}; + /* request structure for OST's */ #define OST_REQ_HAS_OA1 0x1 @@ -176,13 +183,6 @@ struct ost_body { struct obdo oa; }; -struct obd_ioobj { - obd_id ioo_id; - obd_gr ioo_gr; - __u32 ioo_type; - __u32 ioo_bufcnt; -}; - /* * MDS REQ RECORDS */ @@ -213,8 +213,8 @@ struct ll_fid { struct mds_body { struct ll_fid fid1; struct ll_fid fid2; - __u64 objid; __u64 size; + __u64 extra; __u32 valid; __u32 mode; __u32 uid; @@ -257,8 +257,7 @@ struct mds_rec_create { __u32 cr_gid; __u64 cr_time; __u32 cr_mode; - /* overloaded: id for create, tgtlen for symlink, rdev for mknod */ - __u64 cr_id; + __u64 cr_rdev; }; struct mds_rec_link { diff --git a/lustre/include/linux/lustre_lite.h b/lustre/include/linux/lustre_lite.h index 3a9188f..bdb9ed1 100644 --- a/lustre/include/linux/lustre_lite.h +++ b/lustre/include/linux/lustre_lite.h @@ -29,6 +29,11 @@ struct ll_file_data { __u32 fd_flags; }; +struct ll_inode_md { + struct mds_body *body; + struct obdo *obdo; +}; + #define LL_IOC_GETFLAGS _IOR ('f', 151, long) #define LL_IOC_SETFLAGS _IOW ('f', 152, long) #define LL_IOC_CLRFLAGS _IOW ('f', 153, long) @@ -38,7 +43,8 @@ struct ll_file_data { #define LL_INLINESZ 60 struct ll_inode_info { int lli_flags; - __u64 lli_objid; + struct obdo *lli_obdo; + char *lli_symlink_name; char lli_inline[LL_INLINESZ]; }; diff --git a/lustre/include/linux/lustre_mds.h b/lustre/include/linux/lustre_mds.h index ffd84d1..69f71fb 100644 --- a/lustre/include/linux/lustre_mds.h +++ b/lustre/include/linux/lustre_mds.h @@ -43,7 +43,7 @@ struct mds_update_record { int ur_tgtlen; char *ur_tgt; struct iattr ur_iattr; - __u64 ur_id; + __u64 ur_rdev; __u32 ur_mode; __u32 ur_uid; __u32 ur_gid; @@ -132,8 +132,8 @@ int mdc_readpage(struct ptlrpc_client *, struct ptlrpc_connection *, ino_t ino, int type, __u64 offset, char *addr, struct ptlrpc_request **); int mdc_create(struct ptlrpc_client *, struct ptlrpc_connection *, struct inode *dir, const char *name, int namelen, - const char *tgt, int tgtlen, - int mode, __u64 id, __u32 uid, __u32 gid, __u64 time, + const char *tgt, int tgtlen, int mode, __u32 uid, __u32 gid, + __u64 time, __u64 rdev, struct obdo *obdo, struct ptlrpc_request **); int mdc_unlink(struct ptlrpc_client *, struct ptlrpc_connection *, struct inode *dir, struct inode *child, const char *name, @@ -156,8 +156,9 @@ struct mds_fs_operations { int (* fs_commit)(struct inode *inode, void *handle); int (* fs_setattr)(struct dentry *dentry, void *handle, struct iattr *iattr); - int (* fs_set_objid)(struct inode *inode, void *handle, obd_id id); - int (* fs_get_objid)(struct inode *inode, obd_id *id); + int (* fs_set_obdo)(struct inode *inode, void *handle, + struct obdo *obdo); + int (* fs_get_obdo)(struct inode *inode, struct obdo *obdo); ssize_t (* fs_readpage)(struct file *file, char *buf, size_t count, loff_t *offset); void (* fs_delete_inode)(struct inode *inode); @@ -196,16 +197,16 @@ static inline int mds_fs_setattr(struct mds_obd *mds, struct dentry *dentry, return mds->mds_fsops->fs_setattr(dentry, handle, iattr); } -static inline int mds_fs_set_objid(struct mds_obd *mds, struct inode *inode, - void *handle, __u64 id) +static inline int mds_fs_set_obdo(struct mds_obd *mds, struct inode *inode, + void *handle, struct obdo *obdo) { - return mds->mds_fsops->fs_set_objid(inode, handle, id); + return mds->mds_fsops->fs_set_obdo(inode, handle, obdo); } -static inline int mds_fs_get_objid(struct mds_obd *mds, struct inode *inode, - __u64 *id) +static inline int mds_fs_get_obdo(struct mds_obd *mds, struct inode *inode, + struct obdo *obdo) { - return mds->mds_fsops->fs_get_objid(inode, id); + return mds->mds_fsops->fs_get_obdo(inode, obdo); } static inline ssize_t mds_fs_readpage(struct mds_obd *mds, struct file *file, diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index ae665db..ae09030 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -197,13 +197,14 @@ struct ptlrpc_bulk_desc { struct ptlrpc_connection *b_connection; struct ptlrpc_client *b_client; __u32 b_portal; - int (*b_cb)(struct ptlrpc_bulk_desc *); struct obd_conn b_conn; + void (*b_cb)(struct ptlrpc_bulk_desc *, void *); + void *b_cb_data; wait_queue_head_t b_waitq; struct list_head b_page_list; __u32 b_page_count; - __u32 b_finished_count; + atomic_t b_finished_count; void *b_desc_private; }; @@ -245,6 +246,8 @@ struct ptlrpc_service { struct ptlrpc_request *req); }; +typedef void (*bulk_callback_t)(struct ptlrpc_bulk_desc *, void *); + typedef int (*svc_handler_t)(struct obd_device *obddev, struct ptlrpc_service *svc, struct ptlrpc_request *req); @@ -258,6 +261,7 @@ void ptlrpc_init_connection(void); void ptlrpc_cleanup_connection(void); /* rpc/niobuf.c */ +int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk); int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *); int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *); int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk); diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index 2c63782..b7b2014 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -159,13 +159,22 @@ struct osc_obd { struct ptlrpc_connection *osc_conn; }; +typedef __u8 uuid_t[37]; + +#define MAX_MULTI 16 struct lov_obd { + __u32 lov_default_count; + __u32 lov_default_pattern; + __u32 lov_default_size; + uuid_t lov_service_uuids[MAX_MULTI]; + +#if 0 int lov_count; struct obd_conn *lov_targets; +#endif }; /* corresponds to one of the obd's */ -#define MAX_MULTI 16 struct obd_device { struct obd_type *obd_type; char *obd_name; @@ -231,7 +240,8 @@ struct obd_ops { obd_size *count, obd_off offset); int (*o_brw)(int rw, struct obd_conn *conn, obd_count num_oa, struct obdo **oa, obd_count *oa_bufs, struct page **buf, - obd_size *count, obd_off *offset, obd_flag *flags); + obd_size *count, obd_off *offset, obd_flag *flags, + void *); int (*o_punch)(struct obd_conn *conn, struct obdo *tgt, obd_size count, obd_off offset); int (*o_sync)(struct obd_conn *conn, struct obdo *tgt, obd_size count, @@ -251,6 +261,7 @@ struct obd_ops { int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_local *local, void *desc_private); + int (*o_enqueue)(struct obd_conn *conn, struct ldlm_namespace *ns, struct ldlm_handle *parent_lock, __u64 *res_id, __u32 type, struct ldlm_extent *, __u32 mode, diff --git a/lustre/include/linux/obd_class.h b/lustre/include/linux/obd_class.h index 40d7343..7054978 100644 --- a/lustre/include/linux/obd_class.h +++ b/lustre/include/linux/obd_class.h @@ -295,14 +295,14 @@ static inline int obd_punch(struct obd_conn *conn, struct obdo *tgt, static inline int obd_brw(int rw, struct obd_conn *conn, obd_count num_oa, struct obdo **oa, obd_count *oa_bufs, struct page **buf, obd_size *count, obd_off *offset, - obd_flag *flags) + obd_flag *flags, void *callback) { int rc; OBD_CHECK_SETUP(conn); OBD_CHECK_OP(conn,brw); rc = OBP(conn->oc_dev, brw)(rw, conn, num_oa, oa, oa_bufs, buf, - count, offset, flags); + count, offset, flags, callback); RETURN(rc); } @@ -403,6 +403,8 @@ static __inline__ struct obdo *obdo_alloc(void) struct obdo *oa = NULL; oa = kmem_cache_alloc(obdo_cachep, SLAB_KERNEL); + if (oa == NULL) + LBUG(); memset(oa, 0, sizeof (*oa)); return oa; diff --git a/lustre/include/linux/obd_lov.h b/lustre/include/linux/obd_lov.h new file mode 100644 index 0000000..9cfbd85 --- /dev/null +++ b/lustre/include/linux/obd_lov.h @@ -0,0 +1,26 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + */ + +#ifndef _OBD_LOV_H__ +#define _OBD_LOV_H__ + +#ifdef __KERNEL__ + +#define OBD_LOV_DEVICENAME "lov" + +struct lov_object_id { /* per-child structure */ + __u64 l_object_id; + __u32 l_device_id; +}; + +struct lov_md { + __u64 lmd_object_id; /* lov object id */ + __u64 lmd_stripe_count; + __u32 lmd_stripe_size; + __u32 lmd_stripe_pattern; /* per-lov object stripe pattern */ + struct lov_object_id lmd_objects[0]; +}; + +#endif +#endif diff --git a/lustre/lib/mds_updates.c b/lustre/lib/mds_updates.c index 2f49c34..4441661 100644 --- a/lustre/lib/mds_updates.c +++ b/lustre/lib/mds_updates.c @@ -50,7 +50,7 @@ static void mds_pack_body(struct mds_body *b) mds_pack_fid(&b->fid1); mds_pack_fid(&b->fid2); - b->objid = HTON__u64(b->objid); + b->extra = HTON__u64(b->extra); b->size = HTON__u64(b->size); b->valid = HTON__u32(b->valid); b->mode = HTON__u32(b->mode); @@ -83,13 +83,13 @@ void mds_pack_rep_body(struct ptlrpc_request *req) /* packing of MDS records */ void mds_create_pack(struct mds_rec_create *rec, struct inode *inode, - __u32 mode, __u64 id, __u32 uid, __u32 gid, __u64 time) + __u32 mode, __u64 rdev, __u32 uid, __u32 gid, __u64 time) { /* XXX do something about time, uid, gid */ rec->cr_opcode = HTON__u32(REINT_CREATE); ll_inode2fid(&rec->cr_fid, inode); rec->cr_mode = HTON__u32(mode); - rec->cr_id = HTON__u64(id); + rec->cr_rdev = HTON__u64(rdev); rec->cr_uid = HTON__u32(uid); rec->cr_gid = HTON__u32(gid); rec->cr_time = HTON__u64(time); @@ -151,7 +151,7 @@ static void mds_unpack_body(struct mds_body *b) mds_unpack_fid(&b->fid1); mds_unpack_fid(&b->fid2); - b->objid = NTOH__u64(b->objid); + b->extra = NTOH__u64(b->extra); b->size = NTOH__u64(b->size); b->valid = NTOH__u32(b->valid); b->mode = NTOH__u32(b->mode); @@ -212,13 +212,13 @@ static int mds_create_unpack(struct ptlrpc_request *req, struct mds_rec_create *rec = lustre_msg_buf(req->rq_reqmsg, 0); ENTRY; - if (req->rq_reqmsg->bufcount != 3 || + if (req->rq_reqmsg->bufcount < 2 || req->rq_reqmsg->buflens[0] != sizeof(*rec)) RETURN(-EFAULT); r->ur_fid1 = &rec->cr_fid; r->ur_mode = NTOH__u32(rec->cr_mode); - r->ur_id = NTOH__u64(rec->cr_id); + r->ur_rdev = NTOH__u64(rec->cr_rdev); r->ur_uid = NTOH__u32(rec->cr_uid); r->ur_gid = NTOH__u32(rec->cr_gid); r->ur_time = NTOH__u64(rec->cr_time); @@ -226,8 +226,10 @@ static int mds_create_unpack(struct ptlrpc_request *req, r->ur_name = lustre_msg_buf(req->rq_reqmsg, 1); r->ur_namelen = req->rq_reqmsg->buflens[1]; - r->ur_tgt = lustre_msg_buf(req->rq_reqmsg, 2); - r->ur_tgtlen = req->rq_reqmsg->buflens[2]; + if (S_ISLNK(r->ur_mode)) { + r->ur_tgt = lustre_msg_buf(req->rq_reqmsg, 2); + r->ur_tgtlen = req->rq_reqmsg->buflens[2]; + } RETURN(0); } diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 834c35b..f43bfa0 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -30,8 +30,6 @@ int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc); extern int ll_setattr(struct dentry *de, struct iattr *attr); -extern inline struct obdo * ll_oa_from_inode(struct inode *inode, - unsigned long valid); static int ll_file_open(struct inode *inode, struct file *file) { @@ -61,14 +59,15 @@ static int ll_file_open(struct inode *inode, struct file *file) CERROR("mdc_open didn't assign fd_mdshandle\n"); /* XXX handle this how, abort or is it non-fatal? */ } + if (!fd->fd_mdshandle) + CERROR("mdc_open didn't assign fd_mdshandle\n"); - oa = ll_oa_from_inode(inode, (OBD_MD_FLMODE | OBD_MD_FLID)); + oa = ll_i2info(inode)->lli_obdo; if (oa == NULL) { LBUG(); GOTO(out_mdc, rc = -ENOMEM); } rc = obd_open(ll_i2obdconn(inode), oa); - obdo_free(oa); if (rc) { GOTO(out_mdc, rc = -abs(rc)); } @@ -106,16 +105,14 @@ static int ll_file_release(struct inode *inode, struct file *file) GOTO(out, rc = -EINVAL); } - oa = ll_oa_from_inode(inode, (OBD_MD_FLMODE | OBD_MD_FLID)); + oa = ll_i2info(inode)->lli_obdo; if (oa == NULL) { LBUG(); GOTO(out_fd, rc = -ENOENT); } rc = obd_close(ll_i2obdconn(inode), oa); - obdo_free(oa); - if (rc) { + if (rc) GOTO(out_fd, abs(rc)); - } if (file->f_mode & FMODE_WRITE) { struct iattr attr; diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 91f13b3..70990cb 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -95,10 +95,10 @@ static struct dentry *ll_lookup(struct inode * dir, struct dentry *dentry) struct ptlrpc_request *request = NULL; struct inode * inode = NULL; struct ll_sb_info *sbi = ll_i2sbi(dir); - int err; - int type; + struct ll_inode_md md; + int err, type; ino_t ino; - + ENTRY; if (dentry->d_name.len > EXT2_NAME_LEN) RETURN(ERR_PTR(-ENAMETOOLONG)); @@ -115,8 +115,18 @@ static struct dentry *ll_lookup(struct inode * dir, struct dentry *dentry) RETURN(ERR_PTR(-abs(err))); } - inode = iget4(dir->i_sb, ino, ll_find_inode, - lustre_msg_buf(request->rq_repmsg, 0)); + if (S_ISREG(type)) { + if (request->rq_repmsg->bufcount < 2 || + request->rq_repmsg->buflens[1] != sizeof(struct obdo)) + LBUG(); + + md.obdo = lustre_msg_buf(request->rq_repmsg, 1); + } else + md.obdo = NULL; + + md.body = lustre_msg_buf(request->rq_repmsg, 0); + + inode = iget4(dir->i_sb, ino, ll_find_inode, &md); ptlrpc_free_req(request); if (!inode) @@ -130,7 +140,7 @@ static struct dentry *ll_lookup(struct inode * dir, struct dentry *dentry) static struct inode *ll_create_node(struct inode *dir, const char *name, int namelen, const char *tgt, int tgtlen, - int mode, __u64 id) + int mode, __u64 extra, struct obdo *obdo) { struct inode *inode; struct ptlrpc_request *request = NULL; @@ -138,12 +148,13 @@ static struct inode *ll_create_node(struct inode *dir, const char *name, int err; time_t time = CURRENT_TIME; struct ll_sb_info *sbi = ll_i2sbi(dir); + struct ll_inode_md md; ENTRY; err = mdc_create(&sbi->ll_mds_client, sbi->ll_mds_conn, dir, name, - namelen, tgt, tgtlen, mode, id, current->fsuid, - current->fsgid, time, &request); + namelen, tgt, tgtlen, mode, current->fsuid, + current->fsgid, time, extra, obdo, &request); if (err) { inode = ERR_PTR(err); GOTO(out, err); @@ -151,16 +162,16 @@ static struct inode *ll_create_node(struct inode *dir, const char *name, body = lustre_msg_buf(request->rq_repmsg, 0); body->valid = (__u32)OBD_MD_FLNOTOBD; - body->objid = id; body->nlink = 1; body->atime = body->ctime = body->mtime = time; body->uid = current->fsuid; body->gid = current->fsgid; body->mode = mode; - CDEBUG(D_INODE, "-- new_inode: objid %lld, ino %d, mode %o\n", - (unsigned long long)body->objid, body->ino, body->mode); - inode = iget4(dir->i_sb, body->ino, ll_find_inode, body); + md.body = body; + md.obdo = obdo; + + inode = iget4(dir->i_sb, body->ino, ll_find_inode, &md); if (IS_ERR(inode)) { CERROR("new_inode -fatal: %ld\n", PTR_ERR(inode)); inode = ERR_PTR(-EIO); @@ -249,7 +260,7 @@ static int ll_create (struct inode * dir, struct dentry * dentry, int mode) { int err, rc; struct obdo oa; - struct inode * inode; + struct inode *inode; memset(&oa, 0, sizeof(oa)); oa.o_valid = OBD_MD_FLMODE; @@ -263,8 +274,8 @@ static int ll_create (struct inode * dir, struct dentry * dentry, int mode) mode = mode | S_IFREG; CDEBUG(D_DENTRY, "name %s mode %o o_id %lld\n", dentry->d_name.name, mode, (unsigned long long)oa.o_id); - inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len, - NULL, 0, mode, oa.o_id); + inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len, + NULL, 0, mode, 0, &oa); if (IS_ERR(inode)) { rc = PTR_ERR(inode); @@ -290,11 +301,12 @@ out_destroy: } /* ll_create */ -static int ll_mknod (struct inode * dir, struct dentry *dentry, int mode, int rdev) +static int ll_mknod (struct inode * dir, struct dentry *dentry, int mode, + int rdev) { struct inode * inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len, NULL, 0, - mode, 0); + mode, rdev, NULL); int err = PTR_ERR(inode); if (!IS_ERR(inode)) { init_special_inode(inode, mode, rdev); @@ -316,7 +328,7 @@ static int ll_symlink (struct inode * dir, struct dentry * dentry, inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len, symname, l, - S_IFLNK | S_IRWXUGO, 0); + S_IFLNK | S_IRWXUGO, 0, NULL); err = PTR_ERR(inode); if (IS_ERR(inode)) return err; @@ -376,7 +388,7 @@ static int ll_mkdir(struct inode * dir, struct dentry * dentry, int mode) inode = ll_create_node (dir, dentry->d_name.name, dentry->d_name.len, NULL, 0, - S_IFDIR | mode, 0); + S_IFDIR | mode, 0, NULL); err = PTR_ERR(inode); if (IS_ERR(inode)) goto out_dir; diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index 9f09ba3..2fe1f3ea 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -33,59 +33,6 @@ #include #include -inline struct obdo * ll_oa_from_inode(struct inode *inode, unsigned long valid) -{ - struct ll_inode_info *oinfo = ll_i2info(inode); - struct obdo *oa = obdo_alloc(); - if ( !oa ) { - CERROR("no memory to allocate obdo!\n"); - return NULL; - } - oa->o_valid = valid; - - if ( valid & OBD_MD_FLID ) - oa->o_id = oinfo->lli_objid; - if ( valid & OBD_MD_FLATIME ) - oa->o_atime = inode->i_atime; - if ( valid & OBD_MD_FLMTIME ) - oa->o_mtime = inode->i_mtime; - if ( valid & OBD_MD_FLCTIME ) - oa->o_ctime = inode->i_ctime; - if ( valid & OBD_MD_FLSIZE ) - oa->o_size = inode->i_size; - if ( valid & OBD_MD_FLBLOCKS ) /* allocation of space */ - oa->o_blocks = inode->i_blocks; - if ( valid & OBD_MD_FLBLKSZ ) - oa->o_blksize = inode->i_blksize; - if ( valid & OBD_MD_FLMODE ) - oa->o_mode = inode->i_mode; - if ( valid & OBD_MD_FLUID ) - oa->o_uid = inode->i_uid; - if ( valid & OBD_MD_FLGID ) - oa->o_gid = inode->i_gid; - if ( valid & OBD_MD_FLFLAGS ) - oa->o_flags = inode->i_flags; - if ( valid & OBD_MD_FLNLINK ) - oa->o_nlink = inode->i_nlink; - if ( valid & OBD_MD_FLGENER ) - oa->o_generation = inode->i_generation; - - CDEBUG(D_INFO, "src inode %ld, dst obdo %ld valid 0x%08lx\n", - inode->i_ino, (long)oa->o_id, valid); -#if 0 - /* this will transfer metadata for the logical object to - the oa: that metadata could contain the constituent objects - */ - if (ll_has_inline(inode)) { - CDEBUG(D_INODE, "copying inline data from inode to obdo\n"); - memcpy(oa->o_inline, oinfo->lli_inline, OBD_INLINESZ); - oa->o_obdflags |= OBD_FL_INLINEDATA; - oa->o_valid |= OBD_MD_FLINLINE; - } -#endif - return oa; -} /* ll_oa_from_inode */ - /* SYNCHRONOUS I/O to object storage for an inode */ static int ll_brw(int rw, struct inode *inode, struct page *page, int create) { @@ -98,14 +45,9 @@ static int ll_brw(int rw, struct inode *inode, struct page *page, int create) int err; ENTRY; - oa = ll_oa_from_inode(inode, OBD_MD_FLNOTOBD); - if (!oa) - RETURN(-ENOMEM); - + oa = ll_i2info(inode)->lli_obdo; err = obd_brw(rw, ll_i2obdconn(inode), num_obdo, &oa, &bufs_per_obdo, - &page, &count, &offset, &flags); - - obdo_free(oa); + &page, &count, &offset, &flags, NULL); RETURN(err); } /* ll_brw */ @@ -217,9 +159,7 @@ static int ll_commit_write(struct file *file, struct page *page, struct iattr iattr; ENTRY; - oa = ll_oa_from_inode(inode, OBD_MD_FLNOTOBD); - if (! oa ) - RETURN(-ENOMEM); + oa = ll_i2info(inode)->lli_obdo; SetPageUptodate(page); @@ -230,7 +170,7 @@ static int ll_commit_write(struct file *file, struct page *page, from, to, (unsigned long long)count); err = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode), num_obdo, &oa, - &bufs_per_obdo, &page, &count, &offset, &flags); + &bufs_per_obdo, &page, &count, &offset, &flags, NULL); kunmap(page); if ((iattr.ia_size = offset + to) > inode->i_size) { @@ -246,7 +186,6 @@ static int ll_commit_write(struct file *file, struct page *page, #endif } - obdo_free(oa); RETURN(err); } /* ll_commit_write */ @@ -256,16 +195,11 @@ void ll_truncate(struct inode *inode) int err; ENTRY; - oa = ll_oa_from_inode(inode, OBD_MD_FLNOTOBD); - if ( !oa ) { - CERROR("no memory to allocate obdo!\n"); - return; - } + oa = ll_i2info(inode)->lli_obdo; CDEBUG(D_INFO, "calling punch for %ld (%Lu bytes at 0)\n", (long)oa->o_id, (unsigned long long)oa->o_size); err = obd_punch(ll_i2obdconn(inode), oa, oa->o_size, 0); - obdo_free(oa); if (err) { CERROR("obd_truncate fails (%d)\n", err); @@ -308,20 +242,21 @@ int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf, flags[i] = OBD_BRW_CREATE; } - oa = ll_oa_from_inode(inode, OBD_MD_FLNOTOBD); + oa = ll_i2info(inode)->lli_obdo; if (!oa) GOTO(out, rc = -ENOMEM); - rc = obd_brw(rw, ll_i2obdconn(inode), num_obdo, &oa, &bufs_per_obdo, - iobuf->maplist, count, offset, flags); - if (rc == 0) + iobuf->maplist, count, offset, flags, NULL); + if (rc == 0) rc = bufs_per_obdo * PAGE_SIZE; out: - obdo_free(oa); - OBD_FREE(flags, sizeof(obd_flag) * bufs_per_obdo); - OBD_FREE(offset, sizeof(obd_off) * bufs_per_obdo); - OBD_FREE(count, sizeof(obd_count) * bufs_per_obdo); + if (flags) + OBD_FREE(flags, sizeof(obd_flag) * bufs_per_obdo); + if (count) + OBD_FREE(count, sizeof(obd_count) * bufs_per_obdo); + if (offset) + OBD_FREE(offset, sizeof(obd_off) * bufs_per_obdo); RETURN(rc); } diff --git a/lustre/llite/super.c b/lustre/llite/super.c index 1634e4f..9a8c805 100644 --- a/lustre/llite/super.c +++ b/lustre/llite/super.c @@ -88,6 +88,7 @@ static struct super_block * ll_read_super(struct super_block *sb, __u64 last_committed, last_rcvd; __u32 last_xid; struct ptlrpc_request *request = NULL; + struct ll_inode_md md; ENTRY; MOD_INC_USE_COUNT; @@ -185,8 +186,10 @@ static struct super_block * ll_read_super(struct super_block *sb, GOTO(out_mdc, sb = NULL); } - root = iget4(sb, sbi->ll_rootino, NULL, - lustre_msg_buf(request->rq_repmsg, 0)); + md.body = lustre_msg_buf(request->rq_repmsg, 0); + md.obdo = NULL; + root = iget4(sb, sbi->ll_rootino, NULL, &md); + if (root) { sb->s_root = d_alloc_root(root); } else { @@ -237,14 +240,28 @@ static void ll_put_super(struct super_block *sb) EXIT; } /* ll_put_super */ - -extern inline struct obdo * ll_oa_from_inode(struct inode *inode, int valid); +static void ll_clear_inode(struct inode *inode) +{ + if (atomic_read(&inode->i_count) == 0) { + struct obdo *oa = ll_i2info(inode)->lli_obdo; + if (oa) { + obdo_free(oa); + ll_i2info(inode)->lli_obdo = NULL; + } + if (ll_i2info(inode)->lli_symlink_name) { + OBD_FREE(ll_i2info(inode)->lli_symlink_name, + strlen(ll_i2info(inode)->lli_symlink_name)+ 1); + ll_i2info(inode)->lli_symlink_name = NULL; + } + } +} static void ll_delete_inode(struct inode *inode) { - if (S_ISREG(inode->i_mode)) { - int err; - struct obdo *oa = ll_oa_from_inode(inode, OBD_MD_FLNOTOBD); + if (S_ISREG(inode->i_mode)) { + int err; + struct obdo *oa; + oa = ll_i2info(inode)->lli_obdo; if (!oa) { CERROR("no memory\n"); @@ -349,8 +366,9 @@ static int ll_statfs(struct super_block *sb, struct statfs *buf) RETURN(err); } -static void inline ll_to_inode(struct inode *dst, struct mds_body *body) +static void inline ll_to_inode(struct inode *dst, struct ll_inode_md *md) { + struct mds_body *body = md->body; struct ll_inode_info *ii = ll_i2info(dst); /* core attributes first */ @@ -378,8 +396,10 @@ static void inline ll_to_inode(struct inode *dst, struct mds_body *body) dst->i_generation = body->generation; /* this will become more elaborate for striping etc */ - if (body->valid & OBD_MD_FLOBJID) - ii->lli_objid = body->objid; + if (md->obdo != NULL) { + ii->lli_obdo = obdo_alloc(); + memcpy(ii->lli_obdo, md->obdo, sizeof(*md->obdo)); + } #if 0 if (obdo_has_inline(oa)) { @@ -400,10 +420,10 @@ static void inline ll_to_inode(struct inode *dst, struct mds_body *body) static inline void ll_read_inode2(struct inode *inode, void *opaque) { - struct mds_body *body = opaque; + struct ll_inode_md *md = opaque; ENTRY; - ll_to_inode(inode, body); + ll_to_inode(inode, md); /* OIDEBUG(inode); */ @@ -433,6 +453,7 @@ static inline void ll_read_inode2(struct inode *inode, void *opaque) struct super_operations ll_super_operations = { read_inode2: ll_read_inode2, + clear_inode: ll_clear_inode, delete_inode: ll_delete_inode, put_super: ll_put_super, statfs: ll_statfs diff --git a/lustre/lov/.cvsignore b/lustre/lov/.cvsignore new file mode 100644 index 0000000..282522d --- /dev/null +++ b/lustre/lov/.cvsignore @@ -0,0 +1,2 @@ +Makefile +Makefile.in diff --git a/lustre/lov/Makefile.am b/lustre/lov/Makefile.am index 8351f5b..90d38d4 100644 --- a/lustre/lov/Makefile.am +++ b/lustre/lov/Makefile.am @@ -5,10 +5,10 @@ DEFS := -#MODULE = lov -#modulefs_DATA = lov.o -#EXTRA_PROGRAMS = lov +MODULE = lov +modulefs_DATA = lov.o +EXTRA_PROGRAMS = lov -#lov_SOURCES = lov_obd.c +lov_SOURCES = lov_obd.c include $(top_srcdir)/Rules diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index e74d0fe..bb5df34 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -15,6 +15,7 @@ #include #include +#include extern struct obd_device obd_dev[MAX_OBD_DEVICES]; @@ -115,65 +116,70 @@ static int lov_close(struct obd_conn *conn, struct obdo *oa) static int lov_create(struct obd_conn *conn, struct obdo *oa) { - int rc, i; + int rc, retval, i, offset; + struct obdo tmp; + struct lov_md md; ENTRY; if (!gen_client(conn)) RETURN(-EINVAL); - for (i = 0; i < conn->oc_dev->obd_multi_count; i++) - rc = obd_create(&conn->oc_dev->obd_multi_conn[i], oa); + md.lmd_object_id = oa->o_id; + md.lmd_stripe_count = conn->oc_dev->obd_multi_count; + + memset(oa->o_inline, 0, sizeof(oa->o_inline)); + offset = sizeof(md); + for (i = 0; i < md.lmd_stripe_count; i++) { + struct lov_object_id lov_id; + rc = obd_create(&conn->oc_dev->obd_multi_conn[i], &tmp); + if (i == 0) { + memcpy(oa, &tmp, sizeof(tmp)); + retval = rc; + } else if (retval != rc) + CERROR("return codes didn't match (%d, %d)\n", + retval, rc); + lov_id = (struct lov_object_id *)(oa->o_inline + offset); + lov_id->l_device_id = i; + lov_id->l_object_id = tmp.o_id; + offset += sizeof(*lov_id); + } + memcpy(oa->o_inline, &md, sizeof(md)); return rc; } -static int filter_destroy(struct obd_conn *conn, struct obdo *oa) +static int lov_destroy(struct obd_conn *conn, struct obdo *oa) { -#if 0 - struct obd_device * obddev; - struct obd_client * cli; - struct inode * inode; - struct file *dir; - struct file *object; - int rc; - struct obd_run_ctxt saved; + int rc, retval, i, offset; + struct obdo tmp; + struct lov_md *md; + struct lov_object_id lov_id; + ENTRY; - if (!(cli = gen_client(conn))) { - CERROR("invalid client %u\n", conn->oc_id); - EXIT; - return -EINVAL; - } + if (!gen_client(conn)) + RETURN(-EINVAL); - obddev = conn->oc_dev; - object = filter_obj_open(obddev, oa->o_id, oa->o_mode); - if (!object || IS_ERR(object)) { - EXIT; - return -ENOENT; - } + md = (struct lov_md *)oa->o_inline; - inode = object->f_dentry->d_inode; - inode->i_nlink = 1; - inode->i_mode = 010000; + memcpy(&tmp, oa, sizeof(tmp)); - push_ctxt(&saved, &obddev->u.filter.fo_ctxt); - dir = filter_parent(oa->o_id, oa->o_mode); - if (IS_ERR(dir)) { - rc = PTR_ERR(dir); - EXIT; - goto out; + offset = sizeof(md); + for (i = 0; i < md->lmd_stripe_count; i++) { + struct lov_object_id *lov_id; + lov_id = (struct lov_object_id *)(oa->o_inline + offset); + + tmp.o_id = lov_id->l_object_id; + + rc = obd_destroy(&conn->oc_dev->obd_multi_conn[i], &tmp); + if (i == 0) + retval = rc; + else if (retval != rc) + CERROR("return codes didn't match (%d, %d)\n", + retval, rc); + offset += sizeof(*lov_id); } - dget(dir->f_dentry); - dget(object->f_dentry); - rc = vfs_unlink(dir->f_dentry->d_inode, object->f_dentry); - - filp_close(dir, 0); - filp_close(object, 0); -out: - pop_ctxt(&saved); - EXIT; + return rc; -#endif - return 0; } /* FIXME: maybe we'll just make one node the authoritative attribute node, then @@ -200,18 +206,74 @@ static int lov_punch(struct obd_conn *conn, struct obdo *oa, RETURN(rc); } +struct lov_callback_data { + atomic_t count; + wait_queue_head waitq; +}; + +static void lov_read_callback(struct ptlrpc_bulk_desc *desc, void *data) +{ + struct lov_callback_data *cb_data = data; + + if (atomic_dec_and_test(&cb_data->count)) + wake_up(&cb_data->waitq); +} + +static int lov_read_check_status(struct lov_callback_data *cb_data) +{ + ENTRY; + if (sigismember(&(current->pending.signal), SIGKILL) || + sigismember(&(current->pending.signal), SIGTERM) || + sigismember(&(current->pending.signal), SIGINT)) { + cb_data->flags |= PTL_RPC_FL_INTR; + RETURN(1); + } + if (atomic_read(&cb_data->count) == 0) + RETURN(1); + RETURN(0); +} + /* buffer must lie in user memory here */ -static int lov_read(struct obd_conn *conn, struct obdo *oa, char *buf, - obd_size *count, obd_off offset) +static int lov_brw(int cmd, struct obd_conn *conn, obd_count num_oa, + struct obdo **oa, + obd_count *oa_bufs, struct page **buf, + obd_size *count, obd_off *offset, obd_flag *flags, + bulk_callback_t callback, void *data) { - int rc, i; + int rc, i, page_array_offset = 0; obd_off off = offset; obd_size retval = 0; + struct lov_callback_data *cb_data; ENTRY; + if (num_oa != 1) + LBUG(); + if (!gen_client(conn)) RETURN(-EINVAL); + OBD_ALLOC(cb_data, sizeof(*cb_data)); + if (cb_data == NULL) { + LBUG(); + RETURN(-ENOMEM); + } + INIT_WAITQUEUE_HEAD(&cb_data->waitq); + atomic_set(&cb_data->count, 0); + + for (i = 0; i < oa_bufs[0]; i++) { + struct page *current_page = buf[i]; + + struct lov_md *md = (struct lov_md *)oa[i]->inline; + int bufcount = oa_bufs[i]; + // md->lmd_stripe_count + + for (k = page_array_offset; k < bufcount + page_array_offset; + k++) { + + } + page_array_offset += bufcount; + + while (off < offset + count) { int stripe, conn; obd_size size, tmp; @@ -221,12 +283,13 @@ static int lov_read(struct obd_conn *conn, struct obdo *oa, char *buf, if (size > *count) size = *count; - conn = stripe % conn->oc_dev->obd_multi_count; tmp = size; - rc = obd_read(&conn->oc_dev->obd_multi_conn[conn], oa, buf, - &size, off); + atomic_inc(&cb_data->count); + rc = obd_brw(cmd, &conn->oc_dev->obd_multi_conn[conn], + num_oa, oa, buf, + &size, off, lov_read_callback, cb_data); if (rc == 0) retval += size; else { @@ -239,17 +302,28 @@ static int lov_read(struct obd_conn *conn, struct obdo *oa, char *buf, buf += size; } + wait_event_interruptible(&cb_data->waitq, + lov_read_check_status(cb_data)); + if (cb_data->flags & PTL_RPC_FL_INTR) + rc = -EINTR; + + /* FIXME: The error handling here sucks */ *count = retval; + OBD_FREE(cb_data, sizeof(*cb_data)); RETURN(rc); } +static void lov_write_finished(struct ptlrpc_bulk_desc *desc, void *data) +{ + +} /* buffer must lie in user memory here */ static int filter_write(struct obd_conn *conn, struct obdo *oa, char *buf, obd_size *count, obd_off offset) { int err; - struct file * file; + struct file *file; unsigned long retval; ENTRY; @@ -312,22 +386,25 @@ static int lov_cancel(struct obd_conn *conn, __u32 mode, rc = obd_cancel(&conn->oc_dev->obd_multi_conn[0], oa); RETURN(rc); } +#endif struct obd_ops lov_obd_ops = { o_setup: gen_multi_setup, o_cleanup: gen_multi_cleanup, o_create: lov_create, - o_destroy: lov_destroy, + o_connect: lov_connect, + o_disconnect: lov_disconnect, o_getattr: lov_getattr, o_setattr: lov_setattr, o_open: lov_open, o_close: lov_close, - o_connect: lov_connect, - o_disconnect: lov_disconnect, +#if 0 + o_destroy: lov_destroy, o_brw: lov_pgcache_brw, o_punch: lov_punch, o_enqueue: lov_enqueue, o_cancel: lov_cancel +#endif }; diff --git a/lustre/mdc/mdc_reint.c b/lustre/mdc/mdc_reint.c index 2f59c2e..049c409 100644 --- a/lustre/mdc/mdc_reint.c +++ b/lustre/mdc/mdc_reint.c @@ -74,27 +74,40 @@ int mdc_setattr(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, int mdc_create(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, struct inode *dir, const char *name, int namelen, - const char *tgt, int tgtlen, int mode, __u64 id, __u32 uid, - __u32 gid, __u64 time, struct ptlrpc_request **request) + const char *tgt, int tgtlen, int mode, __u32 uid, + __u32 gid, __u64 time, __u64 rdev, struct obdo *obdo, + struct ptlrpc_request **request) { struct mds_rec_create *rec; struct ptlrpc_request *req; - int rc, size[3] = {sizeof(*rec), namelen + 1, tgtlen + 1}; - char *tmp; - int level; + int rc, size[3] = {sizeof(*rec), namelen + 1, 0}; + char *tmp, *bufs[3] = {NULL, NULL, NULL}; + int level, bufcount = 2; ENTRY; - req = ptlrpc_prep_req(cl, conn, MDS_REINT, 3, size, NULL); + if (S_ISREG(mode)) { + size[2] = sizeof(*obdo); + bufs[2] = (char *)obdo; + bufcount = 3; + } else if (S_ISLNK(mode)) { + size[2] = tgtlen + 1; + bufcount = 3; + } + + req = ptlrpc_prep_req(cl, conn, MDS_REINT, bufcount, size, bufs); if (!req) RETURN(-ENOMEM); rec = lustre_msg_buf(req->rq_reqmsg, 0); - mds_create_pack(rec, dir, mode, id, uid, gid, time); + mds_create_pack(rec, dir, mode, rdev, uid, gid, time); tmp = lustre_msg_buf(req->rq_reqmsg, 1); LOGL0(name, namelen, tmp); - if (tgt) { + if (S_ISREG(mode)) { + tmp = lustre_msg_buf(req->rq_reqmsg, 2); + memcpy(tmp, obdo, sizeof(*obdo)); + } else if (S_ISLNK(mode)) { tmp = lustre_msg_buf(req->rq_reqmsg, 2); LOGL0(tgt, tgtlen, tmp); } diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 3a7fc63..a928fc3 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -95,7 +95,10 @@ int mdc_getattr(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, ll_ino2fid(&body->fid1, ino, 0, type); body->valid = valid; - if (valid & OBD_MD_LINKNAME) { + if (S_ISREG(type)) { + bufcount = 2; + size[1] = sizeof(struct obdo); + } else if (valid & OBD_MD_LINKNAME) { bufcount = 2; size[1] = ea_size; } @@ -134,7 +137,7 @@ int mdc_open(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, body = lustre_msg_buf(req->rq_reqmsg, 0); ll_ino2fid(&body->fid1, ino, 0, type); body->flags = HTON__u32(flags); - body->objid = cookie; + body->extra = cookie; req->rq_replen = lustre_msg_size(1, &size); @@ -144,7 +147,7 @@ int mdc_open(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, if (!rc) { mds_unpack_rep_body(req); body = lustre_msg_buf(req->rq_repmsg, 0); - *fh = body->objid; + *fh = body->extra; } EXIT; @@ -166,7 +169,7 @@ int mdc_close(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, body = lustre_msg_buf(req->rq_reqmsg, 0); ll_ino2fid(&body->fid1, ino, 0, type); - body->objid = fh; + body->extra = fh; req->rq_level = LUSTRE_CONN_FULL; req->rq_replen = lustre_msg_size(0, NULL); @@ -347,7 +350,7 @@ static int request_ioctl(struct inode *inode, struct file *file, err = mdc_create(&cl, conn, &inode, "foofile", strlen("foofile"), NULL, 0, 0100707, 47114711, - 11, 47, 0, &request); + 11, 47, 0, NULL, &request); CERROR("-- done err %d\n", err); GOTO(out, err); diff --git a/lustre/mds/Makefile.am b/lustre/mds/Makefile.am index 8c94e54..887039c 100644 --- a/lustre/mds/Makefile.am +++ b/lustre/mds/Makefile.am @@ -6,8 +6,8 @@ DEFS:= MODULE = mds -modulefs_DATA = mds.o mds_ext2.o mds_ext3.o mds_extN.o -EXTRA_PROGRAMS = mds mds_ext2 mds_ext3 mds_extN +modulefs_DATA = mds.o mds_extN.o # mds_ext2.o mds_ext3.o +EXTRA_PROGRAMS = mds mds_extN # mds_ext2 mds_ext3 LINX=mds_updates.c simple.c mds_updates.c: diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 6f54693..ab4f48a 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -22,8 +22,8 @@ #include #include -static -int mds_sendpage(struct ptlrpc_request *req, struct file *file, __u64 offset) +static int mds_sendpage(struct ptlrpc_request *req, struct file *file, + __u64 offset) { int rc = 0; mm_segment_t oldfs = get_fs(); @@ -58,6 +58,9 @@ int mds_sendpage(struct ptlrpc_request *req, struct file *file, __u64 offset) desc->b_portal = MDS_BULK_PORTAL; rc = ptlrpc_send_bulk(desc); + if (rc) + GOTO(cleanup_buf, rc); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) { CERROR("obd_fail_loc=%x, fail operation rc=%d\n", OBD_FAIL_MDS_SENDPAGE, rc); @@ -65,6 +68,10 @@ int mds_sendpage(struct ptlrpc_request *req, struct file *file, __u64 offset) GOTO(cleanup_buf, rc); } + wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc)); + if (desc->b_flags & PTL_RPC_FL_INTR) + GOTO(cleanup_buf, rc = -EINTR); + EXIT; cleanup_buf: OBD_FREE(buf, PAGE_SIZE); @@ -193,14 +200,13 @@ int mds_connect(struct ptlrpc_request *req) RETURN(0); } -static -int mds_getattr(struct ptlrpc_request *req) +static int mds_getattr(struct ptlrpc_request *req) { struct dentry *de; struct inode *inode; struct mds_body *body; struct mds_obd *mds = &req->rq_obd->u.mds; - int rc, size[2] = {sizeof(*body)}, count = 1; + int rc, size[2] = {sizeof(*body)}, bufcount = 1; ENTRY; body = lustre_msg_buf(req->rq_reqmsg, 0); @@ -209,13 +215,17 @@ int mds_getattr(struct ptlrpc_request *req) req->rq_status = -ENOENT; RETURN(0); } + inode = de->d_inode; - if (body->valid & OBD_MD_LINKNAME) { - count = 2; + if (S_ISREG(body->fid1.f_type)) { + bufcount = 2; + size[1] = sizeof(struct obdo); + } else if (body->valid & OBD_MD_LINKNAME) { + bufcount = 2; size[1] = inode->i_size; } - rc = lustre_pack_msg(count, size, NULL, &req->rq_replen, + rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) { CERROR("mds: out of memory\n"); @@ -250,15 +260,17 @@ int mds_getattr(struct ptlrpc_request *req) body->size = inode->i_size; body->mode = inode->i_mode; body->nlink = inode->i_nlink; + body->valid = ~0; /* FIXME: should be more selective */ + if (S_ISREG(inode->i_mode)) { - rc = mds_fs_get_objid(mds, inode, &body->objid); + rc = mds_fs_get_obdo(mds, inode, + lustre_msg_buf(req->rq_repmsg, 1)); if (rc < 0) { req->rq_status = rc; - CERROR("readlink failed: %d\n", rc); + CERROR("mds_fs_get_objid failed: %d\n", rc); GOTO(out, 0); } } - body->valid = ~0; /* FIXME: should be more selective */ out: l_dput(de); RETURN(0); @@ -301,7 +313,7 @@ int mds_open(struct ptlrpc_request *req) list_for_each(tmp, &mci->mci_open_head) { struct mds_file_data *fd; fd = list_entry(tmp, struct mds_file_data, mfd_list); - if (body->objid == fd->mfd_clientfd && + if (body->extra == fd->mfd_clientfd && body->fid1.id == fd->mfd_file->f_dentry->d_inode->i_ino) { CERROR("Re opening %Ld\n", body->fid1.id); RETURN(0); @@ -331,11 +343,11 @@ int mds_open(struct ptlrpc_request *req) file->private_data = mfd; mfd->mfd_file = file; - mfd->mfd_clientfd = body->objid; + mfd->mfd_clientfd = body->extra; list_add(&mfd->mfd_list, &mci->mci_open_head); body = lustre_msg_buf(req->rq_repmsg, 0); - body->objid = (__u64) (unsigned long)file; + body->extra = (__u64) (unsigned long)file; RETURN(0); } @@ -364,7 +376,7 @@ int mds_close(struct ptlrpc_request *req) RETURN(0); } - file = (struct file *)(unsigned long)body->objid; + file = (struct file *)(unsigned long)body->extra; if (!file->f_dentry) LBUG(); mfd = (struct mds_file_data *)file->private_data; diff --git a/lustre/mds/mds_extN.c b/lustre/mds/mds_extN.c index 714943d..be7587e 100644 --- a/lustre/mds/mds_extN.c +++ b/lustre/mds/mds_extN.c @@ -22,6 +22,7 @@ #include #include #include +#include static struct mds_fs_operations mds_extN_fs_ops; static kmem_cache_t *jcb_cache; @@ -34,10 +35,8 @@ struct mds_cb_data { }; struct mds_objid { - __u16 mo_magic; - __u8 mo_unused; - __u8 mo_ost; - __u64 mo_id; + __u64 mo_magic; + struct lov_md mo_lov_md; }; #define EXTN_XATTR_INDEX_LUSTRE 5 @@ -103,65 +102,75 @@ static int mds_extN_setattr(struct dentry *dentry, void *handle, return inode_setattr(inode, iattr); } -static int mds_extN_set_objid(struct inode *inode, void *handle, obd_id id) +static int mds_extN_set_obdo(struct inode *inode, void *handle, + struct obdo *obdo) { - struct mds_objid data; + struct mds_objid *data = (struct mds_objid *)obdo->o_inline; int rc; - data.mo_magic = cpu_to_le16(XATTR_MDS_MO_MAGIC); - data.mo_unused = 0; - data.mo_ost = 0; /* FIXME: store OST index here */ - data.mo_id = cpu_to_le64(id); + data->mo_magic = cpu_to_le64(XATTR_MDS_MO_MAGIC); lock_kernel(); down(&inode->i_sem); - if (id == 0) { + if (obdo == NULL) rc = extN_xattr_set(handle, inode, EXTN_XATTR_INDEX_LUSTRE, XATTR_LUSTRE_MDS_OBJID, NULL, 0, 0); - } else { + else rc = extN_xattr_set(handle, inode, EXTN_XATTR_INDEX_LUSTRE, - XATTR_LUSTRE_MDS_OBJID, &data, - sizeof(struct mds_objid), XATTR_CREATE); - } + XATTR_LUSTRE_MDS_OBJID, obdo->o_inline, + OBD_INLINESZ, XATTR_CREATE); up(&inode->i_sem); unlock_kernel(); if (rc) CERROR("error adding objectid %Ld to inode %ld\n", - (unsigned long long)id, inode->i_ino); + (unsigned long long)obdo->o_id, inode->i_ino); return rc; } -static int mds_extN_get_objid(struct inode *inode, obd_id *id) +static int mds_extN_get_obdo(struct inode *inode, struct obdo *obdo) { + char *buf; struct mds_objid data; - int rc; + struct lov_object_id *lov_ids; + int rc, size; lock_kernel(); down(&inode->i_sem); rc = extN_xattr_get(inode, EXTN_XATTR_INDEX_LUSTRE, XATTR_LUSTRE_MDS_OBJID, &data, sizeof(struct mds_objid)); + size = sizeof(struct mds_objid) + data.mo_lov_md.lmd_stripe_count * + sizeof(*lov_ids); + OBD_ALLOC(buf, size); + if (buf == NULL) + LBUG(); + rc = extN_xattr_get(inode, EXTN_XATTR_INDEX_LUSTRE, + XATTR_LUSTRE_MDS_OBJID, buf, size); up(&inode->i_sem); unlock_kernel(); + if (size > OBD_INLINESZ) + LBUG(); + if (rc < 0) { CERROR("error getting EA %s from MDS inode %ld: rc = %d\n", XATTR_LUSTRE_MDS_OBJID, inode->i_ino, rc); - *id = 0; - } else if (data.mo_magic != cpu_to_le16(XATTR_MDS_MO_MAGIC)) { - CERROR("MDS object id %Ld has bad magic %x\n", - (unsigned long long)le64_to_cpu(data.mo_id), - le16_to_cpu(data.mo_magic)); + obdo->o_id = 0; + } else if (data.mo_magic != cpu_to_le64(XATTR_MDS_MO_MAGIC)) { + CERROR("MDS object id %Ld has bad magic %Lx\n", + (unsigned long long)obdo->o_id, + (unsigned long long)le64_to_cpu(data.mo_magic)); rc = -EINVAL; } else { - *id = le64_to_cpu(data.mo_id); - /* FIXME: will actually use data.mo_ost at some point */ - if (data.mo_ost) - CERROR("MDS objid %Ld with ost index %d!\n", - *id, data.mo_ost); + /* This field is byteswapped because it appears in the + * catalogue. All others are opaque to the MDS */ + obdo->o_id = le64_to_cpu(data.mo_lov_md.lmd_object_id); + memcpy(obdo->o_inline, buf + sizeof(data), size); } +#warning FIXME: pass this buffer to caller for transmission when size exceeds OBD_INLINESZ + OBD_FREE(buf, size); return rc; } @@ -202,8 +211,8 @@ static void mds_extN_delete_inode(struct inode *inode) EXIT; return; } - if (mds_extN_set_objid(inode, handle, 0)) - CERROR("error clearing objid on %ld\n", inode->i_ino); + if (mds_extN_set_obdo(inode, handle, NULL)) + CERROR("error clearing obdo on %ld\n", inode->i_ino); if (mds_extN_fs_ops.cl_delete_inode) mds_extN_fs_ops.cl_delete_inode(inode); @@ -286,8 +295,8 @@ static struct mds_fs_operations mds_extN_fs_ops = { fs_start: mds_extN_start, fs_commit: mds_extN_commit, fs_setattr: mds_extN_setattr, - fs_set_objid: mds_extN_set_objid, - fs_get_objid: mds_extN_get_objid, + fs_set_obdo: mds_extN_set_obdo, + fs_get_obdo: mds_extN_get_obdo, fs_readpage: mds_extN_readpage, fs_delete_inode: mds_extN_delete_inode, cl_delete_inode: clear_inode, diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index d8edcb0..f0e7216 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -16,16 +16,6 @@ // XXX - add transaction sequence numbers #define EXPORT_SYMTAB - -#include -#include -#include -#include -#include -#include -#include -#include - #define DEBUG_SUBSYSTEM S_MDS #include @@ -261,7 +251,7 @@ static int mds_reint_create(struct mds_update_record *rec, case S_IFBLK: case S_IFIFO: case S_IFSOCK: { - int rdev = rec->ur_id; + int rdev = rec->ur_rdev; handle = mds_fs_start(mds, dir, MDS_FSOP_MKNOD); if (!handle) GOTO(out_create_dchild, PTR_ERR(handle)); @@ -285,7 +275,9 @@ static int mds_reint_create(struct mds_update_record *rec, CDEBUG(D_INODE, "created ino %ld\n", dchild->d_inode->i_ino); if (type == S_IFREG) { - rc = mds_fs_set_objid(mds, inode, handle, rec->ur_id); + struct obdo *obdo; + obdo = lustre_msg_buf(req->rq_reqmsg, 2); + rc = mds_fs_set_obdo(mds, inode, handle, obdo); if (rc) CERROR("error %d setting objid for %ld\n", rc, inode->i_ino); @@ -526,7 +518,8 @@ static int mds_reint_rename(struct mds_update_record *rec, handle = mds_fs_start(mds, de_tgtdir->d_inode, MDS_FSOP_RENAME); if (!handle) GOTO(out_rename_denew, rc = PTR_ERR(handle)); - rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new); + rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new, + NULL); if (!rc) rc = mds_update_last_rcvd(mds, handle, req); diff --git a/lustre/obdclass/class_obd.c b/lustre/obdclass/class_obd.c index 2c215ad..178e74a 100644 --- a/lustre/obdclass/class_obd.c +++ b/lustre/obdclass/class_obd.c @@ -511,7 +511,7 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp, } err = obd_brw(rw, conns, num, obdos, oa_bufs, bufs, - counts, offsets, flags); + counts, offsets, flags, NULL); EXIT; brw_cleanup: diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index 35ee6b8..87122ab 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -238,7 +238,7 @@ int gen_copy_data(struct obd_conn *dst_conn, struct obdo *dst, page->index = index; err = OBP(src_conn->oc_dev, brw)(READ, src_conn, num_oa, &src, &num_buf, &page, &brw_count, - &brw_offset, &flagr); + &brw_offset, &flagr, NULL); if ( err ) { EXIT; @@ -248,7 +248,7 @@ int gen_copy_data(struct obd_conn *dst_conn, struct obdo *dst, err = OBP(dst_conn->oc_dev, brw)(WRITE, dst_conn, num_oa, &dst, &num_buf, &page, &brw_count, - &brw_offset, &flagw); + &brw_offset, &flagw, NULL); /* XXX should handle dst->o_size, dst->o_blocks here */ if ( err ) { diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 0aa5381..863f96b 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -690,6 +690,7 @@ static int filter_destroy(struct obd_conn *conn, struct obdo *oa) inode->i_mode = S_IFREG; push_ctxt(&saved, &obddev->u.filter.fo_ctxt); + rc = vfs_unlink(dir_dentry->d_inode, object_dentry); pop_ctxt(&saved); CDEBUG(D_INODE, "put child %p, count = %d\n", object_dentry, @@ -792,7 +793,7 @@ static int filter_write(struct obd_conn *conn, struct obdo *oa, char *buf, static int filter_pgcache_brw(int rw, struct obd_conn *conn, obd_count num_oa, struct obdo **oa, obd_count *oa_bufs, struct page **pages, obd_size *count, - obd_off *offset, obd_flag *flags) + obd_off *offset, obd_flag *flags, void *callback) { struct obd_run_ctxt saved; struct super_block *sb; diff --git a/lustre/obdfs/rw.c b/lustre/obdfs/rw.c index ba43795..34f65e4 100644 --- a/lustre/obdfs/rw.c +++ b/lustre/obdfs/rw.c @@ -169,7 +169,7 @@ static int obdfs_brw(int rw, struct inode *inode, struct page *page, int create) obdfs_from_inode(oa, inode); err = obd_brw(rw, IID(inode), num_obdo, &oa, &bufs_per_obdo, - &page, &count, &offset, &flags); + &page, &count, &offset, &flags, NULL); //if ( !err ) // obdfs_to_inode(inode, oa); /* copy o_blocks to i_blocks */ @@ -205,7 +205,7 @@ static int obdfs_commit_page(struct page *page, int create, int from, int to) from, to, (unsigned long long)count); err = obd_brw(WRITE, IID(inode), num_obdo, &oa, &bufs_per_obdo, - &page, &count, &offset, &flags); + &page, &count, &offset, &flags, NULL); if ( !err ) { SetPageUptodate(page); set_page_clean(page); diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index fe83d9b..444ff05 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -82,7 +82,8 @@ static int osc_disconnect(struct obd_conn *conn) ENTRY; osc_con2cl(conn, &cl, &connection); - request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size, NULL); + request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size, + NULL); if (!request) RETURN(-ENOMEM); @@ -360,26 +361,32 @@ static int osc_destroy(struct obd_conn *conn, struct obdo *oa) return 0; } -static int osc_sendpage(struct ptlrpc_bulk_desc *desc, - struct niobuf_remote *dst, struct niobuf_local *src) +struct osc_brw_cb_data { + struct page **buf; + struct ptlrpc_request *req; + bulk_callback_t callback; + void *cb_data; +}; + +static void brw_read_finish(struct ptlrpc_bulk_desc *desc, void *data) { - struct ptlrpc_bulk_page *page; - ENTRY; + struct osc_brw_cb_data *cb_data = data; - page = ptlrpc_prep_bulk_page(desc); - if (page == NULL) - RETURN(-ENOMEM); + if (desc->b_flags & PTL_RPC_FL_INTR) + CERROR("got signal\n"); - page->b_buf = (void *)(unsigned long)src->addr; - page->b_buflen = src->len; - page->b_xid = dst->xid; + (cb_data->callback)(desc, cb_data->cb_data); - RETURN(0); + ptlrpc_free_bulk(desc); + ptlrpc_free_req(cb_data->req); + + OBD_FREE(cb_data, sizeof(*cb_data)); } static int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, obd_count *oa_bufs, struct page **buf, - obd_size *count, obd_off *offset, obd_flag *flags) + obd_size *count, obd_off *offset, obd_flag *flags, + bulk_callback_t callback) { struct ptlrpc_client *cl; struct ptlrpc_connection *connection; @@ -414,6 +421,7 @@ static int osc_brw_read(struct obd_conn *conn, obd_count num_oa, ptr2 = lustre_msg_buf(request->rq_reqmsg, 2); for (pages = 0, i = 0; i < num_oa; i++) { ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); + /* FIXME: this inner loop is wrong for multiple OAs */ for (j = 0; j < oa_bufs[i]; j++, pages++) { struct ptlrpc_bulk_page *bulk; bulk = ptlrpc_prep_bulk_page(desc); @@ -457,10 +465,30 @@ static int osc_brw_read(struct obd_conn *conn, obd_count num_oa, return rc; } +static void brw_write_finish(struct ptlrpc_bulk_desc *desc, void *data) +{ + struct osc_brw_cb_data *cb_data = data; + int i; + + if (desc->b_flags & PTL_RPC_FL_INTR) + CERROR("got signal\n"); + + for (i = 0; i < desc->b_page_count; i++) + kunmap(cb_data->buf[i]); + + (cb_data->callback)(desc, cb_data->cb_data); + + ptlrpc_free_bulk(desc); + ptlrpc_free_req(cb_data->req); + + OBD_FREE(cb_data, sizeof(*cb_data)); +} + static int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, obd_count *oa_bufs, - struct page **pagearray, obd_size *count, obd_off *offset, - obd_flag *flags) + struct page **pagearray, obd_size *count, + obd_off *offset, obd_flag *flags, + bulk_callback_t callback) { struct ptlrpc_client *cl; struct ptlrpc_connection *connection; @@ -470,6 +498,7 @@ static int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct ost_body *body; struct niobuf_local *local; struct niobuf_remote *remote; + struct osc_brw_cb_data *cb_data; long pages; int rc, i, j, size[3] = {sizeof(*body)}; void *ptr1, *ptr2; @@ -527,17 +556,44 @@ static int osc_brw_write(struct obd_conn *conn, obd_count num_oa, desc = ptlrpc_prep_bulk(connection); desc->b_portal = OSC_BULK_PORTAL; + if (callback) { + desc->b_cb = brw_write_finish; + OBD_ALLOC(cb_data, sizeof(*cb_data)); + cb_data->buf = pagearray; + cb_data->callback = callback; + desc->b_cb_data = cb_data; + } for (pages = 0, i = 0; i < num_oa; i++) { for (j = 0; j < oa_bufs[i]; j++, pages++) { + struct ptlrpc_bulk_page *page; + ost_unpack_niobuf(&ptr2, &remote); - rc = osc_sendpage(desc, remote, &local[pages]); - if (rc) - GOTO(out3, rc); + + page = ptlrpc_prep_bulk_page(desc); + if (page == NULL) + GOTO(out3, rc = -ENOMEM); + + page->b_buf = (void *)(unsigned long)local[pages].addr; + page->b_buflen = local[pages].len; + page->b_xid = remote->xid; } } + if (desc->b_page_count != pages) + LBUG(); + rc = ptlrpc_send_bulk(desc); + if (rc) + GOTO(out3, rc); + if (callback) + GOTO(out, rc); + + /* If there's no callback function, sleep here until complete. */ + wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc)); + if (desc->b_flags & PTL_RPC_FL_INTR) + rc = -EINTR; + GOTO(out3, rc); out3: @@ -555,14 +611,18 @@ static int osc_brw_write(struct obd_conn *conn, obd_count num_oa, static int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa, struct obdo **oa, obd_count *oa_bufs, struct page **buf, - obd_size *count, obd_off *offset, obd_flag *flags) + obd_size *count, obd_off *offset, obd_flag *flags, + void *callback) { + if (num_oa != 1) + LBUG(); + if (rw == OBD_BRW_READ) return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count, - offset, flags); + offset, flags, (bulk_callback_t)callback); else return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count, - offset, flags); + offset, flags, (bulk_callback_t)callback); } static int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns, diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 1276c8b..bb5a214 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -356,11 +356,9 @@ static int ost_brw_write_cb(struct ptlrpc_bulk_page *bulk) RETURN(rc); } -static int ost_brw_write_finished_cb(struct ptlrpc_bulk_desc *desc) +static void ost_brw_write_finished_cb(struct ptlrpc_bulk_desc *desc, void *data) { ptlrpc_free_bulk(desc); - - return 0; } static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req) diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index 19ea660..e1a1726 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -32,7 +32,7 @@ static const ptl_handle_ni_t *socknal_nip = NULL, *qswnal_nip = NULL; /* * Free the packet when it has gone out */ -static int request_out_callback(ptl_event_t *ev, void *data) +static int request_out_callback(ptl_event_t *ev) { ENTRY; @@ -49,7 +49,7 @@ static int request_out_callback(ptl_event_t *ev, void *data) /* * Free the packet when it has gone out */ -static int reply_out_callback(ptl_event_t *ev, void *data) +static int reply_out_callback(ptl_event_t *ev) { ENTRY; @@ -67,7 +67,7 @@ static int reply_out_callback(ptl_event_t *ev, void *data) /* * Wake up the thread waiting for the reply once it comes in. */ -static int reply_in_callback(ptl_event_t *ev, void *data) +static int reply_in_callback(ptl_event_t *ev) { struct ptlrpc_request *rpc = ev->mem_desc.user_ptr; ENTRY; @@ -85,9 +85,9 @@ static int reply_in_callback(ptl_event_t *ev, void *data) RETURN(1); } -int request_in_callback(ptl_event_t *ev, void *data) +int request_in_callback(ptl_event_t *ev) { - struct ptlrpc_service *service = data; + struct ptlrpc_service *service = ev->mem_desc.user_ptr; int index; if (ev->rlength != ev->mlength) @@ -130,7 +130,7 @@ int request_in_callback(ptl_event_t *ev, void *data) return 0; } -static int bulk_source_callback(ptl_event_t *ev, void *data) +static int bulk_source_callback(ptl_event_t *ev) { struct ptlrpc_bulk_page *bulk = ev->mem_desc.user_ptr; struct ptlrpc_bulk_desc *desc = bulk->b_desc; @@ -140,8 +140,14 @@ static int bulk_source_callback(ptl_event_t *ev, void *data) CDEBUG(D_NET, "got SENT event\n"); } else if (ev->type == PTL_EVENT_ACK) { CDEBUG(D_NET, "got ACK event\n"); - desc->b_flags |= PTL_BULK_FL_SENT; - wake_up_interruptible(&desc->b_waitq); + if (bulk->b_cb != NULL) + bulk->b_cb(bulk); + if (atomic_dec_and_test(&desc->b_finished_count)) { + desc->b_flags |= PTL_BULK_FL_SENT; + wake_up_interruptible(&desc->b_waitq); + if (desc->b_cb != NULL) + desc->b_cb(desc, desc->b_cb_data); + } } else { CERROR("Unexpected event type!\n"); LBUG(); @@ -150,7 +156,7 @@ static int bulk_source_callback(ptl_event_t *ev, void *data) RETURN(1); } -static int bulk_sink_callback(ptl_event_t *ev, void *data) +static int bulk_sink_callback(ptl_event_t *ev) { struct ptlrpc_bulk_page *bulk = ev->mem_desc.user_ptr; struct ptlrpc_bulk_desc *desc = bulk->b_desc; @@ -159,14 +165,13 @@ static int bulk_sink_callback(ptl_event_t *ev, void *data) if (ev->type == PTL_EVENT_PUT) { if (bulk->b_buf != ev->mem_desc.start + ev->offset) CERROR("bulkbuf != mem_desc -- why?\n"); - desc->b_finished_count++; if (bulk->b_cb != NULL) bulk->b_cb(bulk); - if (desc->b_finished_count == desc->b_page_count) { + if (atomic_dec_and_test(&desc->b_finished_count)) { desc->b_flags |= PTL_BULK_FL_RCVD; wake_up_interruptible(&desc->b_waitq); if (desc->b_cb != NULL) - desc->b_cb(desc); + desc->b_cb(desc, desc->b_cb_data); } } else { CERROR("Unexpected event type!\n"); @@ -194,23 +199,23 @@ int ptlrpc_init_portals(void) else ni = *socknal_nip; - rc = PtlEQAlloc(ni, 128, request_out_callback, NULL, &request_out_eq); + rc = PtlEQAlloc(ni, 128, request_out_callback, &request_out_eq); if (rc != PTL_OK) CERROR("PtlEQAlloc failed: %d\n", rc); - rc = PtlEQAlloc(ni, 128, reply_out_callback, NULL, &reply_out_eq); + rc = PtlEQAlloc(ni, 128, reply_out_callback, &reply_out_eq); if (rc != PTL_OK) CERROR("PtlEQAlloc failed: %d\n", rc); - rc = PtlEQAlloc(ni, 128, reply_in_callback, NULL, &reply_in_eq); + rc = PtlEQAlloc(ni, 128, reply_in_callback, &reply_in_eq); if (rc != PTL_OK) CERROR("PtlEQAlloc failed: %d\n", rc); - rc = PtlEQAlloc(ni, 128, bulk_source_callback, NULL, &bulk_source_eq); + rc = PtlEQAlloc(ni, 128, bulk_source_callback, &bulk_source_eq); if (rc != PTL_OK) CERROR("PtlEQAlloc failed: %d\n", rc); - rc = PtlEQAlloc(ni, 128, bulk_sink_callback, NULL, &bulk_sink_eq); + rc = PtlEQAlloc(ni, 128, bulk_sink_callback, &bulk_sink_eq); if (rc != PTL_OK) CERROR("PtlEQAlloc failed: %d\n", rc); diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 584cbf9..1157920 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -28,7 +28,7 @@ extern ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq, bulk_source_eq, bulk_sink_eq; static ptl_process_id_t local_id = {PTL_ID_ANY, PTL_ID_ANY}; -static int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk) +int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk) { ENTRY; @@ -109,6 +109,8 @@ int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *desc) ptl_process_id_t remote_id; ENTRY; + atomic_set(&desc->b_finished_count, desc->b_page_count); + list_for_each_safe(tmp, next, &desc->b_page_list) { /* only request an ACK for the last page */ int ack = (next == &desc->b_page_list); @@ -139,19 +141,14 @@ int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *desc) rc = PtlPut(bulk->b_md_h, (ack ? PTL_ACK_REQ : PTL_NOACK_REQ), remote_id, desc->b_portal, 0, bulk->b_xid, 0, 0); if (rc != PTL_OK) { - CERROR("PtlPut(%Lu, %d, %d) failed: %d\n", remote_id.nid, - desc->b_portal, bulk->b_xid, rc); + CERROR("PtlPut(%Lu, %d, %d) failed: %d\n", + remote_id.nid, desc->b_portal, bulk->b_xid, rc); PtlMDUnlink(bulk->b_md_h); LBUG(); RETURN(rc); } } - wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc)); - - if (desc->b_flags & PTL_RPC_FL_INTR) - RETURN(-EINTR); - RETURN(0); } @@ -254,7 +251,6 @@ int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req) int ptl_send_rpc(struct ptlrpc_request *request) { - ptl_process_id_t local_id; int rc; char *repbuf; @@ -282,9 +278,6 @@ int ptl_send_rpc(struct ptlrpc_request *request) RETURN(ENOMEM); } - local_id.nid = PTL_ID_ANY; - local_id.pid = PTL_ID_ANY; - down(&request->rq_client->cli_rpc_sem); rc = PtlMEAttach(request->rq_connection->c_peer.peer_ni, diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index f87c484..97e683a 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -24,7 +24,7 @@ #include -extern int request_in_callback(ptl_event_t *ev, void *data); +extern int request_in_callback(ptl_event_t *ev); extern int ptl_handled_rpc(struct ptlrpc_service *service, void *start); static int ptlrpc_check_event(struct ptlrpc_service *svc, @@ -101,7 +101,7 @@ ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid, service->srv_ring_length = RPC_RING_LENGTH; rc = PtlEQAlloc(service->srv_self.peer_ni, 128, request_in_callback, - service, &(service->srv_eq_h)); + &(service->srv_eq_h)); if (rc != PTL_OK) { CERROR("PtlEQAlloc failed: %d\n", rc);