Whamcloud - gitweb
WARNING: This commit breaks everything. It will be back in shape within 12
authorpschwan <pschwan>
Thu, 13 Jun 2002 19:25:10 +0000 (19:25 +0000)
committerpschwan <pschwan>
Thu, 13 Jun 2002 19:25:10 +0000 (19:25 +0000)
hours, we hope, but update at your own (extreme) risk.

- Adds more unfinished LOV functionality
- Changes MDS EAs to contain LOV data
- Adds new brw callbacks
- Fixes to work with the Portals tip

30 files changed:
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_lite.h
lustre/include/linux/lustre_mds.h
lustre/include/linux/lustre_net.h
lustre/include/linux/obd.h
lustre/include/linux/obd_class.h
lustre/include/linux/obd_lov.h [new file with mode: 0644]
lustre/lib/mds_updates.c
lustre/llite/file.c
lustre/llite/namei.c
lustre/llite/rw.c
lustre/llite/super.c
lustre/lov/.cvsignore [new file with mode: 0644]
lustre/lov/Makefile.am
lustre/lov/lov_obd.c
lustre/mdc/mdc_reint.c
lustre/mdc/mdc_request.c
lustre/mds/Makefile.am
lustre/mds/handler.c
lustre/mds/mds_extN.c
lustre/mds/mds_reint.c
lustre/obdclass/class_obd.c
lustre/obdclass/genops.c
lustre/obdfilter/filter.c
lustre/obdfs/rw.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/events.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/service.c

index a1e329e..41216ad 100644 (file)
@@ -61,24 +61,6 @@ struct lustre_msg {
         __u32   buflens[0];
 };
 
-struct niobuf_remote {
-        __u64 offset;
-        __u32 len;
-        __u32 xid;
-        __u32 flags;
-};
-
-struct niobuf_local {
-        __u64 offset;
-        __u32 len;
-        __u32 xid;
-        __u32 flags;
-        void *addr;
-        struct page *page;
-        void *target_private;
-        struct dentry *dentry;
-};
-
 #define N_LOCAL_TEMP_PAGE 0x00000001
 
 /*
@@ -114,8 +96,8 @@ typedef uint32_t        obd_rdev;
 typedef uint32_t        obd_flag;
 typedef uint32_t        obd_count;
 
-#define OBD_FL_INLINEDATA       (0x00000001UL)
-#define OBD_FL_OBDMDEXISTS      (0x00000002UL)
+#define OBD_FL_INLINEDATA       (0x00000001)
+#define OBD_FL_OBDMDEXISTS      (0x00000002)
 
 #define OBD_INLINESZ    60
 #define OBD_OBDMDSZ     60
@@ -166,6 +148,31 @@ struct obdo {
 #define OBD_MD_FLNOTOBD (~(OBD_MD_FLOBDMD | OBD_MD_FLOBDFLG | OBD_MD_FLBLOCKS |\
                            OBD_MD_LINKNAME))
 
+struct obd_ioobj {
+        obd_id    ioo_id;
+        obd_gr    ioo_gr;
+        __u32     ioo_type;
+        __u32     ioo_bufcnt;
+};
+
+struct niobuf_remote {
+        __u64 offset;
+        __u32 len;
+        __u32 xid;
+        __u32 flags;
+};
+
+struct niobuf_local {
+        __u64 offset;
+        __u32 len;
+        __u32 xid;
+        __u32 flags;
+        void *addr;
+        struct page *page;
+        void *target_private;
+        struct dentry *dentry;
+};
+
 /* request structure for OST's */
 
 #define OST_REQ_HAS_OA1  0x1
@@ -176,13 +183,6 @@ struct ost_body {
         struct  obdo oa;
 };
 
-struct obd_ioobj {
-        obd_id    ioo_id;
-        obd_gr    ioo_gr;
-        __u32     ioo_type;
-        __u32     ioo_bufcnt;
-};
-
 /*
  *   MDS REQ RECORDS
  */
@@ -213,8 +213,8 @@ struct ll_fid {
 struct mds_body {
         struct ll_fid  fid1;
         struct ll_fid  fid2;
-        __u64          objid;
         __u64          size;
+        __u64          extra;
         __u32          valid;
         __u32          mode;
         __u32          uid;
@@ -257,8 +257,7 @@ struct mds_rec_create {
         __u32           cr_gid;
         __u64           cr_time;
         __u32           cr_mode;
-        /* overloaded: id for create, tgtlen for symlink, rdev for mknod */
-        __u64           cr_id;
+        __u64           cr_rdev;
 };
 
 struct mds_rec_link {
index 3a9188f..bdb9ed1 100644 (file)
@@ -29,6 +29,11 @@ struct ll_file_data {
         __u32 fd_flags;
 };
 
+struct ll_inode_md {
+        struct mds_body *body;
+        struct obdo *obdo;
+};
+
 #define LL_IOC_GETFLAGS                 _IOR ('f', 151, long)
 #define LL_IOC_SETFLAGS                 _IOW ('f', 152, long)
 #define LL_IOC_CLRFLAGS                 _IOW ('f', 153, long)
@@ -38,7 +43,8 @@ struct ll_file_data {
 #define LL_INLINESZ      60
 struct ll_inode_info {
         int              lli_flags;
-        __u64            lli_objid; 
+        struct obdo     *lli_obdo;
+        char            *lli_symlink_name;
         char             lli_inline[LL_INLINESZ];
 };
 
index ffd84d1..69f71fb 100644 (file)
@@ -43,7 +43,7 @@ struct mds_update_record {
         int ur_tgtlen;
         char *ur_tgt;
         struct iattr ur_iattr;
-        __u64 ur_id;
+        __u64 ur_rdev;
         __u32 ur_mode;
         __u32 ur_uid;
         __u32 ur_gid;
@@ -132,8 +132,8 @@ int mdc_readpage(struct ptlrpc_client *, struct ptlrpc_connection *, ino_t ino,
                  int type, __u64 offset, char *addr, struct ptlrpc_request **);
 int mdc_create(struct ptlrpc_client *, struct ptlrpc_connection *,
                struct inode *dir, const char *name, int namelen, 
-               const char *tgt, int tgtlen, 
-               int mode, __u64 id, __u32 uid, __u32 gid, __u64 time, 
+               const char *tgt, int tgtlen, int mode, __u32 uid, __u32 gid,
+               __u64 time, __u64 rdev, struct obdo *obdo,
                struct ptlrpc_request **);
 int mdc_unlink(struct ptlrpc_client *, struct ptlrpc_connection *,
                struct inode *dir, struct inode *child, const char *name,
@@ -156,8 +156,9 @@ struct mds_fs_operations {
         int     (* fs_commit)(struct inode *inode, void *handle);
         int     (* fs_setattr)(struct dentry *dentry, void *handle,
                                struct iattr *iattr);
-        int     (* fs_set_objid)(struct inode *inode, void *handle, obd_id id);
-        int     (* fs_get_objid)(struct inode *inode, obd_id *id);
+        int     (* fs_set_obdo)(struct inode *inode, void *handle,
+                                struct obdo *obdo);
+        int     (* fs_get_obdo)(struct inode *inode, struct obdo *obdo);
         ssize_t (* fs_readpage)(struct file *file, char *buf, size_t count,
                                 loff_t *offset);
         void    (* fs_delete_inode)(struct inode *inode);
@@ -196,16 +197,16 @@ static inline int mds_fs_setattr(struct mds_obd *mds, struct dentry *dentry,
         return mds->mds_fsops->fs_setattr(dentry, handle, iattr);
 }
 
-static inline int mds_fs_set_objid(struct mds_obd *mds, struct inode *inode,
-                                   void *handle, __u64 id)
+static inline int mds_fs_set_obdo(struct mds_obd *mds, struct inode *inode,
+                                  void *handle, struct obdo *obdo)
 {
-        return mds->mds_fsops->fs_set_objid(inode, handle, id);
+        return mds->mds_fsops->fs_set_obdo(inode, handle, obdo);
 }
 
-static inline int mds_fs_get_objid(struct mds_obd *mds, struct inode *inode,
-                                    __u64 *id)
+static inline int mds_fs_get_obdo(struct mds_obd *mds, struct inode *inode,
+                                  struct obdo *obdo)
 {
-        return mds->mds_fsops->fs_get_objid(inode, id);
+        return mds->mds_fsops->fs_get_obdo(inode, obdo);
 }
 
 static inline ssize_t mds_fs_readpage(struct mds_obd *mds, struct file *file,
index ae665db..ae09030 100644 (file)
@@ -197,13 +197,14 @@ struct ptlrpc_bulk_desc {
         struct ptlrpc_connection *b_connection;
         struct ptlrpc_client *b_client;
         __u32 b_portal;
-        int (*b_cb)(struct ptlrpc_bulk_desc *);
         struct obd_conn b_conn;
+        void (*b_cb)(struct ptlrpc_bulk_desc *, void *);
+        void *b_cb_data;
 
         wait_queue_head_t b_waitq;
         struct list_head b_page_list;
         __u32 b_page_count;
-        __u32 b_finished_count;
+        atomic_t b_finished_count;
         void *b_desc_private;
 };
 
@@ -245,6 +246,8 @@ struct ptlrpc_service {
                            struct ptlrpc_request *req);
 };
 
+typedef void (*bulk_callback_t)(struct ptlrpc_bulk_desc *, void *);
+
 typedef int (*svc_handler_t)(struct obd_device *obddev,
                              struct ptlrpc_service *svc,
                              struct ptlrpc_request *req);
@@ -258,6 +261,7 @@ void ptlrpc_init_connection(void);
 void ptlrpc_cleanup_connection(void);
 
 /* rpc/niobuf.c */
+int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk);
 int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *);
 int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *);
 int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk);
index 2c63782..b7b2014 100644 (file)
@@ -159,13 +159,22 @@ struct osc_obd {
         struct ptlrpc_connection *osc_conn;
 };
 
+typedef __u8 uuid_t[37];
+
+#define MAX_MULTI       16
 struct lov_obd {
+        __u32 lov_default_count;
+        __u32 lov_default_pattern;
+        __u32 lov_default_size;
+        uuid_t lov_service_uuids[MAX_MULTI];
+
+#if 0
         int lov_count;
         struct obd_conn *lov_targets;
+#endif
 };
 
 /* corresponds to one of the obd's */
-#define MAX_MULTI       16
 struct obd_device {
         struct obd_type *obd_type;
         char *obd_name;
@@ -231,7 +240,8 @@ struct obd_ops {
                        obd_size *count, obd_off offset);
         int (*o_brw)(int rw, struct obd_conn *conn, obd_count num_oa,
                      struct obdo **oa, obd_count *oa_bufs, struct page **buf,
-                     obd_size *count, obd_off *offset, obd_flag *flags);
+                     obd_size *count, obd_off *offset, obd_flag *flags,
+                     void *);
         int (*o_punch)(struct obd_conn *conn, struct obdo *tgt, obd_size count,
                        obd_off offset);
         int (*o_sync)(struct obd_conn *conn, struct obdo *tgt, obd_size count,
@@ -251,6 +261,7 @@ struct obd_ops {
                           int objcount, struct obd_ioobj *obj,
                           int niocount, struct niobuf_local *local,
                           void *desc_private);
+
         int (*o_enqueue)(struct obd_conn *conn, struct ldlm_namespace *ns,
                          struct ldlm_handle *parent_lock, __u64 *res_id,
                          __u32 type, struct ldlm_extent *, __u32 mode,
index 40d7343..7054978 100644 (file)
@@ -295,14 +295,14 @@ static inline int obd_punch(struct obd_conn *conn, struct obdo *tgt,
 static inline int obd_brw(int rw, struct obd_conn *conn, obd_count num_oa,
                           struct obdo **oa, obd_count *oa_bufs,
                           struct page **buf, obd_size *count, obd_off *offset,
-                          obd_flag *flags)
+                          obd_flag *flags, void *callback)
 {
         int rc;
         OBD_CHECK_SETUP(conn);
         OBD_CHECK_OP(conn,brw);
 
         rc = OBP(conn->oc_dev, brw)(rw, conn, num_oa, oa, oa_bufs, buf,
-                                    count, offset, flags);
+                                    count, offset, flags, callback);
         RETURN(rc);
 }
 
@@ -403,6 +403,8 @@ static __inline__ struct obdo *obdo_alloc(void)
         struct obdo *oa = NULL;
 
         oa = kmem_cache_alloc(obdo_cachep, SLAB_KERNEL);
+        if (oa == NULL)
+                LBUG();
         memset(oa, 0, sizeof (*oa));
 
         return oa;
diff --git a/lustre/include/linux/obd_lov.h b/lustre/include/linux/obd_lov.h
new file mode 100644 (file)
index 0000000..9cfbd85
--- /dev/null
@@ -0,0 +1,26 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ */
+
+#ifndef _OBD_LOV_H__
+#define _OBD_LOV_H__
+
+#ifdef __KERNEL__
+
+#define OBD_LOV_DEVICENAME "lov"
+
+struct lov_object_id { /* per-child structure */
+        __u64 l_object_id;
+        __u32 l_device_id;
+};
+
+struct lov_md {
+        __u64 lmd_object_id;     /* lov object id */
+        __u64 lmd_stripe_count;
+        __u32 lmd_stripe_size;
+        __u32 lmd_stripe_pattern;  /* per-lov object stripe pattern */
+        struct lov_object_id lmd_objects[0];
+};
+
+#endif
+#endif
index 2f49c34..4441661 100644 (file)
@@ -50,7 +50,7 @@ static void mds_pack_body(struct mds_body *b)
 
         mds_pack_fid(&b->fid1);
         mds_pack_fid(&b->fid2);
-        b->objid = HTON__u64(b->objid);
+        b->extra = HTON__u64(b->extra);
         b->size = HTON__u64(b->size);
         b->valid = HTON__u32(b->valid);
         b->mode = HTON__u32(b->mode);
@@ -83,13 +83,13 @@ void mds_pack_rep_body(struct ptlrpc_request *req)
 
 /* packing of MDS records */
 void mds_create_pack(struct mds_rec_create *rec, struct inode *inode,
-                     __u32 mode, __u64 id, __u32 uid, __u32 gid, __u64 time)
+                     __u32 mode, __u64 rdev, __u32 uid, __u32 gid, __u64 time)
 {
         /* XXX do something about time, uid, gid */
         rec->cr_opcode = HTON__u32(REINT_CREATE);
         ll_inode2fid(&rec->cr_fid, inode);
         rec->cr_mode = HTON__u32(mode);
-        rec->cr_id = HTON__u64(id);
+        rec->cr_rdev = HTON__u64(rdev);
         rec->cr_uid = HTON__u32(uid);
         rec->cr_gid = HTON__u32(gid);
         rec->cr_time = HTON__u64(time);
@@ -151,7 +151,7 @@ static void mds_unpack_body(struct mds_body *b)
 
         mds_unpack_fid(&b->fid1);
         mds_unpack_fid(&b->fid2);
-        b->objid = NTOH__u64(b->objid);
+        b->extra = NTOH__u64(b->extra);
         b->size = NTOH__u64(b->size);
         b->valid = NTOH__u32(b->valid);
         b->mode = NTOH__u32(b->mode);
@@ -212,13 +212,13 @@ static int mds_create_unpack(struct ptlrpc_request *req,
         struct mds_rec_create *rec = lustre_msg_buf(req->rq_reqmsg, 0);
         ENTRY;
 
-        if (req->rq_reqmsg->bufcount != 3 ||
+        if (req->rq_reqmsg->bufcount < 2 ||
             req->rq_reqmsg->buflens[0] != sizeof(*rec))
                 RETURN(-EFAULT);
 
         r->ur_fid1 = &rec->cr_fid;
         r->ur_mode = NTOH__u32(rec->cr_mode);
-        r->ur_id = NTOH__u64(rec->cr_id);
+        r->ur_rdev = NTOH__u64(rec->cr_rdev);
         r->ur_uid = NTOH__u32(rec->cr_uid);
         r->ur_gid = NTOH__u32(rec->cr_gid);
         r->ur_time = NTOH__u64(rec->cr_time);
@@ -226,8 +226,10 @@ static int mds_create_unpack(struct ptlrpc_request *req,
         r->ur_name = lustre_msg_buf(req->rq_reqmsg, 1);
         r->ur_namelen = req->rq_reqmsg->buflens[1];
 
-        r->ur_tgt = lustre_msg_buf(req->rq_reqmsg, 2);
-        r->ur_tgtlen = req->rq_reqmsg->buflens[2];
+        if (S_ISLNK(r->ur_mode)) {
+                r->ur_tgt = lustre_msg_buf(req->rq_reqmsg, 2);
+                r->ur_tgtlen = req->rq_reqmsg->buflens[2];
+        }
         RETURN(0);
 }
 
index 834c35b..f43bfa0 100644 (file)
@@ -30,8 +30,6 @@
 
 int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc);
 extern int ll_setattr(struct dentry *de, struct iattr *attr);
-extern inline struct obdo * ll_oa_from_inode(struct inode *inode,
-                                             unsigned long valid);
 
 static int ll_file_open(struct inode *inode, struct file *file)
 {
@@ -61,14 +59,15 @@ static int ll_file_open(struct inode *inode, struct file *file)
                 CERROR("mdc_open didn't assign fd_mdshandle\n");
                 /* XXX handle this how, abort or is it non-fatal? */
         }
+        if (!fd->fd_mdshandle)
+                CERROR("mdc_open didn't assign fd_mdshandle\n");
 
-        oa = ll_oa_from_inode(inode, (OBD_MD_FLMODE | OBD_MD_FLID));
+        oa = ll_i2info(inode)->lli_obdo;
         if (oa == NULL) {
                 LBUG();
                 GOTO(out_mdc, rc = -ENOMEM);
         }
         rc = obd_open(ll_i2obdconn(inode), oa);
-        obdo_free(oa);
         if (rc) {
                 GOTO(out_mdc, rc = -abs(rc));
         }
@@ -106,16 +105,14 @@ static int ll_file_release(struct inode *inode, struct file *file)
                 GOTO(out, rc = -EINVAL);
         }
 
-        oa = ll_oa_from_inode(inode, (OBD_MD_FLMODE | OBD_MD_FLID));
+        oa = ll_i2info(inode)->lli_obdo;
         if (oa == NULL) {
                 LBUG();
                 GOTO(out_fd, rc = -ENOENT);
         }
         rc = obd_close(ll_i2obdconn(inode), oa);
-        obdo_free(oa);
-        if (rc) {
+        if (rc)
                 GOTO(out_fd, abs(rc));
-        }
 
         if (file->f_mode & FMODE_WRITE) {
                 struct iattr attr;
index 91f13b3..70990cb 100644 (file)
@@ -95,10 +95,10 @@ static struct dentry *ll_lookup(struct inode * dir, struct dentry *dentry)
         struct ptlrpc_request *request = NULL;
         struct inode * inode = NULL;
         struct ll_sb_info *sbi = ll_i2sbi(dir);
-        int err;
-        int type;
+        struct ll_inode_md md;
+        int err, type;
         ino_t ino;
-        
+
         ENTRY;
         if (dentry->d_name.len > EXT2_NAME_LEN)
                 RETURN(ERR_PTR(-ENAMETOOLONG));
@@ -115,8 +115,18 @@ static struct dentry *ll_lookup(struct inode * dir, struct dentry *dentry)
                 RETURN(ERR_PTR(-abs(err)));
         }
 
-        inode = iget4(dir->i_sb, ino, ll_find_inode,
-                      lustre_msg_buf(request->rq_repmsg, 0));
+        if (S_ISREG(type)) {
+                if (request->rq_repmsg->bufcount < 2 ||
+                    request->rq_repmsg->buflens[1] != sizeof(struct obdo))
+                        LBUG();
+
+                md.obdo = lustre_msg_buf(request->rq_repmsg, 1);
+        } else
+                md.obdo = NULL;
+
+        md.body = lustre_msg_buf(request->rq_repmsg, 0);
+
+        inode = iget4(dir->i_sb, ino, ll_find_inode, &md);
 
         ptlrpc_free_req(request);
         if (!inode) 
@@ -130,7 +140,7 @@ static struct dentry *ll_lookup(struct inode * dir, struct dentry *dentry)
 
 static struct inode *ll_create_node(struct inode *dir, const char *name, 
                                     int namelen, const char *tgt, int tgtlen, 
-                                    int mode, __u64 id)
+                                    int mode, __u64 extra, struct obdo *obdo)
 {
         struct inode *inode;
         struct ptlrpc_request *request = NULL;
@@ -138,12 +148,13 @@ static struct inode *ll_create_node(struct inode *dir, const char *name,
         int err;
         time_t time = CURRENT_TIME;
         struct ll_sb_info *sbi = ll_i2sbi(dir);
+        struct ll_inode_md md;
 
         ENTRY;
 
         err = mdc_create(&sbi->ll_mds_client, sbi->ll_mds_conn, dir, name,
-                         namelen, tgt, tgtlen, mode, id,  current->fsuid,
-                         current->fsgid, time, &request);
+                         namelen, tgt, tgtlen, mode, current->fsuid,
+                         current->fsgid, time, extra, obdo, &request);
         if (err) { 
                 inode = ERR_PTR(err);
                 GOTO(out, err);
@@ -151,16 +162,16 @@ static struct inode *ll_create_node(struct inode *dir, const char *name,
         body = lustre_msg_buf(request->rq_repmsg, 0);
         body->valid = (__u32)OBD_MD_FLNOTOBD;
 
-        body->objid = id; 
         body->nlink = 1;
         body->atime = body->ctime = body->mtime = time;
         body->uid = current->fsuid;
         body->gid = current->fsgid;
         body->mode = mode;
-        CDEBUG(D_INODE, "-- new_inode: objid %lld, ino %d, mode %o\n",
-               (unsigned long long)body->objid, body->ino, body->mode); 
 
-        inode = iget4(dir->i_sb, body->ino, ll_find_inode, body);
+        md.body = body;
+        md.obdo = obdo;
+
+        inode = iget4(dir->i_sb, body->ino, ll_find_inode, &md);
         if (IS_ERR(inode)) {
                 CERROR("new_inode -fatal:  %ld\n", PTR_ERR(inode));
                 inode = ERR_PTR(-EIO);
@@ -249,7 +260,7 @@ static int ll_create (struct inode * dir, struct dentry * dentry, int mode)
 {
         int err, rc;
         struct obdo oa;
-        struct inode * inode;
+        struct inode *inode;
 
         memset(&oa, 0, sizeof(oa));
         oa.o_valid = OBD_MD_FLMODE;
@@ -263,8 +274,8 @@ static int ll_create (struct inode * dir, struct dentry * dentry, int mode)
         mode = mode | S_IFREG;
         CDEBUG(D_DENTRY, "name %s mode %o o_id %lld\n",
                dentry->d_name.name, mode, (unsigned long long)oa.o_id);
-        inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len,
-                               NULL, 0, mode, oa.o_id);
+        inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len, 
+                               NULL, 0, mode, 0, &oa);
 
        if (IS_ERR(inode)) {
                rc = PTR_ERR(inode);
@@ -290,11 +301,12 @@ out_destroy:
 } /* ll_create */
 
 
-static int ll_mknod (struct inode * dir, struct dentry *dentry, int mode, int rdev)
+static int ll_mknod (struct inode * dir, struct dentry *dentry, int mode,
+                     int rdev)
 {
         struct inode * inode = ll_create_node(dir, dentry->d_name.name, 
                                               dentry->d_name.len, NULL, 0,
-                                              mode, 0);
+                                              mode, rdev, NULL);
         int err = PTR_ERR(inode);
         if (!IS_ERR(inode)) {
                 init_special_inode(inode, mode, rdev);
@@ -316,7 +328,7 @@ static int ll_symlink (struct inode * dir, struct dentry * dentry,
 
         inode = ll_create_node(dir, dentry->d_name.name, 
                                dentry->d_name.len, symname, l,
-                               S_IFLNK | S_IRWXUGO, 0);
+                               S_IFLNK | S_IRWXUGO, 0, NULL);
         err = PTR_ERR(inode);
         if (IS_ERR(inode))
                 return err;
@@ -376,7 +388,7 @@ static int ll_mkdir(struct inode * dir, struct dentry * dentry, int mode)
 
         inode = ll_create_node (dir, dentry->d_name.name, 
                                 dentry->d_name.len, NULL, 0, 
-                                S_IFDIR | mode, 0);
+                                S_IFDIR | mode, 0, NULL);
         err = PTR_ERR(inode);
         if (IS_ERR(inode))
                 goto out_dir;
index 9f09ba3..2fe1f3e 100644 (file)
 #include <linux/lustre_lite.h>
 #include <linux/lustre_lib.h>
 
-inline struct obdo * ll_oa_from_inode(struct inode *inode, unsigned long valid)
-{
-        struct ll_inode_info *oinfo = ll_i2info(inode);
-        struct obdo *oa = obdo_alloc();
-        if ( !oa ) {
-                CERROR("no memory to allocate obdo!\n"); 
-                return NULL;
-        }
-        oa->o_valid = valid;
-
-        if ( valid & OBD_MD_FLID )
-                oa->o_id = oinfo->lli_objid;
-        if ( valid & OBD_MD_FLATIME )
-                oa->o_atime = inode->i_atime;
-        if ( valid & OBD_MD_FLMTIME )
-                oa->o_mtime = inode->i_mtime;
-        if ( valid & OBD_MD_FLCTIME )
-                oa->o_ctime = inode->i_ctime;
-        if ( valid & OBD_MD_FLSIZE )
-                oa->o_size = inode->i_size;
-        if ( valid & OBD_MD_FLBLOCKS )   /* allocation of space */
-                oa->o_blocks = inode->i_blocks;
-        if ( valid & OBD_MD_FLBLKSZ )
-                oa->o_blksize = inode->i_blksize;
-        if ( valid & OBD_MD_FLMODE )
-                oa->o_mode = inode->i_mode;
-        if ( valid & OBD_MD_FLUID )
-                oa->o_uid = inode->i_uid;
-        if ( valid & OBD_MD_FLGID )
-                oa->o_gid = inode->i_gid;
-        if ( valid & OBD_MD_FLFLAGS )
-                oa->o_flags = inode->i_flags;
-        if ( valid & OBD_MD_FLNLINK )
-                oa->o_nlink = inode->i_nlink;
-        if ( valid & OBD_MD_FLGENER ) 
-                oa->o_generation = inode->i_generation;
-
-        CDEBUG(D_INFO, "src inode %ld, dst obdo %ld valid 0x%08lx\n",
-               inode->i_ino, (long)oa->o_id, valid);
-#if 0
-        /* this will transfer metadata for the logical object to 
-           the oa: that metadata could contain the constituent objects
-        */
-        if (ll_has_inline(inode)) {
-                CDEBUG(D_INODE, "copying inline data from inode to obdo\n");
-                memcpy(oa->o_inline, oinfo->lli_inline, OBD_INLINESZ);
-                oa->o_obdflags |= OBD_FL_INLINEDATA;
-                oa->o_valid |= OBD_MD_FLINLINE;
-        }
-#endif
-        return oa;
-} /* ll_oa_from_inode */
-
 /* SYNCHRONOUS I/O to object storage for an inode */
 static int ll_brw(int rw, struct inode *inode, struct page *page, int create)
 {
@@ -98,14 +45,9 @@ static int ll_brw(int rw, struct inode *inode, struct page *page, int create)
         int              err;
         ENTRY;
 
-        oa = ll_oa_from_inode(inode, OBD_MD_FLNOTOBD);
-        if (!oa)
-                RETURN(-ENOMEM);
-
+        oa = ll_i2info(inode)->lli_obdo;
         err = obd_brw(rw, ll_i2obdconn(inode), num_obdo, &oa, &bufs_per_obdo,
-                      &page, &count, &offset, &flags);
-
-        obdo_free(oa);
+                      &page, &count, &offset, &flags, NULL);
         RETURN(err);
 } /* ll_brw */
 
@@ -217,9 +159,7 @@ static int ll_commit_write(struct file *file, struct page *page,
         struct iattr     iattr;
 
         ENTRY;
-        oa = ll_oa_from_inode(inode, OBD_MD_FLNOTOBD);
-        if (! oa )
-                RETURN(-ENOMEM);
+        oa = ll_i2info(inode)->lli_obdo;
 
         SetPageUptodate(page);
 
@@ -230,7 +170,7 @@ static int ll_commit_write(struct file *file, struct page *page,
                from, to, (unsigned long long)count);
 
         err = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode), num_obdo, &oa,
-                      &bufs_per_obdo, &page, &count, &offset, &flags);
+                      &bufs_per_obdo, &page, &count, &offset, &flags, NULL);
         kunmap(page);
 
         if ((iattr.ia_size = offset + to) > inode->i_size) {
@@ -246,7 +186,6 @@ static int ll_commit_write(struct file *file, struct page *page,
 #endif
         }
 
-        obdo_free(oa);
         RETURN(err);
 } /* ll_commit_write */
 
@@ -256,16 +195,11 @@ void ll_truncate(struct inode *inode)
         int err;
         ENTRY;
 
-        oa = ll_oa_from_inode(inode, OBD_MD_FLNOTOBD);
-        if ( !oa ) {
-                CERROR("no memory to allocate obdo!\n");
-                return; 
-        } 
+        oa = ll_i2info(inode)->lli_obdo;
         
         CDEBUG(D_INFO, "calling punch for %ld (%Lu bytes at 0)\n",
                (long)oa->o_id, (unsigned long long)oa->o_size);
         err = obd_punch(ll_i2obdconn(inode), oa, oa->o_size, 0);
-        obdo_free(oa);
 
         if (err) {
                 CERROR("obd_truncate fails (%d)\n", err);
@@ -308,20 +242,21 @@ int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf,
                 flags[i] = OBD_BRW_CREATE;
         }
 
-        oa = ll_oa_from_inode(inode, OBD_MD_FLNOTOBD);
+        oa = ll_i2info(inode)->lli_obdo;
         if (!oa)
                 GOTO(out, rc = -ENOMEM);
-
         rc = obd_brw(rw, ll_i2obdconn(inode), num_obdo, &oa, &bufs_per_obdo,
-                     iobuf->maplist, count, offset, flags);
-        if (rc == 0)
+                      iobuf->maplist, count, offset, flags, NULL);
+        if (rc == 0) 
                 rc = bufs_per_obdo * PAGE_SIZE;
 
  out:
-        obdo_free(oa);
-        OBD_FREE(flags, sizeof(obd_flag) * bufs_per_obdo);
-        OBD_FREE(offset, sizeof(obd_off) * bufs_per_obdo);
-        OBD_FREE(count, sizeof(obd_count) * bufs_per_obdo);
+        if (flags) 
+                OBD_FREE(flags, sizeof(obd_flag) * bufs_per_obdo); 
+        if (count) 
+                OBD_FREE(count, sizeof(obd_count) * bufs_per_obdo); 
+        if (offset) 
+                OBD_FREE(offset, sizeof(obd_off) * bufs_per_obdo); 
         RETURN(rc);
 }
 
index 1634e4f..9a8c805 100644 (file)
@@ -88,6 +88,7 @@ static struct super_block * ll_read_super(struct super_block *sb,
         __u64 last_committed, last_rcvd;
         __u32 last_xid;
         struct ptlrpc_request *request = NULL;
+        struct ll_inode_md md;
 
         ENTRY;
         MOD_INC_USE_COUNT;
@@ -185,8 +186,10 @@ static struct super_block * ll_read_super(struct super_block *sb,
                 GOTO(out_mdc, sb = NULL);
         }
 
-        root = iget4(sb, sbi->ll_rootino, NULL,
-                     lustre_msg_buf(request->rq_repmsg, 0));
+        md.body = lustre_msg_buf(request->rq_repmsg, 0);
+        md.obdo = NULL;
+        root = iget4(sb, sbi->ll_rootino, NULL, &md);
+                     
         if (root) {
                 sb->s_root = d_alloc_root(root);
         } else {
@@ -237,14 +240,28 @@ static void ll_put_super(struct super_block *sb)
         EXIT;
 } /* ll_put_super */
 
-
-extern inline struct obdo * ll_oa_from_inode(struct inode *inode, int valid);
+static void ll_clear_inode(struct inode *inode)
+{
+        if (atomic_read(&inode->i_count) == 0) {
+                struct obdo *oa = ll_i2info(inode)->lli_obdo;
+                if (oa) {
+                        obdo_free(oa);
+                        ll_i2info(inode)->lli_obdo = NULL;
+                }
+                if (ll_i2info(inode)->lli_symlink_name) {
+                        OBD_FREE(ll_i2info(inode)->lli_symlink_name,
+                                 strlen(ll_i2info(inode)->lli_symlink_name)+ 1);
+                        ll_i2info(inode)->lli_symlink_name = NULL;
+                }
+        }
+}
 
 static void ll_delete_inode(struct inode *inode)
 {
-        if (S_ISREG(inode->i_mode)) {
-                int err;
-                struct obdo *oa = ll_oa_from_inode(inode, OBD_MD_FLNOTOBD);
+        if (S_ISREG(inode->i_mode)) { 
+                int err; 
+                struct obdo *oa; 
+                oa = ll_i2info(inode)->lli_obdo;
 
                 if (!oa) {
                         CERROR("no memory\n");
@@ -349,8 +366,9 @@ static int ll_statfs(struct super_block *sb, struct statfs *buf)
         RETURN(err);
 }
 
-static void inline ll_to_inode(struct inode *dst, struct mds_body *body)
+static void inline ll_to_inode(struct inode *dst, struct ll_inode_md *md)
 {
+        struct mds_body *body = md->body;
         struct ll_inode_info *ii = ll_i2info(dst);
 
         /* core attributes first */
@@ -378,8 +396,10 @@ static void inline ll_to_inode(struct inode *dst, struct mds_body *body)
                 dst->i_generation = body->generation;
 
         /* this will become more elaborate for striping etc */ 
-        if (body->valid & OBD_MD_FLOBJID) 
-                ii->lli_objid = body->objid;
+        if (md->obdo != NULL) {
+                ii->lli_obdo = obdo_alloc();
+                memcpy(ii->lli_obdo, md->obdo, sizeof(*md->obdo));
+        }
 #if 0
 
         if (obdo_has_inline(oa)) {
@@ -400,10 +420,10 @@ static void inline ll_to_inode(struct inode *dst, struct mds_body *body)
 
 static inline void ll_read_inode2(struct inode *inode, void *opaque)
 {
-        struct mds_body *body = opaque; 
+        struct ll_inode_md *md = opaque;
         
         ENTRY;
-        ll_to_inode(inode, body); 
+        ll_to_inode(inode, md);
 
         /* OIDEBUG(inode); */
 
@@ -433,6 +453,7 @@ static inline void ll_read_inode2(struct inode *inode, void *opaque)
 struct super_operations ll_super_operations =
 {
         read_inode2: ll_read_inode2,
+        clear_inode: ll_clear_inode,
         delete_inode: ll_delete_inode,
         put_super: ll_put_super,
         statfs: ll_statfs
diff --git a/lustre/lov/.cvsignore b/lustre/lov/.cvsignore
new file mode 100644 (file)
index 0000000..282522d
--- /dev/null
@@ -0,0 +1,2 @@
+Makefile
+Makefile.in
index 8351f5b..90d38d4 100644 (file)
@@ -5,10 +5,10 @@
 
 DEFS :=
 
-#MODULE = lov
-#modulefs_DATA = lov.o
-#EXTRA_PROGRAMS = lov
+MODULE = lov
+modulefs_DATA = lov.o
+EXTRA_PROGRAMS = lov
 
-#lov_SOURCES = lov_obd.c
+lov_SOURCES = lov_obd.c
 
 include $(top_srcdir)/Rules
index e74d0fe..bb5df34 100644 (file)
@@ -15,6 +15,7 @@
 
 #include <linux/module.h>
 #include <linux/obd_class.h>
+#include <linux/obd_lov.h>
 
 extern struct obd_device obd_dev[MAX_OBD_DEVICES];
 
@@ -115,65 +116,70 @@ static int lov_close(struct obd_conn *conn, struct obdo *oa)
 
 static int lov_create(struct obd_conn *conn, struct obdo *oa)
 {
-        int rc, i;
+        int rc, retval, i, offset;
+        struct obdo tmp;
+        struct lov_md md;
         ENTRY;
 
         if (!gen_client(conn))
                 RETURN(-EINVAL);
 
-        for (i = 0; i < conn->oc_dev->obd_multi_count; i++)
-                rc = obd_create(&conn->oc_dev->obd_multi_conn[i], oa);
+        md.lmd_object_id = oa->o_id;
+        md.lmd_stripe_count = conn->oc_dev->obd_multi_count;
+
+        memset(oa->o_inline, 0, sizeof(oa->o_inline));
+        offset = sizeof(md);
+        for (i = 0; i < md.lmd_stripe_count; i++) {
+                struct lov_object_id lov_id;
+                rc = obd_create(&conn->oc_dev->obd_multi_conn[i], &tmp);
+                if (i == 0) {
+                        memcpy(oa, &tmp, sizeof(tmp));
+                        retval = rc;
+                } else if (retval != rc)
+                        CERROR("return codes didn't match (%d, %d)\n",
+                               retval, rc);
+                lov_id = (struct lov_object_id *)(oa->o_inline + offset);
+                lov_id->l_device_id = i;
+                lov_id->l_object_id = tmp.o_id;
+                offset += sizeof(*lov_id);
+        }
+        memcpy(oa->o_inline, &md, sizeof(md));
 
         return rc;
 }
 
-static int filter_destroy(struct obd_conn *conn, struct obdo *oa)
+static int lov_destroy(struct obd_conn *conn, struct obdo *oa)
 {
-#if 0
-        struct obd_device * obddev;
-        struct obd_client * cli;
-        struct inode * inode;
-        struct file *dir;
-        struct file *object;
-        int rc;
-        struct obd_run_ctxt saved;
+        int rc, retval, i, offset;
+        struct obdo tmp;
+        struct lov_md *md;
+        struct lov_object_id lov_id;
+        ENTRY;
 
-        if (!(cli = gen_client(conn))) {
-                CERROR("invalid client %u\n", conn->oc_id);
-                EXIT;
-                return -EINVAL;
-        }
+        if (!gen_client(conn))
+                RETURN(-EINVAL);
 
-        obddev = conn->oc_dev;
-        object = filter_obj_open(obddev, oa->o_id, oa->o_mode);
-        if (!object || IS_ERR(object)) {
-                EXIT;
-                return -ENOENT;
-        }
+        md = (struct lov_md *)oa->o_inline;
 
-        inode = object->f_dentry->d_inode;
-        inode->i_nlink = 1;
-        inode->i_mode = 010000;
+        memcpy(&tmp, oa, sizeof(tmp));
 
-        push_ctxt(&saved, &obddev->u.filter.fo_ctxt);
-        dir = filter_parent(oa->o_id, oa->o_mode);
-        if (IS_ERR(dir)) {
-                rc = PTR_ERR(dir);
-                EXIT;
-                goto out;
+        offset = sizeof(md);
+        for (i = 0; i < md->lmd_stripe_count; i++) {
+                struct lov_object_id *lov_id;
+                lov_id = (struct lov_object_id *)(oa->o_inline + offset);
+
+                tmp.o_id = lov_id->l_object_id;
+
+                rc = obd_destroy(&conn->oc_dev->obd_multi_conn[i], &tmp);
+                if (i == 0)
+                        retval = rc;
+                else if (retval != rc)
+                        CERROR("return codes didn't match (%d, %d)\n",
+                               retval, rc);
+                offset += sizeof(*lov_id);
         }
-        dget(dir->f_dentry);
-        dget(object->f_dentry);
-        rc = vfs_unlink(dir->f_dentry->d_inode, object->f_dentry);
-
-        filp_close(dir, 0);
-        filp_close(object, 0);
-out:
-        pop_ctxt(&saved);
-        EXIT;
+
         return rc;
-#endif
-        return 0;
 }
 
 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
@@ -200,18 +206,74 @@ static int lov_punch(struct obd_conn *conn, struct obdo *oa,
         RETURN(rc);
 }
 
+struct lov_callback_data {
+        atomic_t count;
+        wait_queue_head waitq;
+};
+
+static void lov_read_callback(struct ptlrpc_bulk_desc *desc, void *data)
+{
+        struct lov_callback_data *cb_data = data;
+
+        if (atomic_dec_and_test(&cb_data->count))
+                wake_up(&cb_data->waitq);
+}
+
+static int lov_read_check_status(struct lov_callback_data *cb_data)
+{
+        ENTRY;
+        if (sigismember(&(current->pending.signal), SIGKILL) ||
+            sigismember(&(current->pending.signal), SIGTERM) ||
+            sigismember(&(current->pending.signal), SIGINT)) {
+                cb_data->flags |= PTL_RPC_FL_INTR;
+                RETURN(1);
+        }
+        if (atomic_read(&cb_data->count) == 0)
+                RETURN(1);
+        RETURN(0);
+}
+
 /* buffer must lie in user memory here */
-static int lov_read(struct obd_conn *conn, struct obdo *oa, char *buf,
-                    obd_size *count, obd_off offset)
+static int lov_brw(int cmd, struct obd_conn *conn, obd_count num_oa,
+                   struct obdo **oa,
+                   obd_count *oa_bufs, struct page **buf,
+                   obd_size *count, obd_off *offset, obd_flag *flags,
+                   bulk_callback_t callback, void *data)
 {
-        int rc, i;
+        int rc, i, page_array_offset = 0;
         obd_off off = offset;
         obd_size retval = 0;
+        struct lov_callback_data *cb_data;
         ENTRY;
 
+        if (num_oa != 1)
+                LBUG();
+
         if (!gen_client(conn))
                 RETURN(-EINVAL);
 
+        OBD_ALLOC(cb_data, sizeof(*cb_data));
+        if (cb_data == NULL) {
+                LBUG();
+                RETURN(-ENOMEM);
+        }
+        INIT_WAITQUEUE_HEAD(&cb_data->waitq);
+        atomic_set(&cb_data->count, 0);
+
+        for (i = 0; i < oa_bufs[0]; i++) {
+                struct page *current_page = buf[i];
+
+                struct lov_md *md = (struct lov_md *)oa[i]->inline;
+                int bufcount = oa_bufs[i];
+                // md->lmd_stripe_count
+
+                for (k = page_array_offset; k < bufcount + page_array_offset;
+                     k++) {
+                        
+                }
+                page_array_offset += bufcount;
+
+
         while (off < offset + count) {
                 int stripe, conn;
                 obd_size size, tmp;
@@ -221,12 +283,13 @@ static int lov_read(struct obd_conn *conn, struct obdo *oa, char *buf,
                 if (size > *count)
                         size = *count;
 
-
                 conn = stripe % conn->oc_dev->obd_multi_count;
 
                 tmp = size;
-                rc = obd_read(&conn->oc_dev->obd_multi_conn[conn], oa, buf,
-                              &size, off);
+                atomic_inc(&cb_data->count);
+                rc = obd_brw(cmd, &conn->oc_dev->obd_multi_conn[conn],
+                             num_oa, oa, buf,
+                              &size, off, lov_read_callback, cb_data);
                 if (rc == 0)
                         retval += size;
                 else {
@@ -239,17 +302,28 @@ static int lov_read(struct obd_conn *conn, struct obdo *oa, char *buf,
                 buf += size;
         }
 
+        wait_event_interruptible(&cb_data->waitq,
+                                 lov_read_check_status(cb_data));
+        if (cb_data->flags & PTL_RPC_FL_INTR)
+                rc = -EINTR;
+
+        /* FIXME: The error handling here sucks */
         *count = retval;
+        OBD_FREE(cb_data, sizeof(*cb_data));
         RETURN(rc);
 }
 
+static void lov_write_finished(struct ptlrpc_bulk_desc *desc, void *data)
+{
+        
+}
 
 /* buffer must lie in user memory here */
 static int filter_write(struct obd_conn *conn, struct obdo *oa, char *buf,
                          obd_size *count, obd_off offset)
 {
         int err;
-        struct file * file;
+        struct file *file;
         unsigned long retval;
 
         ENTRY;
@@ -312,22 +386,25 @@ static int lov_cancel(struct obd_conn *conn, __u32 mode,
         rc = obd_cancel(&conn->oc_dev->obd_multi_conn[0], oa);
         RETURN(rc);
 }
+#endif
 
 struct obd_ops lov_obd_ops = {
         o_setup:       gen_multi_setup,
         o_cleanup:     gen_multi_cleanup,
         o_create:      lov_create,
-        o_destroy:     lov_destroy,
+        o_connect:     lov_connect,
+        o_disconnect:  lov_disconnect,
         o_getattr:     lov_getattr,
         o_setattr:     lov_setattr,
         o_open:        lov_open,
         o_close:       lov_close,
-        o_connect:     lov_connect,
-        o_disconnect:  lov_disconnect,
+#if 0
+        o_destroy:     lov_destroy,
         o_brw:         lov_pgcache_brw,
         o_punch:       lov_punch,
         o_enqueue:     lov_enqueue,
         o_cancel:      lov_cancel
+#endif
 };
 
 
index 2f59c2e..049c409 100644 (file)
@@ -74,27 +74,40 @@ int mdc_setattr(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
 
 int mdc_create(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
                struct inode *dir, const char *name, int namelen,
-               const char *tgt, int tgtlen, int mode, __u64 id, __u32 uid,
-               __u32 gid, __u64 time, struct ptlrpc_request **request)
+               const char *tgt, int tgtlen, int mode, __u32 uid,
+               __u32 gid, __u64 time, __u64 rdev, struct obdo *obdo,
+               struct ptlrpc_request **request)
 {
         struct mds_rec_create *rec;
         struct ptlrpc_request *req;
-        int rc, size[3] = {sizeof(*rec), namelen + 1, tgtlen + 1};
-        char *tmp;
-        int level;
+        int rc, size[3] = {sizeof(*rec), namelen + 1, 0};
+        char *tmp, *bufs[3] = {NULL, NULL, NULL};
+        int level, bufcount = 2;
         ENTRY;
 
-        req = ptlrpc_prep_req(cl, conn, MDS_REINT, 3, size, NULL);
+        if (S_ISREG(mode)) {
+                size[2] = sizeof(*obdo);
+                bufs[2] = (char *)obdo;
+                bufcount = 3;
+        } else if (S_ISLNK(mode)) {
+                size[2] = tgtlen + 1;
+                bufcount = 3;
+        }
+
+        req = ptlrpc_prep_req(cl, conn, MDS_REINT, bufcount, size, bufs);
         if (!req)
                 RETURN(-ENOMEM);
 
         rec = lustre_msg_buf(req->rq_reqmsg, 0);
-        mds_create_pack(rec, dir, mode, id, uid, gid, time);
+        mds_create_pack(rec, dir, mode, rdev, uid, gid, time);
 
         tmp = lustre_msg_buf(req->rq_reqmsg, 1);
         LOGL0(name, namelen, tmp);
 
-        if (tgt) {
+        if (S_ISREG(mode)) {
+                tmp = lustre_msg_buf(req->rq_reqmsg, 2);
+                memcpy(tmp, obdo, sizeof(*obdo));
+        } else if (S_ISLNK(mode)) {
                 tmp = lustre_msg_buf(req->rq_reqmsg, 2);
                 LOGL0(tgt, tgtlen, tmp);
         }
index 3a7fc63..a928fc3 100644 (file)
@@ -95,7 +95,10 @@ int mdc_getattr(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         ll_ino2fid(&body->fid1, ino, 0, type);
         body->valid = valid;
 
-        if (valid & OBD_MD_LINKNAME) {
+        if (S_ISREG(type)) {
+                bufcount = 2;
+                size[1] = sizeof(struct obdo);
+        } else if (valid & OBD_MD_LINKNAME) {
                 bufcount = 2;
                 size[1] = ea_size;
         }
@@ -134,7 +137,7 @@ int mdc_open(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         body = lustre_msg_buf(req->rq_reqmsg, 0);
         ll_ino2fid(&body->fid1, ino, 0, type);
         body->flags = HTON__u32(flags);
-        body->objid = cookie; 
+        body->extra = cookie;
 
         req->rq_replen = lustre_msg_size(1, &size);
 
@@ -144,7 +147,7 @@ int mdc_open(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         if (!rc) {
                 mds_unpack_rep_body(req);
                 body = lustre_msg_buf(req->rq_repmsg, 0);
-                *fh = body->objid;
+                *fh = body->extra;
         }
 
         EXIT;
@@ -166,7 +169,7 @@ int mdc_close(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
 
         body = lustre_msg_buf(req->rq_reqmsg, 0);
         ll_ino2fid(&body->fid1, ino, 0, type);
-        body->objid = fh;
+        body->extra = fh;
 
         req->rq_level = LUSTRE_CONN_FULL;
         req->rq_replen = lustre_msg_size(0, NULL);
@@ -347,7 +350,7 @@ static int request_ioctl(struct inode *inode, struct file *file,
                 err = mdc_create(&cl, conn, &inode,
                                  "foofile", strlen("foofile"),
                                  NULL, 0, 0100707, 47114711,
-                                 11, 47, 0, &request);
+                                 11, 47, 0, NULL, &request);
                 CERROR("-- done err %d\n", err);
 
                 GOTO(out, err);
index 8c94e54..887039c 100644 (file)
@@ -6,8 +6,8 @@
 DEFS:= 
 
 MODULE = mds
-modulefs_DATA = mds.o mds_ext2.o mds_ext3.o mds_extN.o
-EXTRA_PROGRAMS = mds mds_ext2 mds_ext3 mds_extN
+modulefs_DATA = mds.o mds_extN.o # mds_ext2.o mds_ext3.o
+EXTRA_PROGRAMS = mds mds_extN # mds_ext2 mds_ext3
 
 LINX=mds_updates.c simple.c
 mds_updates.c: 
index 6f54693..ab4f48a 100644 (file)
@@ -22,8 +22,8 @@
 #include <linux/module.h>
 #include <linux/lustre_mds.h>
 
-static
-int mds_sendpage(struct ptlrpc_request *req, struct file *file, __u64 offset)
+static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
+                        __u64 offset)
 {
         int rc = 0;
         mm_segment_t oldfs = get_fs();
@@ -58,6 +58,9 @@ int mds_sendpage(struct ptlrpc_request *req, struct file *file, __u64 offset)
         desc->b_portal = MDS_BULK_PORTAL;
 
         rc = ptlrpc_send_bulk(desc);
+        if (rc)
+                GOTO(cleanup_buf, rc);
+
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
                        OBD_FAIL_MDS_SENDPAGE, rc);
@@ -65,6 +68,10 @@ int mds_sendpage(struct ptlrpc_request *req, struct file *file, __u64 offset)
                 GOTO(cleanup_buf, rc);
         }
 
+        wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
+        if (desc->b_flags & PTL_RPC_FL_INTR)
+                GOTO(cleanup_buf, rc = -EINTR);
+
         EXIT;
  cleanup_buf:
         OBD_FREE(buf, PAGE_SIZE);
@@ -193,14 +200,13 @@ int mds_connect(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-static
-int mds_getattr(struct ptlrpc_request *req)
+static int mds_getattr(struct ptlrpc_request *req)
 {
         struct dentry *de;
         struct inode *inode;
         struct mds_body *body;
         struct mds_obd *mds = &req->rq_obd->u.mds;
-        int rc, size[2] = {sizeof(*body)}, count = 1;
+        int rc, size[2] = {sizeof(*body)}, bufcount = 1;
         ENTRY;
 
         body = lustre_msg_buf(req->rq_reqmsg, 0);
@@ -209,13 +215,17 @@ int mds_getattr(struct ptlrpc_request *req)
                 req->rq_status = -ENOENT;
                 RETURN(0);
         }
+
         inode = de->d_inode;
-        if (body->valid & OBD_MD_LINKNAME) {
-                count = 2;
+        if (S_ISREG(body->fid1.f_type)) {
+                bufcount = 2;
+                size[1] = sizeof(struct obdo);
+        } else if (body->valid & OBD_MD_LINKNAME) {
+                bufcount = 2;
                 size[1] = inode->i_size;
         }
 
-        rc = lustre_pack_msg(count, size, NULL, &req->rq_replen,
+        rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
                              &req->rq_repmsg);
         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
                 CERROR("mds: out of memory\n");
@@ -250,15 +260,17 @@ int mds_getattr(struct ptlrpc_request *req)
         body->size = inode->i_size;
         body->mode = inode->i_mode;
         body->nlink = inode->i_nlink;
+        body->valid = ~0; /* FIXME: should be more selective */
+
         if (S_ISREG(inode->i_mode)) {
-                rc = mds_fs_get_objid(mds, inode, &body->objid);
+                rc = mds_fs_get_obdo(mds, inode,
+                                     lustre_msg_buf(req->rq_repmsg, 1));
                 if (rc < 0) {
                         req->rq_status = rc;
-                        CERROR("readlink failed: %d\n", rc);
+                        CERROR("mds_fs_get_objid failed: %d\n", rc);
                         GOTO(out, 0);
                 }
         }
-        body->valid = ~0; /* FIXME: should be more selective */
  out:
         l_dput(de);
         RETURN(0);
@@ -301,7 +313,7 @@ int mds_open(struct ptlrpc_request *req)
         list_for_each(tmp, &mci->mci_open_head) { 
                 struct mds_file_data *fd;
                 fd = list_entry(tmp, struct mds_file_data, mfd_list);
-                if (body->objid == fd->mfd_clientfd && 
+                if (body->extra == fd->mfd_clientfd && 
                     body->fid1.id == fd->mfd_file->f_dentry->d_inode->i_ino) { 
                         CERROR("Re opening %Ld\n", body->fid1.id);
                         RETURN(0);
@@ -331,11 +343,11 @@ int mds_open(struct ptlrpc_request *req)
 
         file->private_data = mfd;
         mfd->mfd_file = file;
-        mfd->mfd_clientfd = body->objid;
+        mfd->mfd_clientfd = body->extra;
         list_add(&mfd->mfd_list, &mci->mci_open_head); 
 
         body = lustre_msg_buf(req->rq_repmsg, 0);
-        body->objid = (__u64) (unsigned long)file;
+        body->extra = (__u64) (unsigned long)file;
         RETURN(0);
 }
 
@@ -364,7 +376,7 @@ int mds_close(struct ptlrpc_request *req)
                 RETURN(0);
         }
 
-        file = (struct file *)(unsigned long)body->objid;
+        file = (struct file *)(unsigned long)body->extra;
         if (!file->f_dentry) 
                 LBUG();
         mfd = (struct mds_file_data *)file->private_data;
index 714943d..be7587e 100644 (file)
@@ -22,6 +22,7 @@
 #include <linux/extN_xattr.h>
 #include <linux/lustre_mds.h>
 #include <linux/module.h>
+#include <linux/obd_lov.h>
 
 static struct mds_fs_operations mds_extN_fs_ops;
 static kmem_cache_t *jcb_cache;
@@ -34,10 +35,8 @@ struct mds_cb_data {
 };
 
 struct mds_objid {
-        __u16 mo_magic;
-        __u8  mo_unused;
-        __u8  mo_ost;
-        __u64 mo_id;
+        __u64 mo_magic;
+        struct lov_md mo_lov_md;
 };
 
 #define EXTN_XATTR_INDEX_LUSTRE         5
@@ -103,65 +102,75 @@ static int mds_extN_setattr(struct dentry *dentry, void *handle,
                 return inode_setattr(inode, iattr);
 }
 
-static int mds_extN_set_objid(struct inode *inode, void *handle, obd_id id)
+static int mds_extN_set_obdo(struct inode *inode, void *handle,
+                             struct obdo *obdo)
 {
-        struct mds_objid data;
+        struct mds_objid *data = (struct mds_objid *)obdo->o_inline;
         int rc;
 
-        data.mo_magic = cpu_to_le16(XATTR_MDS_MO_MAGIC);
-        data.mo_unused = 0;
-        data.mo_ost = 0;  /* FIXME: store OST index here */
-        data.mo_id = cpu_to_le64(id);
+        data->mo_magic = cpu_to_le64(XATTR_MDS_MO_MAGIC);
 
         lock_kernel();
         down(&inode->i_sem);
-        if (id == 0) {
+        if (obdo == NULL)
                 rc = extN_xattr_set(handle, inode, EXTN_XATTR_INDEX_LUSTRE,
                                     XATTR_LUSTRE_MDS_OBJID, NULL, 0, 0);
-        } else {
+        else
                 rc = extN_xattr_set(handle, inode, EXTN_XATTR_INDEX_LUSTRE,
-                                    XATTR_LUSTRE_MDS_OBJID, &data,
-                                    sizeof(struct mds_objid), XATTR_CREATE);
-        }
+                                    XATTR_LUSTRE_MDS_OBJID, obdo->o_inline,
+                                    OBD_INLINESZ, XATTR_CREATE);
         up(&inode->i_sem);
         unlock_kernel();
 
         if (rc)
                 CERROR("error adding objectid %Ld to inode %ld\n",
-                       (unsigned long long)id, inode->i_ino);
+                       (unsigned long long)obdo->o_id, inode->i_ino);
         return rc;
 }
 
-static int mds_extN_get_objid(struct inode *inode, obd_id *id)
+static int mds_extN_get_obdo(struct inode *inode, struct obdo *obdo)
 {
+        char *buf;
         struct mds_objid data;
-        int rc;
+        struct lov_object_id *lov_ids;
+        int rc, size;
 
         lock_kernel();
         down(&inode->i_sem);
         rc = extN_xattr_get(inode, EXTN_XATTR_INDEX_LUSTRE,
                             XATTR_LUSTRE_MDS_OBJID, &data,
                             sizeof(struct mds_objid));
+        size = sizeof(struct mds_objid) + data.mo_lov_md.lmd_stripe_count *
+                sizeof(*lov_ids);
+        OBD_ALLOC(buf, size);
+        if (buf == NULL)
+                LBUG();
+        rc = extN_xattr_get(inode, EXTN_XATTR_INDEX_LUSTRE,
+                            XATTR_LUSTRE_MDS_OBJID, buf, size);
         up(&inode->i_sem);
         unlock_kernel();
 
+        if (size > OBD_INLINESZ)
+                LBUG();
+
         if (rc < 0) {
                 CERROR("error getting EA %s from MDS inode %ld: rc = %d\n",
                        XATTR_LUSTRE_MDS_OBJID, inode->i_ino, rc);
-                *id = 0;
-        } else if (data.mo_magic != cpu_to_le16(XATTR_MDS_MO_MAGIC)) {
-                CERROR("MDS object id %Ld has bad magic %x\n",
-                       (unsigned long long)le64_to_cpu(data.mo_id),
-                       le16_to_cpu(data.mo_magic));
+                obdo->o_id = 0;
+        } else if (data.mo_magic != cpu_to_le64(XATTR_MDS_MO_MAGIC)) {
+                CERROR("MDS object id %Ld has bad magic %Lx\n",
+                       (unsigned long long)obdo->o_id,
+                       (unsigned long long)le64_to_cpu(data.mo_magic));
                 rc = -EINVAL;
         } else {
-                *id = le64_to_cpu(data.mo_id);
-                /* FIXME: will actually use data.mo_ost at some point */
-                if (data.mo_ost)
-                        CERROR("MDS objid %Ld with ost index %d!\n",
-                               *id, data.mo_ost);
+                /* This field is byteswapped because it appears in the
+                 * catalogue.  All others are opaque to the MDS */
+                obdo->o_id = le64_to_cpu(data.mo_lov_md.lmd_object_id);
+                memcpy(obdo->o_inline, buf + sizeof(data), size);
         }
 
+#warning FIXME: pass this buffer to caller for transmission when size exceeds OBD_INLINESZ
+        OBD_FREE(buf, size);
         return rc;
 }
 
@@ -202,8 +211,8 @@ static void mds_extN_delete_inode(struct inode *inode)
                         EXIT;
                         return;
                 }
-                if (mds_extN_set_objid(inode, handle, 0))
-                        CERROR("error clearing objid on %ld\n", inode->i_ino);
+                if (mds_extN_set_obdo(inode, handle, NULL))
+                        CERROR("error clearing obdo on %ld\n", inode->i_ino);
 
                 if (mds_extN_fs_ops.cl_delete_inode)
                         mds_extN_fs_ops.cl_delete_inode(inode);
@@ -286,8 +295,8 @@ static struct mds_fs_operations mds_extN_fs_ops = {
         fs_start:               mds_extN_start,
         fs_commit:              mds_extN_commit,
         fs_setattr:             mds_extN_setattr,
-        fs_set_objid:           mds_extN_set_objid,
-        fs_get_objid:           mds_extN_get_objid,
+        fs_set_obdo:            mds_extN_set_obdo,
+        fs_get_obdo:            mds_extN_get_obdo,
         fs_readpage:            mds_extN_readpage,
         fs_delete_inode:        mds_extN_delete_inode,
         cl_delete_inode:        clear_inode,
index d8edcb0..f0e7216 100644 (file)
 // XXX - add transaction sequence numbers
 
 #define EXPORT_SYMTAB
-
-#include <linux/version.h>
-#include <linux/module.h>
-#include <linux/fs.h>
-#include <linux/stat.h>
-#include <linux/locks.h>
-#include <linux/quotaops.h>
-#include <asm/unistd.h>
-#include <asm/uaccess.h>
-
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include <linux/obd_support.h>
@@ -261,7 +251,7 @@ static int mds_reint_create(struct mds_update_record *rec,
         case S_IFBLK:
         case S_IFIFO:
         case S_IFSOCK: {
-                int rdev = rec->ur_id;
+                int rdev = rec->ur_rdev;
                 handle = mds_fs_start(mds, dir, MDS_FSOP_MKNOD);
                 if (!handle)
                         GOTO(out_create_dchild, PTR_ERR(handle));
@@ -285,7 +275,9 @@ static int mds_reint_create(struct mds_update_record *rec,
 
                 CDEBUG(D_INODE, "created ino %ld\n", dchild->d_inode->i_ino);
                 if (type == S_IFREG) {
-                        rc = mds_fs_set_objid(mds, inode, handle, rec->ur_id);
+                        struct obdo *obdo;
+                        obdo = lustre_msg_buf(req->rq_reqmsg, 2);
+                        rc = mds_fs_set_obdo(mds, inode, handle, obdo);
                         if (rc)
                                 CERROR("error %d setting objid for %ld\n",
                                        rc, inode->i_ino);
@@ -526,7 +518,8 @@ static int mds_reint_rename(struct mds_update_record *rec,
         handle = mds_fs_start(mds, de_tgtdir->d_inode, MDS_FSOP_RENAME);
         if (!handle)
                 GOTO(out_rename_denew, rc = PTR_ERR(handle));
-        rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new);
+        rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new,
+                        NULL);
 
         if (!rc)
                 rc = mds_update_last_rcvd(mds, handle, req);
index 2c215ad..178e74a 100644 (file)
@@ -511,7 +511,7 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp,
                 }
 
                 err = obd_brw(rw, conns, num, obdos, oa_bufs, bufs,
-                              counts, offsets, flags);
+                              counts, offsets, flags, NULL);
 
                 EXIT;
         brw_cleanup:
index 35ee6b8..87122ab 100644 (file)
@@ -238,7 +238,7 @@ int gen_copy_data(struct obd_conn *dst_conn, struct obdo *dst,
                 page->index = index;
                 err = OBP(src_conn->oc_dev, brw)(READ, src_conn, num_oa, &src,
                                                  &num_buf, &page, &brw_count,
-                                                 &brw_offset, &flagr);
+                                                 &brw_offset, &flagr, NULL);
 
                 if ( err ) {
                         EXIT;
@@ -248,7 +248,7 @@ int gen_copy_data(struct obd_conn *dst_conn, struct obdo *dst,
 
                 err = OBP(dst_conn->oc_dev, brw)(WRITE, dst_conn, num_oa, &dst,
                                                  &num_buf, &page, &brw_count,
-                                                 &brw_offset, &flagw);
+                                                 &brw_offset, &flagw, NULL);
 
                 /* XXX should handle dst->o_size, dst->o_blocks here */
                 if ( err ) {
index 0aa5381..863f96b 100644 (file)
@@ -690,6 +690,7 @@ static int filter_destroy(struct obd_conn *conn, struct obdo *oa)
         inode->i_mode = S_IFREG;
 
         push_ctxt(&saved, &obddev->u.filter.fo_ctxt);
+
         rc = vfs_unlink(dir_dentry->d_inode, object_dentry);
         pop_ctxt(&saved);
         CDEBUG(D_INODE, "put child %p, count = %d\n", object_dentry,
@@ -792,7 +793,7 @@ static int filter_write(struct obd_conn *conn, struct obdo *oa, char *buf,
 static int filter_pgcache_brw(int rw, struct obd_conn *conn, obd_count num_oa,
                                struct obdo **oa, obd_count *oa_bufs,
                                struct page **pages, obd_size *count,
-                               obd_off *offset, obd_flag *flags)
+                               obd_off *offset, obd_flag *flags, void *callback)
 {
         struct obd_run_ctxt      saved;
         struct super_block      *sb;
index ba43795..34f65e4 100644 (file)
@@ -169,7 +169,7 @@ static int obdfs_brw(int rw, struct inode *inode, struct page *page, int create)
         obdfs_from_inode(oa, inode);
 
         err = obd_brw(rw, IID(inode), num_obdo, &oa, &bufs_per_obdo,
-                       &page, &count, &offset, &flags);
+                       &page, &count, &offset, &flags, NULL);
         //if ( !err )
         //      obdfs_to_inode(inode, oa); /* copy o_blocks to i_blocks */
 
@@ -205,7 +205,7 @@ static int obdfs_commit_page(struct page *page, int create, int from, int to)
                from, to, (unsigned long long)count);
 
         err = obd_brw(WRITE, IID(inode), num_obdo, &oa, &bufs_per_obdo,
-                               &page, &count, &offset, &flags);
+                               &page, &count, &offset, &flags, NULL);
         if ( !err ) {
                 SetPageUptodate(page);
                 set_page_clean(page);
index fe83d9b..444ff05 100644 (file)
@@ -82,7 +82,8 @@ static int osc_disconnect(struct obd_conn *conn)
         ENTRY;
 
         osc_con2cl(conn, &cl, &connection);
-        request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size, NULL);
+        request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size,
+                                  NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -360,26 +361,32 @@ static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
         return 0;
 }
 
-static int osc_sendpage(struct ptlrpc_bulk_desc *desc,
-                        struct niobuf_remote *dst, struct niobuf_local *src)
+struct osc_brw_cb_data {
+        struct page **buf;
+        struct ptlrpc_request *req;
+        bulk_callback_t callback;
+        void *cb_data;
+};
+
+static void brw_read_finish(struct ptlrpc_bulk_desc *desc, void *data)
 {
-        struct ptlrpc_bulk_page *page;
-        ENTRY;
+        struct osc_brw_cb_data *cb_data = data;
 
-        page = ptlrpc_prep_bulk_page(desc);
-        if (page == NULL)
-                RETURN(-ENOMEM);
+        if (desc->b_flags & PTL_RPC_FL_INTR)
+                CERROR("got signal\n");
 
-        page->b_buf = (void *)(unsigned long)src->addr;
-        page->b_buflen = src->len;
-        page->b_xid = dst->xid;
+        (cb_data->callback)(desc, cb_data->cb_data);
 
-        RETURN(0);
+        ptlrpc_free_bulk(desc);
+        ptlrpc_free_req(cb_data->req);
+
+        OBD_FREE(cb_data, sizeof(*cb_data));
 }
 
 static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
                         struct obdo **oa, obd_count *oa_bufs, struct page **buf,
-                        obd_size *count, obd_off *offset, obd_flag *flags)
+                        obd_size *count, obd_off *offset, obd_flag *flags,
+                        bulk_callback_t callback)
 {
         struct ptlrpc_client *cl;
         struct ptlrpc_connection *connection;
@@ -414,6 +421,7 @@ static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
         for (pages = 0, i = 0; i < num_oa; i++) {
                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
+                /* FIXME: this inner loop is wrong for multiple OAs */
                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
                         struct ptlrpc_bulk_page *bulk;
                         bulk = ptlrpc_prep_bulk_page(desc);
@@ -457,10 +465,30 @@ static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
         return rc;
 }
 
+static void brw_write_finish(struct ptlrpc_bulk_desc *desc, void *data)
+{
+        struct osc_brw_cb_data *cb_data = data;
+        int i;
+
+        if (desc->b_flags & PTL_RPC_FL_INTR)
+                CERROR("got signal\n");
+
+        for (i = 0; i < desc->b_page_count; i++)
+                kunmap(cb_data->buf[i]);
+
+        (cb_data->callback)(desc, cb_data->cb_data);
+
+        ptlrpc_free_bulk(desc);
+        ptlrpc_free_req(cb_data->req);
+
+        OBD_FREE(cb_data, sizeof(*cb_data));
+}
+
 static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
                          struct obdo **oa, obd_count *oa_bufs,
-                         struct page **pagearray, obd_size *count, obd_off *offset,
-                         obd_flag *flags)
+                         struct page **pagearray, obd_size *count,
+                         obd_off *offset, obd_flag *flags,
+                         bulk_callback_t callback)
 {
         struct ptlrpc_client *cl;
         struct ptlrpc_connection *connection;
@@ -470,6 +498,7 @@ static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
         struct ost_body *body;
         struct niobuf_local *local;
         struct niobuf_remote *remote;
+        struct osc_brw_cb_data *cb_data;
         long pages;
         int rc, i, j, size[3] = {sizeof(*body)};
         void *ptr1, *ptr2;
@@ -527,17 +556,44 @@ static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
 
         desc = ptlrpc_prep_bulk(connection);
         desc->b_portal = OSC_BULK_PORTAL;
+        if (callback) {
+                desc->b_cb = brw_write_finish;
+                OBD_ALLOC(cb_data, sizeof(*cb_data));
+                cb_data->buf = pagearray;
+                cb_data->callback = callback;
+                desc->b_cb_data = cb_data;
+        }
 
         for (pages = 0, i = 0; i < num_oa; i++) {
                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
+                        struct ptlrpc_bulk_page *page;
+
                         ost_unpack_niobuf(&ptr2, &remote);
-                        rc = osc_sendpage(desc, remote, &local[pages]);
-                        if (rc)
-                                GOTO(out3, rc);
+
+                        page = ptlrpc_prep_bulk_page(desc);
+                        if (page == NULL)
+                                GOTO(out3, rc = -ENOMEM);
+
+                        page->b_buf = (void *)(unsigned long)local[pages].addr;
+                        page->b_buflen = local[pages].len;
+                        page->b_xid = remote->xid;
                 }
         }
 
+        if (desc->b_page_count != pages)
+                LBUG();
+
         rc = ptlrpc_send_bulk(desc);
+        if (rc)
+                GOTO(out3, rc);
+        if (callback)
+                GOTO(out, rc);
+
+        /* If there's no callback function, sleep here until complete. */
+        wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
+        if (desc->b_flags & PTL_RPC_FL_INTR)
+                rc = -EINTR;
+
         GOTO(out3, rc);
 
  out3:
@@ -555,14 +611,18 @@ static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
 
 static int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
                    struct obdo **oa, obd_count *oa_bufs, struct page **buf,
-                   obd_size *count, obd_off *offset, obd_flag *flags)
+                   obd_size *count, obd_off *offset, obd_flag *flags,
+                   void *callback)
 {
+        if (num_oa != 1)
+                LBUG();
+
         if (rw == OBD_BRW_READ)
                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
-                                    offset, flags);
+                                    offset, flags, (bulk_callback_t)callback);
         else
                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
-                                     offset, flags);
+                                     offset, flags, (bulk_callback_t)callback);
 }
 
 static int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns,
index 1276c8b..bb5a214 100644 (file)
@@ -356,11 +356,9 @@ static int ost_brw_write_cb(struct ptlrpc_bulk_page *bulk)
         RETURN(rc);
 }
 
-static int ost_brw_write_finished_cb(struct ptlrpc_bulk_desc *desc)
+static void ost_brw_write_finished_cb(struct ptlrpc_bulk_desc *desc, void *data)
 {
         ptlrpc_free_bulk(desc);
-
-        return 0;
 }
 
 static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
index 19ea660..e1a1726 100644 (file)
@@ -32,7 +32,7 @@ static const ptl_handle_ni_t *socknal_nip = NULL, *qswnal_nip = NULL;
 /*
  *  Free the packet when it has gone out
  */
-static int request_out_callback(ptl_event_t *ev, void *data)
+static int request_out_callback(ptl_event_t *ev)
 {
         ENTRY;
 
@@ -49,7 +49,7 @@ static int request_out_callback(ptl_event_t *ev, void *data)
 /*
  *  Free the packet when it has gone out
  */
-static int reply_out_callback(ptl_event_t *ev, void *data)
+static int reply_out_callback(ptl_event_t *ev)
 {
         ENTRY;
 
@@ -67,7 +67,7 @@ static int reply_out_callback(ptl_event_t *ev, void *data)
 /*
  * Wake up the thread waiting for the reply once it comes in.
  */
-static int reply_in_callback(ptl_event_t *ev, void *data)
+static int reply_in_callback(ptl_event_t *ev)
 {
         struct ptlrpc_request *rpc = ev->mem_desc.user_ptr;
         ENTRY;
@@ -85,9 +85,9 @@ static int reply_in_callback(ptl_event_t *ev, void *data)
         RETURN(1);
 }
 
-int request_in_callback(ptl_event_t *ev, void *data)
+int request_in_callback(ptl_event_t *ev)
 {
-        struct ptlrpc_service *service = data;
+        struct ptlrpc_service *service = ev->mem_desc.user_ptr;
         int index;
 
         if (ev->rlength != ev->mlength)
@@ -130,7 +130,7 @@ int request_in_callback(ptl_event_t *ev, void *data)
         return 0;
 }
 
-static int bulk_source_callback(ptl_event_t *ev, void *data)
+static int bulk_source_callback(ptl_event_t *ev)
 {
         struct ptlrpc_bulk_page *bulk = ev->mem_desc.user_ptr;
         struct ptlrpc_bulk_desc *desc = bulk->b_desc;
@@ -140,8 +140,14 @@ static int bulk_source_callback(ptl_event_t *ev, void *data)
                 CDEBUG(D_NET, "got SENT event\n");
         } else if (ev->type == PTL_EVENT_ACK) {
                 CDEBUG(D_NET, "got ACK event\n");
-                desc->b_flags |= PTL_BULK_FL_SENT;
-                wake_up_interruptible(&desc->b_waitq);
+                if (bulk->b_cb != NULL)
+                        bulk->b_cb(bulk);
+                if (atomic_dec_and_test(&desc->b_finished_count)) {
+                        desc->b_flags |= PTL_BULK_FL_SENT;
+                        wake_up_interruptible(&desc->b_waitq);
+                        if (desc->b_cb != NULL)
+                                desc->b_cb(desc, desc->b_cb_data);
+                }
         } else {
                 CERROR("Unexpected event type!\n");
                 LBUG();
@@ -150,7 +156,7 @@ static int bulk_source_callback(ptl_event_t *ev, void *data)
         RETURN(1);
 }
 
-static int bulk_sink_callback(ptl_event_t *ev, void *data)
+static int bulk_sink_callback(ptl_event_t *ev)
 {
         struct ptlrpc_bulk_page *bulk = ev->mem_desc.user_ptr;
         struct ptlrpc_bulk_desc *desc = bulk->b_desc;
@@ -159,14 +165,13 @@ static int bulk_sink_callback(ptl_event_t *ev, void *data)
         if (ev->type == PTL_EVENT_PUT) {
                 if (bulk->b_buf != ev->mem_desc.start + ev->offset)
                         CERROR("bulkbuf != mem_desc -- why?\n");
-                desc->b_finished_count++;
                 if (bulk->b_cb != NULL)
                         bulk->b_cb(bulk);
-                if (desc->b_finished_count == desc->b_page_count) {
+                if (atomic_dec_and_test(&desc->b_finished_count)) {
                         desc->b_flags |= PTL_BULK_FL_RCVD;
                         wake_up_interruptible(&desc->b_waitq);
                         if (desc->b_cb != NULL)
-                                desc->b_cb(desc);
+                                desc->b_cb(desc, desc->b_cb_data);
                 }
         } else {
                 CERROR("Unexpected event type!\n");
@@ -194,23 +199,23 @@ int ptlrpc_init_portals(void)
         else
                 ni = *socknal_nip;
 
-        rc = PtlEQAlloc(ni, 128, request_out_callback, NULL, &request_out_eq);
+        rc = PtlEQAlloc(ni, 128, request_out_callback, &request_out_eq);
         if (rc != PTL_OK)
                 CERROR("PtlEQAlloc failed: %d\n", rc);
 
-        rc = PtlEQAlloc(ni, 128, reply_out_callback, NULL, &reply_out_eq);
+        rc = PtlEQAlloc(ni, 128, reply_out_callback, &reply_out_eq);
         if (rc != PTL_OK)
                 CERROR("PtlEQAlloc failed: %d\n", rc);
 
-        rc = PtlEQAlloc(ni, 128, reply_in_callback, NULL, &reply_in_eq);
+        rc = PtlEQAlloc(ni, 128, reply_in_callback, &reply_in_eq);
         if (rc != PTL_OK)
                 CERROR("PtlEQAlloc failed: %d\n", rc);
 
-        rc = PtlEQAlloc(ni, 128, bulk_source_callback, NULL, &bulk_source_eq);
+        rc = PtlEQAlloc(ni, 128, bulk_source_callback, &bulk_source_eq);
         if (rc != PTL_OK)
                 CERROR("PtlEQAlloc failed: %d\n", rc);
 
-        rc = PtlEQAlloc(ni, 128, bulk_sink_callback, NULL, &bulk_sink_eq);
+        rc = PtlEQAlloc(ni, 128, bulk_sink_callback, &bulk_sink_eq);
         if (rc != PTL_OK)
                 CERROR("PtlEQAlloc failed: %d\n", rc);
 
index 584cbf9..1157920 100644 (file)
@@ -28,7 +28,7 @@ extern ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq,
         bulk_source_eq, bulk_sink_eq;
 static ptl_process_id_t local_id = {PTL_ID_ANY, PTL_ID_ANY};
 
-static int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk)
+int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk)
 {
         ENTRY;
 
@@ -109,6 +109,8 @@ int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *desc)
         ptl_process_id_t remote_id;
         ENTRY;
 
+        atomic_set(&desc->b_finished_count, desc->b_page_count);
+
         list_for_each_safe(tmp, next, &desc->b_page_list) {
                 /* only request an ACK for the last page */
                 int ack = (next == &desc->b_page_list);
@@ -139,19 +141,14 @@ int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *desc)
                 rc = PtlPut(bulk->b_md_h, (ack ? PTL_ACK_REQ : PTL_NOACK_REQ),
                             remote_id, desc->b_portal, 0, bulk->b_xid, 0, 0);
                 if (rc != PTL_OK) {
-                        CERROR("PtlPut(%Lu, %d, %d) failed: %d\n", remote_id.nid,
-                               desc->b_portal, bulk->b_xid, rc);
+                        CERROR("PtlPut(%Lu, %d, %d) failed: %d\n",
+                               remote_id.nid, desc->b_portal, bulk->b_xid, rc);
                         PtlMDUnlink(bulk->b_md_h);
                         LBUG();
                         RETURN(rc);
                 }
         }
 
-        wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
-
-        if (desc->b_flags & PTL_RPC_FL_INTR)
-                RETURN(-EINTR);
-
         RETURN(0);
 }
 
@@ -254,7 +251,6 @@ int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req)
 
 int ptl_send_rpc(struct ptlrpc_request *request)
 {
-        ptl_process_id_t local_id;
         int rc;
         char *repbuf;
 
@@ -282,9 +278,6 @@ int ptl_send_rpc(struct ptlrpc_request *request)
                 RETURN(ENOMEM);
         }
 
-        local_id.nid = PTL_ID_ANY;
-        local_id.pid = PTL_ID_ANY;
-
         down(&request->rq_client->cli_rpc_sem);
 
         rc = PtlMEAttach(request->rq_connection->c_peer.peer_ni,
index f87c484..97e683a 100644 (file)
@@ -24,7 +24,7 @@
 
 #include <linux/lustre_net.h>
 
-extern int request_in_callback(ptl_event_t *ev, void *data);
+extern int request_in_callback(ptl_event_t *ev);
 extern int ptl_handled_rpc(struct ptlrpc_service *service, void *start);
 
 static int ptlrpc_check_event(struct ptlrpc_service *svc,
@@ -101,7 +101,7 @@ ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid,
         service->srv_ring_length = RPC_RING_LENGTH;
 
         rc = PtlEQAlloc(service->srv_self.peer_ni, 128, request_in_callback,
-                        service, &(service->srv_eq_h));
+                        &(service->srv_eq_h));
 
         if (rc != PTL_OK) {
                 CERROR("PtlEQAlloc failed: %d\n", rc);