Whamcloud - gitweb
- change I/O to use a pagearray
authorbraam <braam>
Sun, 11 Aug 2002 08:15:46 +0000 (08:15 +0000)
committerbraam <braam>
Sun, 11 Aug 2002 08:15:46 +0000 (08:15 +0000)
- implement remaining striping function in LOV:
  - read/write
  - locking
  - truncate
- minor protocol cleanup for MDS
- change documentation to include design / architecture / manual /
  appendix parts
- add design documents:
   - management api
   - network format
-

24 files changed:
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_lib.h
lustre/include/linux/lustre_mds.h
lustre/include/linux/lustre_net.h
lustre/include/linux/obd.h
lustre/include/linux/obd_class.h
lustre/lib/l_net.c
lustre/lib/mds_updates.c
lustre/lib/page.c
lustre/llite/file.c
lustre/llite/recover.c
lustre/llite/rw.c
lustre/llite/super.c
lustre/lov/lov_obd.c
lustre/mdc/mdc_reint.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/obdclass/Makefile.am
lustre/obdclass/class_obd.c
lustre/obdfilter/filter.c
lustre/osc/osc_request.c
lustre/ptlrpc/client.c
lustre/ptlrpc/pack_generic.c
lustre/utils/lctl.c

index 5fa1c0b..03fa6da 100644 (file)
@@ -117,7 +117,7 @@ struct lustre_msg {
         __u32 type;
         __u32 version;
         __u32 opc;
-        __u64 last_rcvd;
+        __u64 last_xid;
         __u64 last_committed;
         __u64 transno;
         __u32 status;
@@ -321,6 +321,11 @@ struct mds_status_req {
         __u32  repbuf;
 };
 
+struct mds_fileh_body { /* pairs a fid with a lustre handle; NOTE(review): no user is visible in this diff -- confirm purpose */
+        struct ll_fid f_fid;            /* file identifier */
+        struct lustre_handle f_handle;  /* handle associated with f_fid */
+};
+
 struct mds_conn_status { 
         struct ll_fid rootfid;
         __u64          xid;
@@ -347,13 +352,15 @@ struct mds_body {
         __u32          ino;
         __u32          nlink;
         __u32          generation;
-        __u32          last_xid;
+        __u32          last_xidnomore;
 };
 
 /* MDS update records */
-struct mds_update_record_hdr {
-        __u32 ur_opcode;
-};
+
+
+//struct mds_update_record_hdr {
+//        __u32 ur_opcode;
+//};
 
 struct mds_rec_setattr {
         __u32           sa_opcode;
index 4d053cd..4e0f03f 100644 (file)
@@ -60,6 +60,16 @@ void l_unlock(struct lustre_lock *);
 
 
 /* page.c */
+#define CB_PHASE_START   12 /* phase value: ll_sync_io_cb waits for the I/O to complete */
+#define CB_PHASE_FINISH  13 /* phase value: ll_sync_io_cb records err and wakes the waiter */
+struct io_cb_data { /* state shared by the two phases of ll_sync_io_cb */
+        wait_queue_head_t waitq; /* waited on in CB_PHASE_START, woken in CB_PHASE_FINISH */
+        atomic_t refcount; /* ll_init_cb sets it to 2; each phase drops one, the last drop frees the struct */
+        int complete; /* wait condition; set to 1 in CB_PHASE_FINISH */
+        int err; /* err argument stored in CB_PHASE_FINISH */
+};
+int ll_sync_io_cb(void *data, int err, int phase); /* data is a struct io_cb_data *; signature matches brw_callback_t */
+struct  io_cb_data *ll_init_cb(void); /* allocates an io_cb_data; result is NULL if the allocation failed */
 inline void lustre_put_page(struct page *page);
 struct page *lustre_get_page_read(struct inode *dir, unsigned long index);
 struct page *lustre_get_page_write(struct inode *dir, unsigned long index);
@@ -408,8 +418,8 @@ static inline int obd_ioctl_getdata(char **buf, int *len, void *arg)
  sigismember(&(task->pending.signal), SIGTERM))
 
 /*
- * Like wait_event_interruptible, but we're only interruptible by KILL, INT, or
- * TERM.
+ * Like wait_event_interruptible, but we're only interruptible by
+ * KILL, INT, or TERM.
  *
  * XXXshaver These are going away soon, I hope.
  */
index 57da73e..5bca561 100644 (file)
@@ -148,8 +148,7 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
 int mdc_getlovinfo(struct obd_device *obd, struct lustre_handle *mdc_connh,
                    uuid_t **uuids, struct ptlrpc_request **request);
 int mdc_getstatus(struct lustre_handle *conn,
-                struct ll_fid *rootfid, __u64 *last_committed, __u64 *last_rcvd,
-                __u32 *last_xid, struct ptlrpc_request **);
+                struct ll_fid *rootfid, __u64 *last_committed,                __u32 *last_xid, struct ptlrpc_request **);
 int mdc_getattr(struct lustre_handle *conn,
                 obd_id ino, int type, unsigned long valid, size_t ea_size,
                 struct ptlrpc_request **request);
index fffe5a4..0c50c99 100644 (file)
@@ -58,7 +58,7 @@ struct ptlrpc_client {
         __u32 cli_request_portal;
         __u32 cli_reply_portal;
 
-        __u64 cli_last_rcvd;
+        __u64 cli_last_xid;
         __u64 cli_last_committed;
         __u32 cli_target_devno;
 
index e94f8b8..6156266 100644 (file)
@@ -23,6 +23,13 @@ struct obd_type {
         int  typ_refcnt;
 };
 
+typedef int (*brw_callback_t)(void *, int err, int phase); /* brw callback: (data, err, phase), phase being CB_PHASE_START or CB_PHASE_FINISH */
+struct brw_page { /* one page of a brw request; o_brw takes an array of oa_bufs of these */
+        struct page *pg; /* the page to transfer */
+        obd_size count; /* byte count for this page (callers in this diff pass PAGE_SIZE or the commit_write "to" value) */
+        obd_off  off; /* byte offset of the page (callers compute index << PAGE_SHIFT) */
+        obd_flag flag; /* per-page flags; callers in this diff pass OBD_BRW_CREATE or 0 */
+};
 
 /* Individual type definitions */
 
@@ -69,15 +76,6 @@ struct client_obd {
         int cl_max_mdsize;
 };
 
-#if 0
-struct osc_obd {
-        struct ptlrpc_client *osc_client;
-        struct ptlrpc_client *osc_ldlm_client;
-        struct ptlrpc_connection *osc_conn;
-        __u8 osc_target_uuid[37];
-};
-#endif
-
 struct mds_obd {
         struct ptlrpc_service *mds_service;
 
@@ -208,7 +206,6 @@ struct obd_device {
         } u;
 };
 
-typedef void (*brw_callback_t)(void *);
 
 struct obd_ops {
         int (*o_iocontrol)(long cmd, struct lustre_handle *, int len,
@@ -243,8 +240,8 @@ struct obd_ops {
                        struct lov_stripe_md *);
         int (*o_brw)(int rw, struct lustre_handle *conn,
                      struct lov_stripe_md *md, obd_count oa_bufs,
-                     struct page **buf, obd_size *count, obd_off *offset,
-                     obd_flag *flags, brw_callback_t callback, void * data);
+                     struct brw_page *pgarr, brw_callback_t callback, 
+                     void * data);
         int (*o_punch)(struct lustre_handle *conn, struct obdo *tgt,
                        struct lov_stripe_md *md, obd_size count,
                        obd_off offset);
@@ -266,13 +263,12 @@ struct obd_ops {
                           int objcount, struct obd_ioobj *obj,
                           int niocount, struct niobuf_local *local,
                           void *desc_private);
-        int (*o_enqueue)(struct lustre_handle *conn,
-                         struct lustre_handle *parent_lock, __u64 *res_id,
+        int (*o_enqueue)(struct lustre_handle *conn, struct lov_stripe_md *md,
+                         struct lustre_handle *parent_lock, 
                          __u32 type, void *cookie, int cookielen, __u32 mode,
                          int *flags, void *cb, void *data, int datalen,
                          struct lustre_handle *lockh);
-        int (*o_cancel)(struct lustre_handle *, __u32 mode,
-                        struct lustre_handle *);
+        int (*o_cancel)(struct lustre_handle *, struct lov_stripe_md *md, __u32 mode, struct lustre_handle *);
 };
 
 #endif
index 2298cd6..bcd8c3b 100644 (file)
@@ -342,10 +342,7 @@ static inline int obd_punch(struct lustre_handle *conn, struct obdo *tgt,
 static inline int obd_brw(int cmd, struct lustre_handle *conn, 
                           struct lov_stripe_md *md, 
                           obd_count oa_bufs,
-                          struct page **buf, 
-                          obd_size *count, 
-                          obd_off *offset,
-                          obd_flag *flags, 
+                          struct brw_page *pg, 
                           brw_callback_t callback, void *data)
 {
         int rc;
@@ -358,8 +355,7 @@ static inline int obd_brw(int cmd, struct lustre_handle *conn,
                 LBUG();
         }
 
-        rc = OBP(export->exp_obd, brw)(cmd, conn, md, oa_bufs, buf,
-                                       count, offset, flags, callback, data);
+        rc = OBP(export->exp_obd, brw)(cmd, conn, md, oa_bufs, pg, callback, data);
         RETURN(rc);
 }
 
@@ -406,7 +402,8 @@ static inline int obd_iocontrol(int cmd, struct lustre_handle *conn,
 }
 
 static inline int obd_enqueue(struct lustre_handle *conn,
-                              struct lustre_handle *parent_lock, __u64 *res_id,
+                              struct lov_stripe_md *md,
+                              struct lustre_handle *parent_lock, 
                               __u32 type, void *cookie, int cookielen,
                               __u32 mode, int *flags, void *cb, void *data,
                               int datalen, struct lustre_handle *lockh)
@@ -416,13 +413,13 @@ static inline int obd_enqueue(struct lustre_handle *conn,
         OBD_CHECK_SETUP(conn, export);
         OBD_CHECK_OP(export->exp_obd,enqueue);
 
-        rc = OBP(export->exp_obd, enqueue)(conn, parent_lock, res_id, type,
+        rc = OBP(export->exp_obd, enqueue)(conn, md, parent_lock, type,
                                         cookie, cookielen, mode, flags, cb,
                                         data, datalen, lockh);
         RETURN(rc);
 }
 
-static inline int obd_cancel(struct lustre_handle *conn, __u32 mode,
+static inline int obd_cancel(struct lustre_handle *conn, struct lov_stripe_md *md, __u32 mode,
                              struct lustre_handle *lockh)
 {
         int rc;
@@ -430,7 +427,7 @@ static inline int obd_cancel(struct lustre_handle *conn, __u32 mode,
         OBD_CHECK_SETUP(conn, export);
         OBD_CHECK_OP(export->exp_obd,cancel);
 
-        rc = OBP(export->exp_obd, cancel)(conn, mode, lockh);
+        rc = OBP(export->exp_obd, cancel)(conn, md, mode, lockh);
         RETURN(rc);
 }
 
index 1ef0ad9..a1a687c 100644 (file)
@@ -274,7 +274,8 @@ int target_handle_connect(struct ptlrpc_request *req)
         conn.addr = req->rq_reqmsg->addr;
         conn.cookie = req->rq_reqmsg->cookie;
 
-        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+        rc = lustre_pack_msg(0, 
+                             NULL, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc)
                 GOTO(out, rc);
 
index 2fb2d3e..3cf6dbf 100644 (file)
@@ -94,7 +94,6 @@ static void mds_pack_body(struct mds_body *b)
         b->ino = HTON__u32(b->ino);
         b->nlink = HTON__u32(b->nlink);
         b->generation = HTON__u32(b->generation);
-        b->last_xid = HTON__u32(b->last_xid);
 }
 
 void mds_getattr_pack(struct ptlrpc_request *req, int offset,
@@ -268,7 +267,6 @@ void mds_unpack_body(struct mds_body *b)
         b->ino = NTOH__u32(b->ino);
         b->nlink = NTOH__u32(b->nlink);
         b->generation = NTOH__u32(b->generation);
-        b->last_xid = NTOH__u32(b->last_xid);
 }
 
 static int mds_setattr_unpack(struct ptlrpc_request *req, int offset,
@@ -399,15 +397,14 @@ static update_unpacker mds_unpackers[REINT_MAX + 1] = {
 int mds_update_unpack(struct ptlrpc_request *req, int offset,
                       struct mds_update_record *rec)
 {
-        struct mds_update_record_hdr *hdr =
-                lustre_msg_buf(req->rq_reqmsg, offset);
+        __u32 *opcode = lustre_msg_buf(req->rq_reqmsg, offset);
         int rc;
         ENTRY;
 
-        if (!hdr || req->rq_reqmsg->buflens[offset] < sizeof(*hdr))
+        if (!opcode || req->rq_reqmsg->buflens[offset] < sizeof(*opcode))
                 RETURN(-EFAULT);
 
-        rec->ur_opcode = NTOH__u32(hdr->ur_opcode);
+        rec->ur_opcode = NTOH__u32(*opcode);
 
         if (rec->ur_opcode < 0 || rec->ur_opcode > REINT_MAX)
                 RETURN(-EFAULT);
index a949a3b..51bb5f5 100644 (file)
 #include <linux/lustre_net.h>
 #include <linux/lustre_lib.h>
 
+
+/*
+ * Callback used for synchronous I/O; data must point to the
+ * struct io_cb_data obtained from ll_init_cb().
+ *
+ * CB_PHASE_START:  wait (killable) until d->complete is set, then drop
+ *                  this phase's reference.  Returns -ERESTARTSYS when the
+ *                  wait returned it, 0 otherwise.
+ * CB_PHASE_FINISH: store err, mark the I/O complete, wake the waiter and
+ *                  drop this phase's reference.  Returns err.
+ * Any other phase value is a bug (LBUG).
+ *
+ * Whichever phase drops the last reference frees the structure, so d must
+ * not be touched after atomic_dec_and_test().
+ *
+ * NOTE(review): the error stored in d->err by the FINISH phase is never
+ * read back here, so a START-phase caller gets 0 even if the I/O failed.
+ */
+int ll_sync_io_cb(void *data, int err, int phase)
+{
+        struct io_cb_data *d = data;
+        int ret;
+        ENTRY;
+
+        if (phase == CB_PHASE_START) {
+                ret = l_wait_event_killable(d->waitq, d->complete);
+                if (atomic_dec_and_test(&d->refcount))
+                        OBD_FREE(d, sizeof(*d));
+                /* leave through RETURN so the ENTRY above stays paired */
+                if (ret == -ERESTARTSYS)
+                        RETURN(ret);
+        } else if (phase == CB_PHASE_FINISH) {
+                d->err = err;
+                d->complete = 1;
+                wake_up(&d->waitq);
+                if (atomic_dec_and_test(&d->refcount))
+                        OBD_FREE(d, sizeof(*d));
+                RETURN(err);
+        } else {
+                LBUG();
+        }
+
+        EXIT;
+        return 0;
+}
+
+/*
+ * Allocate and initialise the state for one synchronous I/O.
+ *
+ * The reference count starts at 2: ll_sync_io_cb() drops one reference in
+ * each of its two phases and frees the structure when the count reaches
+ * zero.
+ *
+ * Returns the new structure, or NULL if the allocation failed.
+ */
+struct  io_cb_data *ll_init_cb(void)
+{
+        struct io_cb_data *d;
+        ENTRY;
+
+        OBD_ALLOC(d, sizeof(*d));
+        if (d) {
+                init_waitqueue_head(&d->waitq);
+                atomic_set(&d->refcount, 2);
+                /* complete is the wait condition: do not rely on the
+                 * allocator having cleared it */
+                d->complete = 0;
+                d->err = 0;
+        }
+        RETURN(d);
+}
+
 /*
  * Remove page from dirty list
  */
index a26825f..0bbad70 100644 (file)
@@ -256,28 +256,34 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
         struct inode *inode = filp->f_dentry->d_inode;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ldlm_extent extent;
-        struct lustre_handle lockh;
-        __u64 res_id[RES_NAME_SIZE] = {inode->i_ino};
+        struct lustre_handle *lockhs = NULL;
+        struct lov_stripe_md *md = ll_i2info(inode)->lli_smd;
         int flags = 0;
         ldlm_error_t err;
         ssize_t retval;
         ENTRY;
 
+
+
         if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK)) {
+                OBD_ALLOC(lockhs, md->lmd_stripe_count * sizeof(*lockhs));
+                if (!lockhs)
+                        RETURN(-ENOMEM); 
+
                 extent.start = *ppos;
                 extent.end = *ppos + count;
                 CDEBUG(D_INFO, "Locking inode %ld, start %Lu end %Lu\n",
                        inode->i_ino, extent.start, extent.end);
 
-                err = obd_enqueue(&sbi->ll_osc_conn, NULL, res_id, LDLM_EXTENT,
+                err = obd_enqueue(&sbi->ll_osc_conn, md, NULL, LDLM_EXTENT,
                                   &extent, sizeof(extent), LCK_PR, &flags,
                                   ll_lock_callback, inode, sizeof(*inode),
-                                  &lockh);
+                                  lockhs);
                 if (err != ELDLM_OK) {
+                        OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs));
                         CERROR("lock enqueue: err: %d\n", err);
                         RETURN(err);
                 }
-                ldlm_lock_dump((void *)(unsigned long)lockh.addr);
         }
 
         CDEBUG(D_INFO, "Reading inode %ld, %d bytes, offset %Ld\n",
@@ -288,13 +294,16 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
                 ll_update_atime(inode);
 
         if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK)) {
-                err = obd_cancel(&sbi->ll_osc_conn, LCK_PR, &lockh);
+                err = obd_cancel(&sbi->ll_osc_conn, md, LCK_PR, lockhs);
                 if (err != ELDLM_OK) {
+                        OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs));
                         CERROR("lock cancel: err: %d\n", err);
                         RETURN(err);
                 }
         }
 
+        if (lockhs)
+                OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs));
         RETURN(retval);
 }
 
@@ -308,14 +317,17 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
         struct inode *inode = file->f_dentry->d_inode;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ldlm_extent extent;
-        struct lustre_handle lockh;
-        __u64 res_id[RES_NAME_SIZE] = {inode->i_ino};
+        struct lustre_handle *lockhs = NULL;
+        struct lov_stripe_md *md = ll_i2info(inode)->lli_smd;
         int flags = 0;
         ldlm_error_t err;
         ssize_t retval;
         ENTRY;
 
         if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK)) {
+                OBD_ALLOC(lockhs, md->lmd_stripe_count * sizeof(*lockhs));
+                if (!lockhs)
+                        RETURN(-ENOMEM); 
                 /* FIXME: this should check whether O_APPEND is set and adjust
                  * extent.start accordingly */
                 extent.start = *ppos;
@@ -323,15 +335,15 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
                 CDEBUG(D_INFO, "Locking inode %ld, start %Lu end %Lu\n",
                        inode->i_ino, extent.start, extent.end);
 
-                err = obd_enqueue(&sbi->ll_osc_conn, NULL, res_id, LDLM_EXTENT,
+                err = obd_enqueue(&sbi->ll_osc_conn, md, NULL, LDLM_EXTENT,
                                   &extent, sizeof(extent), LCK_PW, &flags,
                                   ll_lock_callback, inode, sizeof(*inode),
-                                  &lockh);
+                                  lockhs);
                 if (err != ELDLM_OK) {
+                        OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs));
                         CERROR("lock enqueue: err: %d\n", err);
                         RETURN(err);
                 }
-                ldlm_lock_dump((void *)(unsigned long)lockh.addr);
         }
 
         CDEBUG(D_INFO, "Writing inode %ld, %ld bytes, offset %Ld\n",
@@ -340,13 +352,16 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
         retval = generic_file_write(file, buf, count, ppos);
 
         if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK)) {
-                err = obd_cancel(&sbi->ll_osc_conn, LCK_PW, &lockh);
+                err = obd_cancel(&sbi->ll_osc_conn, md, LCK_PW, lockhs);
                 if (err != ELDLM_OK) {
+                        OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs));
                         CERROR("lock cancel: err: %d\n", err);
                         RETURN(err);
                 }
         }
 
+        if (lockhs)
+                OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs));
         RETURN(retval);
 }
 
index 122ff36..f2e4719 100644 (file)
@@ -26,7 +26,7 @@
 static int ll_reconnect(struct ll_sb_info *sbi)
 {
         struct ll_fid rootfid;
-        __u64 last_committed, last_rcvd;
+        __u64 last_committed;
         __u32 last_xid;
         int err;
         struct ptlrpc_request *request; 
@@ -44,14 +44,13 @@ static int ll_reconnect(struct ll_sb_info *sbi)
         /* XXX: need to store the last_* values somewhere */
         err = mdc_getstatus(&sbi->ll_mdc_conn,
                           &rootfid, &last_committed, 
-                          &last_rcvd,
                           &last_xid,
                           &request);
         if (err) {
                 CERROR("cannot mds_connect: rc = %d\n", err);
                 GOTO(out_disc, err = -ENOTCONN);
         }
-        sbi2mdc(sbi)->cl_client->cli_last_rcvd = last_xid;
+        sbi2mdc(sbi)->cl_client->cli_last_xid = last_xid;
         sbi2mdc(sbi)->cl_conn->c_level = LUSTRE_CONN_RECOVD;
 
  out_disc:
@@ -78,7 +77,7 @@ int ll_recover(struct ptlrpc_client *cli)
                 /* replay what needs to be replayed */
                 if (req->rq_flags & PTL_RPC_FL_REPLAY) {
                         CDEBUG(D_INODE, "req %Ld needs replay [last rcvd %Ld]\n", 
-                               req->rq_xid, cli->cli_last_rcvd);
+                               req->rq_xid, cli->cli_last_xid);
                         rc = ptlrpc_replay_req(req); 
                         if (rc) { 
                                 CERROR("recovery replay error %d for request %Ld\n", 
@@ -89,17 +88,17 @@ int ll_recover(struct ptlrpc_client *cli)
 
                 /* server has seen req, we have reply: skip */
                 if ((req->rq_flags & PTL_RPC_FL_REPLIED)  &&
-                    req->rq_xid <= cli->cli_last_rcvd) { 
+                    req->rq_xid <= cli->cli_last_xid) { 
                         CDEBUG(D_INODE, "req %Ld was complete: skip [last rcvd %Ld]\n", 
-                               req->rq_xid, cli->cli_last_rcvd);
+                               req->rq_xid, cli->cli_last_xid);
                         continue;
                 }
 
                 /* server has lost req, we have reply: resend, ign reply */
                 if ((req->rq_flags & PTL_RPC_FL_REPLIED)  &&
-                    req->rq_xid > cli->cli_last_rcvd) { 
+                    req->rq_xid > cli->cli_last_xid) { 
                         CDEBUG(D_INODE, "lost req %Ld have rep: replay [last rcvd %Ld]\n", 
-                               req->rq_xid, cli->cli_last_rcvd);
+                               req->rq_xid, cli->cli_last_xid);
                         rc = ptlrpc_replay_req(req); 
                         if (rc) {
                                 CERROR("request resend error %d for request %Ld\n", 
@@ -110,17 +109,17 @@ int ll_recover(struct ptlrpc_client *cli)
 
                 /* server has seen req, we have lost reply: -ERESTARTSYS */
                 if ( !(req->rq_flags & PTL_RPC_FL_REPLIED)  &&
-                     req->rq_xid <= cli->cli_last_rcvd) { 
+                     req->rq_xid <= cli->cli_last_xid) { 
                         CDEBUG(D_INODE, "lost rep %Ld srv did req: restart [last rcvd %Ld]\n", 
-                               req->rq_xid, cli->cli_last_rcvd);
+                               req->rq_xid, cli->cli_last_xid);
                         ptlrpc_restart_req(req);
                 }
 
                 /* service has not seen req, no reply: resend */
                 if ( !(req->rq_flags & PTL_RPC_FL_REPLIED)  &&
-                     req->rq_xid > cli->cli_last_rcvd) {
+                     req->rq_xid > cli->cli_last_xid) {
                         CDEBUG(D_INODE, "lost rep/req %Ld: resend [last rcvd %Ld]\n", 
-                               req->rq_xid, cli->cli_last_rcvd);
+                               req->rq_xid, cli->cli_last_xid);
                         ptlrpc_resend_req(req);
                 }
 
index 0200ca6..8f3ce22 100644 (file)
 #include <linux/lustre_lite.h>
 #include <linux/lustre_lib.h>
 
+
 /* SYNCHRONOUS I/O to object storage for an inode */
 static int ll_brw(int rw, struct inode *inode, struct page *page, int create)
 {
         struct ll_inode_info *lii = ll_i2info(inode);
         struct lov_stripe_md *md = lii->lli_smd;
-        obd_size         count = PAGE_SIZE;
-        obd_off          offset = ((obd_off)page->index) << PAGE_SHIFT;
-        obd_flag         flags = create ? OBD_BRW_CREATE : 0;
+        struct brw_page pg; 
         int              err;
+        struct io_cb_data *cbd = ll_init_cb();
         ENTRY;
+        if (!cbd) 
+                RETURN(-ENOMEM); 
+
+        pg.pg = page;
+        pg.count = PAGE_SIZE;
+        pg.off = ((obd_off)page->index) << PAGE_SHIFT;
+        pg.flag = create ? OBD_BRW_CREATE : 0;
 
-        err = obd_brw(rw, ll_i2obdconn(inode), md, 1,
-                      &page, &count, &offset, &flags, NULL, NULL);
+        err = obd_brw(rw, ll_i2obdconn(inode), md, 1, &pg, ll_sync_io_cb, cbd);
         RETURN(err);
 } /* ll_brw */
 
@@ -139,6 +145,7 @@ static int ll_writepage(struct page *page)
         RETURN(err);
 }
 
+
 /* SYNCHRONOUS I/O to object storage for an inode -- object attr will be updated
  * too */
 static int ll_commit_write(struct file *file, struct page *page,
@@ -148,13 +155,19 @@ static int ll_commit_write(struct file *file, struct page *page,
         struct inode *inode = page->mapping->host;
         struct ll_inode_info *lii = ll_i2info(inode);
         struct lov_stripe_md *md = lii->lli_smd;
-        obd_size         count = to;
-        obd_off          offset = (((obd_off)page->index) << PAGE_SHIFT);
-        obd_flag         flags = create ? OBD_BRW_CREATE : 0;
+        struct brw_page pg; 
         int              err;
         struct iattr     iattr;
+        struct io_cb_data *cbd = ll_init_cb();
+
+        pg.pg = page;
+        pg.count = to;
+        pg.off = (((obd_off)page->index) << PAGE_SHIFT);
+        pg.flag = create ? OBD_BRW_CREATE : 0;
 
         ENTRY;
+        if (!cbd) 
+                RETURN(-ENOMEM); 
 
         SetPageUptodate(page);
 
@@ -162,13 +175,13 @@ static int ll_commit_write(struct file *file, struct page *page,
                 LBUG();
 
         CDEBUG(D_INODE, "commit_page writing (at %d) to %d, count %Ld\n",
-               from, to, (unsigned long long)count);
+               from, to, (unsigned long long)pg.count);
 
         err = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode), md,
-                      1, &page, &count, &offset, &flags, NULL, NULL);
+                      1, &pg, ll_sync_io_cb, cbd);
         kunmap(page);
 
-        iattr.ia_size = offset + to;
+        iattr.ia_size = pg.off + pg.count;
         if (iattr.ia_size > inode->i_size) {
                 /* do NOT truncate when writing in the middle of a file */
                 inode->i_size = iattr.ia_size;
@@ -226,47 +239,45 @@ int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf,
         obd_count        bufs_per_obdo = iobuf->nr_pages;
         struct ll_inode_info *lii = ll_i2info(inode);
         struct lov_stripe_md *md = lii->lli_smd;
-        obd_size         *count = NULL;
-        obd_off          *offset = NULL;
-        obd_flag         *flags = NULL;
+        struct brw_page *pga; 
         int              rc = 0;
         int i;
+        struct io_cb_data *cbd = ll_init_cb();
 
         ENTRY;
+        if (!cbd) 
+                RETURN(-ENOMEM); 
 
         if (blocksize != PAGE_SIZE) {
                 CERROR("direct_IO blocksize != PAGE_SIZE\n");
                 return -EINVAL;
         }
 
-        OBD_ALLOC(count, sizeof(*count) * bufs_per_obdo);
-        OBD_ALLOC(offset, sizeof(*offset) * bufs_per_obdo);
-        OBD_ALLOC(flags, sizeof(*flags) * bufs_per_obdo);
-        if (!count || !offset || !flags)
+        OBD_ALLOC(pga, sizeof(*pga) * bufs_per_obdo);
+        if (pga) 
                 GOTO(out, rc = -ENOMEM);
 
         /* NB: we can't use iobuf->maplist[i]->index for the offset
          * instead of "blocknr" because ->index contains garbage.
          */
         for (i = 0; i < bufs_per_obdo; i++, blocknr++) {
-                count[i] = PAGE_SIZE;
-                offset[i] = (obd_off)blocknr << PAGE_SHIFT;
-                flags[i] = OBD_BRW_CREATE;
+                pga[i].pg = iobuf->maplist[i];
+                pga[i].count = PAGE_SIZE;
+                pga[i].off = (obd_off)blocknr << PAGE_SHIFT;
+                pga[i].flag = OBD_BRW_CREATE;
         }
 
         if (!md || !md->lmd_object_id)
                 GOTO(out, rc = -ENOMEM);
 
         rc = obd_brw(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
-                     ll_i2obdconn(inode), md, bufs_per_obdo,
-                     iobuf->maplist, count, offset, flags, NULL, NULL);
+                     ll_i2obdconn(inode), md, bufs_per_obdo, pga,
+                     ll_sync_io_cb, cbd);
         if (rc == 0)
                 rc = bufs_per_obdo * PAGE_SIZE;
 
 out:
-        OBD_FREE(flags, sizeof(obd_flag) * bufs_per_obdo);
-        OBD_FREE(count, sizeof(obd_count) * bufs_per_obdo);
-        OBD_FREE(offset, sizeof(obd_off) * bufs_per_obdo);
+        OBD_FREE(pga, sizeof(*pga) * bufs_per_obdo);
         RETURN(rc);
 }
 
index d6ab7d1..665ca1d 100644 (file)
@@ -86,7 +86,7 @@ static struct super_block * ll_read_super(struct super_block *sb,
         int err;
         struct ll_fid rootfid;
         struct statfs sfs;
-        __u64 last_committed, last_rcvd;
+        __u64 last_committed;
         __u32 last_xid;
         struct ptlrpc_request *request = NULL;
         struct ll_inode_md md;
@@ -150,7 +150,7 @@ static struct super_block * ll_read_super(struct super_block *sb,
 
         /* XXX: need to store the last_* values somewhere */
         err = mdc_getstatus(&sbi->ll_mdc_conn, &rootfid, &last_committed,
-                            &last_rcvd, &last_xid, &request);
+                            &last_xid, &request);
         ptlrpc_req_finished(request);
         if (err) {
                 CERROR("cannot mds_connect: rc = %d\n", err);
index cb05eb3..d00296e 100644 (file)
@@ -220,6 +220,11 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_st
                 md->lmd_stripe_count = lov->desc.ld_default_stripe_count;
         }
 
+        if (!md->lmd_stripe_size)
+                md->lmd_stripe_size = lov->desc.ld_default_stripe_size;
+
+                
+
         for (i = 0; i < md->lmd_stripe_count; i++) {
                 struct lov_stripe_md obj_md; 
                 struct lov_stripe_md *obj_mdp = &obj_md; 
@@ -269,7 +274,7 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
         for (i = 0; i < md->lmd_stripe_count; i++) {
                 /* create data objects with "parent" OA */ 
                 memcpy(&tmp, oa, sizeof(tmp));
-                oa->o_id = md->lmd_objects[i].l_object_id; 
+                tmp.o_id = md->lmd_objects[i].l_object_id; 
                 rc = obd_destroy(&lov->tgts[i].conn, &tmp, NULL);
                 if (!rc) { 
                         CERROR("Error destroying object %Ld on %d\n",
@@ -359,7 +364,7 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa,
         ENTRY;
 
         if (!md) { 
-                CERROR("LOV requires striping ea for desctruction\n"); 
+                CERROR("LOV requires striping ea for opening\n"); 
                 RETURN(-EINVAL); 
         }
 
@@ -373,7 +378,7 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa,
                 oa->o_id = md->lmd_objects[i].l_object_id; 
 
                 rc = obd_open(&lov->tgts[i].conn, &tmp, NULL);
-                if (!rc) { 
+                if (rc) { 
                         CERROR("Error getattr object %Ld on %d\n",
                                oa->o_id, i); 
                 }
@@ -414,6 +419,10 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa,
         RETURN(rc);
 }
 
+#ifndef log2
+#define log2(n) ffz(~(n))
+#endif
+
 /* compute offset in stripe i corresponds to offset "in" */
 __u64 lov_offset(struct lov_stripe_md *md, __u64 in, int i)
 {
@@ -436,33 +445,26 @@ __u64 lov_offset(struct lov_stripe_md *md, __u64 in, int i)
         return (__u64) out;
 }
 
-
-struct lov_callback_data {
-        atomic_t count;
-        wait_queue_head_t waitq;
-};
-
-static void lov_read_callback(struct ptlrpc_bulk_desc *desc, void *data)
+/* compute which stripe (*j) holds file offset "in"; returns the offset within that stripe */
+__u64 lov_stripe(struct lov_stripe_md *md, __u64 in, int *j)
 {
-        struct lov_callback_data *cb_data = data;
+        __u32 ssz = md->lmd_stripe_size;
+        __u32 off, out;
+        /* full stripes across all * stripe size */
+        *j = (((__u32) in)/ssz) % md->lmd_stripe_count;
+        off =  (__u32)in % (md->lmd_stripe_count * ssz);
+        out = ( ((__u32)in) / (md->lmd_stripe_count * ssz)) * ssz + 
+                (off - ((*j) * ssz)) % ssz;;
 
-        if (atomic_dec_and_test(&cb_data->count))
-                wake_up(&cb_data->waitq);
+        return (__u64) out;
 }
 
-static int lov_read_check_status(struct lov_callback_data *cb_data)
+int lov_stripe_which(struct lov_stripe_md *md, __u64 in)
 {
-        ENTRY;
-        if (sigismember(&(current->pending.signal), SIGKILL) ||
-            sigismember(&(current->pending.signal), SIGTERM) ||
-            sigismember(&(current->pending.signal), SIGINT)) {
-                // FIXME XXX what here 
-                // cb_data->flags |= PTL_RPC_FL_INTR;
-                RETURN(1);
-        }
-        if (atomic_read(&cb_data->count) == 0)
-                RETURN(1);
-        RETURN(0);
+        __u32 ssz = md->lmd_stripe_size;
+        int j; 
+        j = (((__u32) in)/ssz) % md->lmd_stripe_count;
+        return j;
 }
 
 
@@ -492,6 +494,8 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
                 __u64 starti = lov_offset(md, start, i); 
                 __u64 endi = lov_offset(md, end, i); 
                         
+                if (starti == endi)
+                        continue;
                 /* create data objects with "parent" OA */ 
                 memcpy(&tmp, oa, sizeof(tmp));
                 oa->o_id = md->lmd_objects[i].l_object_id; 
@@ -506,160 +510,189 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
         RETURN(rc);
 }
 
+struct lov_callback_data { /* state allocated by lov_brw and handed to lov_osc_brw_callback */
+        atomic_t count;            /* decremented on each CB_PHASE_FINISH */
+        struct io_cb_data *cbd;    /* the "data" argument given to lov_brw */
+        brw_callback_t cb;         /* the "callback" argument given to lov_brw */
+        int err;                   /* last non-zero err seen in CB_PHASE_FINISH */
+};
+
+/* Completion hook passed to each obd_brw; the last CB_PHASE_FINISH calls the user callback. */
+int lov_osc_brw_callback(void *data, int err, int phase)
+{
+        struct lov_callback_data *d = data;
+        int ret = 0;
+        ENTRY;
+        if (phase == CB_PHASE_START) {
+                RETURN(0);
+        } else if (phase == CB_PHASE_FINISH) {
+                if (err)
+                        d->err = err;
+                /* callbacks take (data, err, phase); err and phase used to be swapped */
+                if (atomic_dec_and_test(&d->count))
+                        ret = d->cb(d->cbd, d->err, CB_PHASE_FINISH);
+                RETURN(ret);
+        } else
+                LBUG();
+        EXIT;
+        return 0;
+}
 
-#if 0
-static int lov_brw(int cmd, struct lustre_handle *conn, obd_count num_oa,
-                   struct obdo **oa,
-                   obd_count *oa_bufs, struct page **buf,
-                   obd_size *count, obd_off *offset, obd_flag *flags,
-                   bulk_callback_t callback, void *data)
+/* Sort the pages of one I/O by stripe and issue one obd_brw per stripe object;
+ * lov_osc_brw_callback runs "callback" with "data" once the count drains. */
+static inline int lov_brw(int cmd, struct lustre_handle *conn,
+                          struct lov_stripe_md *md, obd_count oa_bufs,
+                          struct brw_page *pga, brw_callback_t callback, void *data)
 {
-        int rc, i, page_array_offset = 0;
-        obd_off off = offset;
-        obd_size retval = 0;
-        struct lov_callback_data *cb_data;
+        int stripe_count = md->lmd_stripe_count;
+        struct obd_export *export = class_conn2export(conn);
+        struct lov_obd *lov;
+        struct {
+                int bufct;      /* pages destined for this stripe */
+                int index;      /* first slot of this stripe in ioarr */
+                int subcount;   /* slots of this stripe filled so far */
+                struct lov_stripe_md md;
+        } *stripeinfo;
+        struct brw_page *ioarr;
+        int rc, i;
+        struct lov_callback_data *lov_cb_data;
         ENTRY;
 
-        if (num_oa != 1)
-                LBUG();
+        if (!export || !export->exp_obd)
+                RETURN(-ENODEV);
+        lov = &export->exp_obd->u.lov;
 
-        if (!class_conn2export(conn))
-                RETURN(-EINVAL);
+        OBD_ALLOC(lov_cb_data, sizeof(*lov_cb_data));
+        if (!lov_cb_data)
+                RETURN(-ENOMEM);
 
-        OBD_ALLOC(cb_data, sizeof(*cb_data));
-        if (cb_data == NULL) {
-                LBUG();
+        OBD_ALLOC(stripeinfo,  stripe_count * sizeof(*stripeinfo));
+        if (!stripeinfo) {
+                OBD_FREE(lov_cb_data, sizeof(*lov_cb_data));
+                RETURN(-ENOMEM);
+        }
+        OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
+        if (!ioarr) {
+                OBD_FREE(lov_cb_data, sizeof(*lov_cb_data));
+                OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
                 RETURN(-ENOMEM);
         }
-        INIT_WAITQUEUE_HEAD(&cb_data->waitq);
-        atomic_set(&cb_data->count, 0);
 
-        for (i = 0; i < oa_bufs[0]; i++) {
-                struct page *current_page = buf[i];
+        /* pass 1: count pages per stripe (assumes OBD_ALLOC zero-fills - TODO confirm) */
+        for (i = 0; i < oa_bufs; i++)
+                stripeinfo[lov_stripe_which(md, pga[i].pg->index * PAGE_SIZE)].bufct++;
 
-                struct lov_md *md = (struct lov_md *)oa[i]->inline;
-                int bufcount = oa_bufs[i];
-                // md->lmd_stripe_count
+        /* pass 2: running sums give each stripe its contiguous slice of ioarr */
+        for (i=0 ; i < stripe_count ; i++) {
+                if (i>0)
+                        stripeinfo[i].index =
+                                stripeinfo[i-1].index + stripeinfo[i-1].bufct;
+                stripeinfo[i].md.lmd_object_id = md->lmd_objects[i].l_object_id;
+        }
 
-                for (k = page_array_offset; k < bufcount + page_array_offset;
-                     k++) {
-                        
-                }
-                page_array_offset += bufcount;
-
-
-        while (off < offset + count) {
-                int stripe, conn;
-                obd_size size, tmp;
-
-                stripe = off / conn->oc_dev->u.lov.lov_stripe_size;
-                size = (stripe + 1) * conn->oc_dev->u.lov.lov_strip_size - off;
-                if (size > *count)
-                        size = *count;
-
-                conn = stripe % conn->oc_dev->obd_multi_count;
-
-                tmp = size;
-                atomic_inc(&cb_data->count);
-                rc = obd_brw(cmd, &conn->oc_dev->obd_multi_conn[conn],
-                             num_oa, oa, buf,
-                              &size, off, lov_read_callback, cb_data);
-                if (rc == 0)
-                        retval += size;
-                else {
-                        CERROR("read(off=%Lu, count=%Lu): %d\n",
-                               (unsigned long long)off,
-                               (unsigned long long)size, rc);
-                        break;
-                }
+        /* pass 3: copy each page into its stripe's slice and set the per-object offset there */
+        for (i=0 ; i < oa_bufs ; i++ ) {
+                int which = lov_stripe_which(md, pga[i].pg->index * PAGE_SIZE);
 
-                buf += size;
+                int slot = stripeinfo[which].index + stripeinfo[which].subcount++;
+                ioarr[slot] = pga[i];
+                ioarr[slot].off = lov_offset(md, pga[i].pg->index * PAGE_SIZE, which);
         }
+        /* NOTE(review): presumably one FINISH arrives per obd_brw call, not per page - confirm count */
+        lov_cb_data->cb = callback;
+        lov_cb_data->cbd = data;
+        atomic_set(&lov_cb_data->count, oa_bufs);
+        for (i=0 ; i < stripe_count ; i++) {
+                int shift = stripeinfo[i].index;
 
-        wait_event(&cb_data->waitq, lov_read_check_status(cb_data));
-        if (cb_data->flags & PTL_RPC_FL_INTR)
-                rc = -EINTR;
+                obd_brw(cmd, &lov->tgts[i].conn, &stripeinfo[i].md, stripeinfo[i].bufct,
+                        &ioarr[shift], lov_osc_brw_callback, lov_cb_data);
+        }
 
-        /* FIXME: The error handling here sucks */
-        *count = retval;
-        OBD_FREE(cb_data, sizeof(*cb_data));
-        RETURN(rc);
-}
+        rc = callback(data, 0, CB_PHASE_START);
 
-static void lov_write_finished(struct ptlrpc_bulk_desc *desc, void *data)
-{
-        
+        RETURN(rc);
 }
 
-/* buffer must lie in user memory here */
-static int filter_write(struct lustre_handle *conn, struct obdo *oa, char *buf,
-                         obd_size *count, obd_off offset)
+/* Enqueue a lock on each stripe object, skipping stripes whose mapped extent is empty (start == end). */
+static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *md,
+                       struct lustre_handle *parent_lock,
+                       __u32 type, void *cookie, int cookielen, __u32 mode,
+                       int *flags, void *cb, void *data, int datalen,
+                       struct lustre_handle *lockhs)
 {
-        int err;
-        struct file *file;
-        unsigned long retval;
-
+        int rc = 0, i;
+        struct obd_export *export = class_conn2export(conn);
+        struct lov_obd *lov;
         ENTRY;
-        if (!class_conn2export(conn)) {
-                CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
-                EXIT;
-                return -EINVAL;
-        }
 
-        file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode);
-        if (!file || IS_ERR(file)) {
-                EXIT;
-                return -PTR_ERR(file);
+        if (!md) {
+                CERROR("LOV requires striping ea for lock enqueue\n");
+                RETURN(-EINVAL);
         }
 
-        /* count doubles as retval */
-        retval = file->f_op->write(file, buf, *count, (loff_t *)&offset);
-        filp_close(file, 0);
+        if (!export || !export->exp_obd)
+                RETURN(-ENODEV);
 
-        if ( retval >= 0 ) {
-                err = 0;
-                *count = retval;
-                EXIT;
-        } else {
-                err = retval;
-                *count = 0;
-                EXIT;
+        lov = &export->exp_obd->u.lov;
+        for (i = 0; i < md->lmd_stripe_count; i++) {
+                struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
+                struct ldlm_extent sub_ext;
+                struct lov_stripe_md submd;
+
+                sub_ext.start = lov_offset(md, extent->start, i);
+                sub_ext.end = lov_offset(md, extent->end, i);
+                if ( sub_ext.start == sub_ext.end )
+                        continue;
+
+                submd.lmd_object_id = md->lmd_objects[i].l_object_id;
+                submd.lmd_easize = sizeof(submd);
+                rc = obd_enqueue(&(lov->tgts[i].conn), &submd, parent_lock,  type,
+                                 &sub_ext, sizeof(sub_ext), mode, flags, cb, data, datalen, &(lockhs[i]));
+                // XXX add a lock debug statement here; NOTE(review): a later stripe overwrites rc
+                if (rc)         /* was !rc, which logged an error on success */
+                        CERROR("Error enqueue object %Ld subobj %Ld\n", md->lmd_object_id,
+                               md->lmd_objects[i].l_object_id);
         }
-
-        return err;
+        RETURN(rc);
 }
 
-static int lov_enqueue(struct lustre_handle *conn, struct ldlm_namespace *ns,
-                       struct ldlm_handle *parent_lock, __u64 *res_id,
-                       __u32 type, struct ldlm_extent *extent, __u32 mode,
-                       int *flags, void *data, int datalen,
-                       struct ldlm_handle *lockh)
+/* Cancel the per-stripe locks in lockhs[], skipping entries whose handle addr is 0. */
+static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *md, __u32 mode,
+                      struct lustre_handle *lockhs)
 {
-        int rc;
+        int rc = 0, i;
+        struct obd_export *export = class_conn2export(conn);
+        struct lov_obd *lov;
         ENTRY;
 
-        if (!class_conn2export(conn))
-                RETURN(-EINVAL);
+        if (!md) {
+                CERROR("LOV requires striping ea for lock cancellation\n");
+                RETURN(-EINVAL);
+        }
 
-        rc = obd_enqueue(&conn->oc_dev->obd_multi_conn[0], ns, parent_lock,
-                         res_id, type, extent, mode, flags, data, datalen,
-                         lockh);
-        RETURN(rc);
-}
+        if (!export || !export->exp_obd)
+                RETURN(-ENODEV);
 
-static int lov_cancel(struct lustre_handle *conn, __u32 mode,
-                      struct ldlm_handle *lockh)
-{
-        int rc;
-        ENTRY;
+        lov = &export->exp_obd->u.lov;
+        for (i = 0; i < md->lmd_stripe_count; i++) {
+                struct lov_stripe_md submd;
 
-        if (!class_conn2export(conn))
-                RETURN(-EINVAL);
+                if ( lockhs[i].addr == 0 )
+                        continue;
 
-        rc = obd_cancel(&conn->oc_dev->obd_multi_conn[0], oa);
+                submd.lmd_object_id = md->lmd_objects[i].l_object_id;
+                submd.lmd_easize = sizeof(submd);
+                rc = obd_cancel(&lov->tgts[i].conn, &submd, mode, &lockhs[i]);
+                if (rc)         /* was !rc, which logged an error on success */
+                        CERROR("Error cancel object %Ld subobj %Ld\n", md->lmd_object_id,
+                               md->lmd_objects[i].l_object_id);
+        }
         RETURN(rc);
 }
-#endif
+
+
+
 
 struct obd_ops lov_obd_ops = {
         o_setup:       lov_setup,
@@ -671,12 +704,10 @@ struct obd_ops lov_obd_ops = {
         o_setattr:     lov_setattr,
         o_open:        lov_open,
         o_close:       lov_close,
-#if 0
-        o_brw:         lov_pgcache_brw,
+        o_brw:         lov_brw,
         o_punch:       lov_punch,
         o_enqueue:     lov_enqueue,
         o_cancel:      lov_cancel
-#endif
 };
 
 
index 81cb40f..68ba0b6 100644 (file)
@@ -49,15 +49,11 @@ int mdc_setattr(struct lustre_handle *conn,
                 struct inode *inode, struct iattr *iattr,
                 struct ptlrpc_request **request)
 {
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
-        struct mds_rec_setattr *rec;
         struct ptlrpc_request *req;
+        struct mds_rec_setattr *rec;
         int rc, size = sizeof(*rec);
         ENTRY;
 
-        mdc_con2cl(conn, &cl, &connection, &rconn);
         req = ptlrpc_prep_req2(conn, MDS_REINT, 1, &size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
@@ -126,12 +122,11 @@ int mdc_create(struct lustre_handle *conn,
  resend:
         rc = mdc_reint(req, level);
         if (rc == -ERESTARTSYS) {
-                struct mds_update_record_hdr *hdr =
-                        lustre_msg_buf(req->rq_reqmsg, 0);
+                __u32 *opcode = lustre_msg_buf(req->rq_reqmsg, 0);
                 level = LUSTRE_CONN_RECOVD;
                 CERROR("Lost reply: re-create rep.\n");
                 req->rq_flags = 0;
-                hdr->ur_opcode = NTOH__u32(REINT_RECREATE);
+                *opcode = NTOH__u32(REINT_RECREATE);
                 goto resend;
         }
 
index bf725c2..6d9fdaa 100644 (file)
@@ -56,8 +56,8 @@ int mdc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl,
 }
 
 int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid,
-                  __u64 *last_committed, __u64 *last_rcvd,
-                  __u32 *last_xid, struct ptlrpc_request **request)
+                  __u64 *last_committed, __u32 *last_xid, 
+                  struct ptlrpc_request **request)
 {
         struct ptlrpc_request *req;
         struct mds_body *body;
@@ -81,15 +81,11 @@ int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid,
                 mds_unpack_body(body);
                 memcpy(rootfid, &body->fid1, sizeof(*rootfid));
                 *last_committed = req->rq_repmsg->last_committed;
-                *last_rcvd = req->rq_repmsg->last_rcvd;
-                *last_xid = body->last_xid;
+                *last_xid = req->rq_repmsg->last_xid;
 
-                CDEBUG(D_NET, "root ino=%ld, last_committed=%Lu, last_rcvd=%Lu,"
-                       " last_xid=%d\n",
+                CDEBUG(D_NET, "root ino=%ld, last_committed=%Lu, last_xid=%d\n",
                        (unsigned long)rootfid->id,
-                       (unsigned long long)*last_committed,
-                       (unsigned long long)*last_rcvd,
-                       body->last_xid);
+                       (unsigned long long)*last_committed, last_xid);
         }
 
         EXIT;
@@ -145,15 +141,11 @@ int mdc_getattr(struct lustre_handle *conn,
                 obd_id ino, int type, unsigned long valid, size_t ea_size,
                 struct ptlrpc_request **request)
 {
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
         struct ptlrpc_request *req;
         struct mds_body *body;
         int rc, size[2] = {sizeof(*body), 0}, bufcount = 1;
         ENTRY;
 
-        mdc_con2cl(conn, &cl, &connection, &rconn);
         req = ptlrpc_prep_req2(conn, MDS_GETATTR, 1, size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
index 70116ff..5f2e1d3 100644 (file)
@@ -342,7 +342,7 @@ static int mds_getstatus(struct ptlrpc_request *req)
 
         /* mcd_last_xid is is stored in little endian on the disk and
            mds_pack_rep_body converts it to network order */
-        body->last_xid = le32_to_cpu(med->med_mcd->mcd_last_xid);
+        req->rq_repmsg->last_xid = le32_to_cpu(med->med_mcd->mcd_last_xid);
         mds_pack_rep_body(req);
         RETURN(0);
 }
@@ -951,7 +951,7 @@ int mds_handle(struct ptlrpc_request *req)
 
         if (!rc) { 
                 struct mds_obd *mds = mds_req2mds(req);
-                req->rq_repmsg->last_rcvd = HTON__u64(mds->mds_last_rcvd);
+                req->rq_repmsg->last_xid = HTON__u64(mds->mds_last_rcvd);
                 req->rq_repmsg->last_committed =
                         HTON__u64(mds->mds_last_committed);
                 CDEBUG(D_INFO, "last_rcvd %Lu, last_committed %Lu, xid %d\n",
index b006a08..5bf7ba5 100644 (file)
@@ -2,6 +2,9 @@ DEFS=
 MODULE = obdclass
 modulefs_DATA = obdclass.o
 EXTRA_PROGRAMS = obdclass
-obdclass_SOURCES = genops.c proc_lustre.c class_obd.c sysctl.c
+obdclass_SOURCES = genops.c proc_lustre.c class_obd.c sysctl.c page.c
+
+page.c: 
+       test -e page.c || ln -sf $(top_srcdir)/lib/page.c
 
 include $(top_srcdir)/Rules
index 776490e..560ec6e 100644 (file)
@@ -473,14 +473,15 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp,
                 rw = OBD_BRW_WRITE;
         case OBD_IOC_BRW_READ: {
                 struct lov_stripe_md smd;
+                struct io_cb_data *cbd = ll_init_cb();
                 obd_count       pages = 0;
-                struct page     **bufs = NULL;
-                obd_size        *counts = NULL;
-                obd_off         *offsets = NULL;
-                obd_flag        *flags = NULL;
+                struct brw_page *pga;
                 int             j;
                 unsigned long off;
                 void *from;
+                
+                if (!cbd)
+                        GOTO(out, -ENOMEM); 
 
                 obd_data2conn(&conn, data);
 
@@ -488,11 +489,8 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp,
 
                 CDEBUG(D_INODE, "BRW %s with %d pages\n",
                        rw == OBD_BRW_READ ? "read" : "write", pages);
-                OBD_ALLOC(bufs, pages * sizeof(*bufs));
-                OBD_ALLOC(counts, pages * sizeof(*counts));
-                OBD_ALLOC(offsets, pages * sizeof(*offsets));
-                OBD_ALLOC(flags, pages * sizeof(*flags));
-                if (!bufs || !counts || !offsets || !flags) {
+                OBD_ALLOC(pga, pages * sizeof(*pga));
+                if (!pga) {
                         CERROR("no memory for %d BRW per-page data\n", pages);
                         GOTO(brw_free, err = -ENOMEM);
                 }
@@ -517,25 +515,20 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp,
                                 CERROR("no memory for brw pages\n");
                                 GOTO(brw_cleanup, err = -ENOMEM);
                         }
-                        bufs[j] = virt_to_page(to);
-                        counts[j] = PAGE_SIZE;
-                        offsets[j] = off;
-                        flags[j] = 0;
+                        pga[j].pg = virt_to_page(to);
+                        pga[j].count = PAGE_SIZE;
+                        pga[j].off = off;
+                        pga[j].flag = 0;
                 }
 
-                err = obd_brw(rw, &conn, &smd, j, bufs, counts, offsets, flags,
-                              NULL, NULL);
-
+                err = obd_brw(rw, &conn, &smd, j, pga, ll_sync_io_cb, cbd);
                 EXIT;
         brw_cleanup:
                 for (j = 0; j < pages; j++)
-                        if (bufs[j] != NULL)
-                                __free_pages(bufs[j], 0);
+                        if (pga[j].pg != NULL)
+                                __free_pages(pga[j].pg, 0);
         brw_free:
-                OBD_FREE(bufs, pages * sizeof(*bufs));
-                OBD_FREE(counts, pages * sizeof(*counts));
-                OBD_FREE(offsets, pages * sizeof(*offsets));
-                OBD_FREE(flags, pages * sizeof(*flags));
+                OBD_FREE(pga, pages * sizeof(*pga));
                 GOTO(out, err);
         }
         default:
index d9e847d..f5e13b2 100644 (file)
@@ -725,8 +725,7 @@ static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
 
 static int filter_pgcache_brw(int cmd, struct lustre_handle *conn, 
                                struct lov_stripe_md *md, obd_count oa_bufs,
-                               struct page **pages, obd_size *count,
-                               obd_off *offset, obd_flag *flags,
+                               struct brw_page *pga,
                                brw_callback_t callback, void *data)
 {
         struct obd_run_ctxt      saved;
@@ -757,32 +756,33 @@ static int filter_pgcache_brw(int cmd, struct lustre_handle *conn,
                 CDEBUG(D_INODE, "OP %d obdo pgno: (%d) (%ld,%ld) "
                        "off count (%Ld,%Ld)\n",
                        cmd, pnum, file->f_dentry->d_inode->i_ino,
-                       (unsigned long)offset[pnum] >> PAGE_CACHE_SHIFT,
-                       (unsigned long long)offset[pnum],
-                       (unsigned long long)count[pnum]);
+                       (unsigned long)pga[pnum].off >> PAGE_CACHE_SHIFT,
+                       (unsigned long long)pga[pnum].off,
+                       (unsigned long long)pga[pnum].count);
                 if (cmd & OBD_BRW_WRITE) {
                         loff_t off;
                         char *buffer;
-                        off = offset[pnum];
-                        buffer = kmap(pages[pnum]);
-                        retval = file->f_op->write(file, buffer, count[pnum],
+                        off = pga[pnum].off;
+                        buffer = kmap(pga[pnum].pg);
+                        retval = file->f_op->write(file, buffer, 
+                                                   pga[pnum].count,
                                                    &off);
                         kunmap(pages[pnum]);
                         CDEBUG(D_INODE, "retval %ld\n", retval);
                 } else {
-                        loff_t off = offset[pnum];
-                        char *buffer = kmap(pages[pnum]);
+                        loff_t off = pga[pnum].off;
+                        char *buffer = kmap(pga[pnum].pg);
 
                         if (off >= file->f_dentry->d_inode->i_size) {
-                                memset(buffer, 0, count[pnum]);
-                                retval = count[pnum];
+                                memset(buffer, 0, pga[pnum].count);
+                                retval = pga[pnum].count;
                         } else {
                                 retval = file->f_op->read(file, buffer,
-                                                          count[pnum], &off);
+                                                          pga[pnum].count, &off);
                         }
                         kunmap(pages[pnum]);
 
-                        if (retval != count[pnum]) {
+                        if (retval != pga[pnum].count) {
                                 filp_close(file, 0);
                                 GOTO(out, retval = -EIO);
                         }
@@ -1369,23 +1369,40 @@ int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst,
          *     and arrays to handle the request parameters.
          */
         while (index < ((src->o_size + PAGE_SIZE - 1) >> PAGE_SHIFT)) {
-                obd_size         brw_count = PAGE_SIZE;
-                obd_off          brw_offset = (page->index) << PAGE_SHIFT;
-                obd_flag         flagr = 0;
-                obd_flag         flagw = OBD_BRW_CREATE;
+                struct brw_page pg; 
+                struct io_cb_data *cbd = ll_init_cb();
+
+                if (!cbd) { 
+                        err = -ENOMEM;
+                        EXIT;
+                        break;
+                }
+
+                pg.pg = page;
+                pg.count = PAGE_SIZE;
+                pg.off = (page->index) << PAGE_SHIFT;
+                pg.flag = 0;
 
                 page->index = index;
-                err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &page,
-                              &brw_count, &brw_offset, &flagr, NULL, NULL);
+                err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg, 
+                              ll_sync_io_cb, cbd);
 
                 if ( err ) {
                         EXIT;
                         break;
                 }
+
+                cbd = ll_init_cb();
+                if (!cbd) { 
+                        err = -ENOMEM;
+                        EXIT;
+                        break;
+                }
+                pg.flag = OBD_BRW_CREATE;
                 CDEBUG(D_INFO, "Read page %ld ...\n", page->index);
 
-                err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &page,
-                              &brw_count, &brw_offset, &flagw, NULL, NULL);
+                err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg,
+                              ll_sync_io_cb, cbd);
 
                 /* XXX should handle dst->o_size, dst->o_blocks here */
                 if ( err ) {
index 4ebefc5..3f960a4 100644 (file)
@@ -316,13 +316,16 @@ static void unmap_and_decref_bulk_desc(void *data)
 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
 {
         struct osc_brw_cb_data *cb_data = data;
+        int err = 0;
         ENTRY;
 
-        if (desc->b_flags & PTL_RPC_FL_INTR)
+        if (desc->b_flags & PTL_RPC_FL_INTR) {
+                err = -ERESTARTSYS;
                 CERROR("got signal\n");
+        }
 
         if (cb_data->callback)
-                cb_data->callback(cb_data->cb_data);
+                cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH);
 
         OBD_FREE(cb_data->obd_data, cb_data->obd_size);
         OBD_FREE(cb_data, sizeof(*cb_data));
@@ -336,10 +339,7 @@ static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
         EXIT;
 }
 
-static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
-                        obd_count page_count, struct page **page_array,
-                        obd_size *count, obd_off *offset, obd_flag *flags,
-                        brw_callback_t callback, void *data)
+static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md, obd_count page_count, struct brw_page *pga, brw_callback_t callback, void *data)
 {
         struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
         struct ptlrpc_request *request = NULL;
@@ -390,11 +390,11 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
 
                 bulk->b_xid = xid;           /* single xid for all pages */
 
-                bulk->b_buf = kmap(page_array[mapped]);
-                bulk->b_page = page_array[mapped];
+                bulk->b_buf = kmap(pga[mapped].pg);
+                bulk->b_page = pga[mapped].pg;
                 bulk->b_buflen = PAGE_SIZE;
-                ost_pack_niobuf(&nioptr, offset[mapped], count[mapped],
-                                flags[mapped], bulk->b_xid);
+                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+                                pga[mapped].flag, bulk->b_xid);
         }
 
         /*
@@ -409,11 +409,9 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
          *
          * On error, we never do the brw_finish, so we handle all decrefs.
          */
-        if (!callback)
-                ptlrpc_bulk_addref(desc);
         rc = ptlrpc_register_bulk(desc);
         if (rc)
-                GOTO(out_desc2, rc);
+                GOTO(out_unmap, rc);
 
         request->rq_replen = lustre_msg_size(1, size);
         rc = ptlrpc_queue_wait(request);
@@ -432,16 +430,10 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
          *      restart them" and osc_brw callers can know this.
          */
         if (rc)
-                GOTO(out_desc2, rc);
+                GOTO(out_unmap, rc);
 
         /* Callbacks cause asynchronous handling. */
-        if (callback)
-                GOTO(out_req, rc = 0);
-
-        /* If there's no callback function, sleep here until complete. */
-        l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_received(desc));
-        if (desc->b_flags & PTL_RPC_FL_INTR)
-                GOTO(out_desc, rc = -EINTR);
+        rc = callback(data, 0, CB_PHASE_START); 
 
         EXIT;
 out_desc:
@@ -451,9 +443,6 @@ out_req:
         RETURN(rc);
 
         /* Clean up on error. */
-out_desc2:
-        if (!callback)
-                ptlrpc_bulk_decref(desc);
 out_unmap:
         while (mapped-- > 0)
                 kunmap(page_array[mapped]);
@@ -463,8 +452,7 @@ out_unmap:
 
 static int osc_brw_write(struct lustre_handle *conn,
                          struct lov_stripe_md *md, obd_count page_count,
-                         struct page **pagearray, obd_size *count,
-                         obd_off *offset, obd_flag *flags,
+                         struct brw_page *pga,
                          brw_callback_t callback, void *data)
 {
         struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
@@ -514,11 +502,11 @@ static int osc_brw_write(struct lustre_handle *conn,
         cb_data->obd_size = page_count * sizeof(*local);
 
         for (mapped = 0; mapped < page_count; mapped++) {
-                local[mapped].addr = kmap(pagearray[mapped]);
-                local[mapped].offset = offset[mapped];
-                local[mapped].len = count[mapped];
-                ost_pack_niobuf(&nioptr, offset[mapped], count[mapped],
-                                flags[mapped], 0);
+                local[mapped].addr = kmap(pga[mapped].pg);
+                local[mapped].offset = pga[mapped].off;
+                local[mapped].len = pga[mapped].count;
+                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+                                pga[mapped].flag, 0);
         }
 
         size[1] = page_count * sizeof(*remote);
@@ -550,7 +538,7 @@ static int osc_brw_write(struct lustre_handle *conn,
                 bulk->b_buf = (void *)(unsigned long)local[j].addr;
                 bulk->b_buflen = local[j].len;
                 bulk->b_xid = remote->xid;
-                bulk->b_page = pagearray[j];
+                bulk->b_page = pga[j].pg;
         }
 
         if (desc->b_page_count != page_count)
@@ -559,12 +547,7 @@ static int osc_brw_write(struct lustre_handle *conn,
         /*
          * One reference is released when brw_finish is complete, the
          * other here when we finish waiting on it if we don't have a callback.
-         *
-         * We don't reference the bulk descriptor again here if there is a
-         * callback, so we don't need an additional refcount on it.
          */
-        if (!callback)
-                ptlrpc_bulk_addref(desc);
         rc = ptlrpc_send_bulk(desc);
 
         /* XXX: Mike, same question as in osc_brw_read. */
@@ -572,13 +555,7 @@ static int osc_brw_write(struct lustre_handle *conn,
                 GOTO(out_desc2, rc);
 
         /* Callbacks cause asynchronous handling. */
-        if (callback)
-                GOTO(out_req, rc = 0);
-
-        /* If there's no callback function, sleep here until complete. */
-        l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
-        if (desc->b_flags & PTL_RPC_FL_INTR)
-                GOTO(out_desc, rc = -EINTR);
+        rc = callback(data, 0, CB_PHASE_START);
 
         EXIT;
 out_desc:
@@ -603,23 +580,22 @@ out_cb:
 
 static int osc_brw(int cmd, struct lustre_handle *conn,
                    struct lov_stripe_md *md, obd_count page_count,
-                   struct page **page_array, obd_size *count, obd_off *offset,
-                   obd_flag *flags, brw_callback_t callback, void *data)
+                   struct brw_page *pagear, brw_callback_t callback, 
+                   void *data) /* dispatches to osc_brw_write when cmd has OBD_BRW_WRITE set, else osc_brw_read */
 {
         if (cmd & OBD_BRW_WRITE)
-                return osc_brw_write(conn, md, page_count, page_array, count,
-                                     offset, flags, callback, data);
+                return osc_brw_write(conn, md, page_count, pagear, callback, data);
         else
-                return osc_brw_read(conn, md, page_count, page_array, count,
-                                    offset, flags, callback, data);
+                return osc_brw_read(conn, md, page_count, pagear, callback, data);
 }
 
-static int osc_enqueue(struct lustre_handle *connh,
-                       struct lustre_handle *parent_lock, __u64 *res_id,
+static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *md, 
+                       struct lustre_handle *parent_lock, 
                        __u32 type, void *extentp, int extent_len, __u32 mode,
                        int *flags, void *callback, void *data, int datalen,
                        struct lustre_handle *lockh)
 {
+        __u64 res_id = { md->lmd_object_id };
         struct obd_device *obddev = class_conn2obd(connh);
         struct ldlm_extent *extent = extentp;
         int rc;
@@ -632,7 +608,7 @@ static int osc_enqueue(struct lustre_handle *connh,
 
         /* Next, search for already existing extent locks that will cover us */
         //osc_con2dlmcl(conn, &cl, &connection, &rconn);
-        rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
+        rc = ldlm_lock_match(obddev->obd_namespace, &res_id, type, extent,
                              sizeof(extent), mode, lockh);
         if (rc == 1) {
                 /* We already have a lock, and it's referenced */
@@ -648,7 +624,7 @@ static int osc_enqueue(struct lustre_handle *connh,
         else
                 mode2 = LCK_PW;
 
-        rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
+        rc = ldlm_lock_match(obddev->obd_namespace, &res_id, type, extent,
                              sizeof(extent), mode2, lockh);
         if (rc == 1) {
                 int flags;
@@ -669,12 +645,12 @@ static int osc_enqueue(struct lustre_handle *connh,
         }
 
         rc = ldlm_cli_enqueue(connh, NULL,obddev->obd_namespace,
-                              parent_lock, res_id, type, extent, sizeof(extent),
+                              parent_lock, &res_id, type, extent, sizeof(extent),
                               mode, flags, ldlm_completion_ast, callback, data, datalen, lockh);
         return rc;
 }
 
-static int osc_cancel(struct lustre_handle *oconn, __u32 mode,
+static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md, __u32 mode,
                       struct lustre_handle *lockh)
 {
         ENTRY;
index 60e8e0a..d830e82 100644 (file)
@@ -554,7 +554,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                        req->rq_replen, req->rq_repmsg->status);
 
         spin_lock(&cli->cli_lock);
-        cli->cli_last_rcvd = req->rq_repmsg->last_rcvd;
+        cli->cli_last_xid = req->rq_repmsg->last_xid;
         cli->cli_last_committed = req->rq_repmsg->last_committed;
         ptlrpc_free_committed(cli); 
         spin_unlock(&cli->cli_lock);
index 86feff0..38399a1 100644 (file)
@@ -85,7 +85,7 @@ int lustre_unpack_msg(struct lustre_msg *m, int len)
         m->status = NTOH__u32(m->status);
         m->type = NTOH__u32(m->type);
         m->bufcount = NTOH__u32(m->bufcount);
-        m->last_rcvd = NTOH__u64(m->last_rcvd);
+        m->last_xid = NTOH__u64(m->last_xid);
         m->last_committed = NTOH__u64(m->last_committed);
 
         required_len = size_round(sizeof(*m) + m->bufcount * sizeof(__u32));
index 03a32b6..9bf5eaa 100644 (file)
@@ -108,8 +108,8 @@ command_t cmdlist[] = {
         {"detach", jt_obd_detach, 0, "un-name a device\n"
          "usage: detach"},
         {"lovconfig", jt_obd_lov_config, 0,
-         "write lov configuration to a mds device\n"
-         "usage: lovconfig lov-uuid stripcount stripsize pattern UUID1 [UUID2 ...]"},
+         "write lov configuration to an mds device\n"
+         "usage: lovconfig lov-uuid stripe-count stripe-size offset pattern UUID1 [UUID2 ...]"},
 
         /* Device operations */
         {"=== device operations ==", jt_noop, 0, "device operations"},