Whamcloud - gitweb
- mds failover code
authorbraam <braam>
Sun, 12 May 2002 01:06:29 +0000 (01:06 +0000)
committerbraam <braam>
Sun, 12 May 2002 01:06:29 +0000 (01:06 +0000)
- connection and recovd subsystem
- refined handling of replies/timeout with levels:
  - requests are delayed until the request level is lower than or
    equal to the connection level
- much updated network documentation
- updated file system recovery documentation
- server maintains lists of open files and handles "re-opening";
  the lists are kept in the metadata client info structures.
- flags on requests to indicate their disposition after a reply,
  e.g. retain until commit, retain until explicitly canceled etc.
- new failure instrumentation to drop a reply, but execute the
  request.
- handling of re-sent creation requests
- move file attribute updates on mds to close, remove from write
- reconnection routine in llight.
- work through recovery list more orderly:
  - retain list in sent order
  - handle according to disposition of request
  - return integers not void
  - add direct (0-copy) I/O support -- doesn't compile on 2.4.9
- failure handling in client reintegration code
- replay handling in server reintegration code
- add names to client systems to understand debugging/tracing output better
- remove most lists from the client structure: the multiple lists
  introduced request reordering.  We now use one list and flag the
  requests.
- re-addressing of connections: invoked by the client recovery scripts
- don't reallocate reply buffers if they were already there and not
  consumed in case of re-sending requests.
- introduce a request replay function: I want this to be merged with
  ptlrpc_queue_wait soon.
- small support routines for continuing delayed requests, restarting
  requests for which replies were lost, etc.
- try to return negative error codes even when Portals reports
  errors as positive values.
- make last committed and received 64 bit in network packets.
- write test programs that:
  - keep files open
  - do I/O every second
- include 5 basic regression cases for failover recovery:
  runfailure-client-mds.sh
- simplify ha_assist.sh -- the secondary ha_assist program does the
  work

26 files changed:
lustre/doc/.cvsignore
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_mds.h
lustre/include/linux/lustre_net.h
lustre/include/linux/obd_support.h
lustre/lib/mds_updates.c
lustre/llite/commit_callback.c
lustre/llite/file.c
lustre/llite/recover.c
lustre/llite/rw.c
lustre/llite/super.c
lustre/mdc/mdc_reint.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/mds/mds_reint.c
lustre/osc/osc_request.c
lustre/ptlrpc/client.c
lustre/ptlrpc/connmgr.c
lustre/ptlrpc/events.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/recovd.c
lustre/ptlrpc/rpc.c
lustre/tests/Makefile.am
lustre/tests/runfailure-client-mds-recover.sh
lustre/utils/ha_assist.sh

index 5e499c3..747b3ed 100644 (file)
@@ -10,3 +10,4 @@ TAGS
 OBD-HOWTO.html
 OBD-HOWTO.txt
 *.eps
+master.pdf
index cce3c43..3e6478a 100644 (file)
@@ -165,19 +165,20 @@ struct obd_ioobj {
  */
 
 /* opcodes */
-#define MDS_GETATTR   1
-#define MDS_OPEN      2
-#define MDS_CLOSE     3
-#define MDS_REINT     4
-#define MDS_READPAGE  5
-#define MDS_CONNECT   6
-
-#define REINT_SETATTR 1
-#define REINT_CREATE  2
-#define REINT_LINK    3
-#define REINT_UNLINK  4
-#define REINT_RENAME  5
-#define REINT_MAX     5
+#define MDS_GETATTR    1
+#define MDS_OPEN       2
+#define MDS_CLOSE      3
+#define MDS_REINT      4
+#define MDS_READPAGE   6
+#define MDS_CONNECT    7
+
+#define REINT_SETATTR  1
+#define REINT_CREATE   2
+#define REINT_LINK     3
+#define REINT_UNLINK   4
+#define REINT_RENAME   5
+#define REINT_RECREATE 6
+#define REINT_MAX      6
 
 struct ll_fid {
         __u64 id;
index 32d8b49..4395900 100644 (file)
@@ -87,10 +87,19 @@ struct mds_client_data {
 /* In-memory access to client data from MDS struct */
 struct mds_client_info {
         struct list_head mci_list;
+        struct list_head mci_open_head;
         struct mds_client_data *mci_mcd;
         int mci_off;
 };
 
+/* file data for open files on MDS: one record per open file, linked on
+ * the owning client's mds_client_info.mci_open_head so that mds_open can
+ * recognize a re-sent open of the same file. */
+struct mds_file_data { 
+        struct list_head mfd_list;          /* entry in mci_open_head */
+        struct file * mfd_file;             /* file returned by dentry_open on the MDS */
+        __u64             mfd_clientfd;     /* client-side handle, taken from body->objid of the open request */
+        __u32             mfd_clientcookie; /* NOTE(review): not assigned anywhere in the visible change -- confirm intended use */
+};
+
 /* mds/mds_reint.c  */
 int mds_reint_rec(struct mds_update_record *r, struct ptlrpc_request *req);
 struct mds_client_info *mds_uuid_to_mci(struct mds_obd *mds, __u8 *uuid);
@@ -123,7 +132,7 @@ int mdc_getattr(struct ptlrpc_client *, struct ptlrpc_connection *, ino_t ino,
 int mdc_setattr(struct ptlrpc_client *, struct ptlrpc_connection *,
                 struct inode *, struct iattr *iattr, struct ptlrpc_request **);
 int mdc_open(struct ptlrpc_client *, struct ptlrpc_connection *, ino_t ino,
-             int type, int flags, __u64 *fh, struct ptlrpc_request **req);
+             int type, int flags, __u64 cookie, __u64 *fh, struct ptlrpc_request **req); 
 int mdc_close(struct ptlrpc_client *cl, struct ptlrpc_connection *peer,
               ino_t ino, int type, __u64 fh,  struct ptlrpc_request **req);
 int mdc_readpage(struct ptlrpc_client *, struct ptlrpc_connection *, ino_t ino,
index f8132e8..bd5f574 100644 (file)
@@ -79,8 +79,8 @@
 struct ptlrpc_connection {
         struct list_head c_link;
         struct lustre_peer c_peer;
-        __u8 c_local_uuid[37];
-        __u8 c_remote_uuid[37];
+        __u8 c_local_uuid[37];  /* XXX do we need this? */
+        __u8 c_remote_uuid[37]; 
 
         int c_level;
         __u32 c_generation;  /* changes upon new connection */
@@ -104,18 +104,18 @@ struct ptlrpc_client {
         __u64 cli_last_rcvd;
         __u64 cli_last_committed;
 
+        void *cli_data;
         struct semaphore cli_rpc_sem; /* limits outstanding requests */
 
         spinlock_t cli_lock; /* protects lists */
+        struct list_head cli_delayed_head; /* delayed until after recovery */
         struct list_head cli_sending_head;
-        struct list_head cli_sent_head;
-        struct list_head cli_replied_head;
-        struct list_head cli_replay_head;
         struct list_head cli_dying_head;
         struct list_head cli_ha_item;
-        void (*cli_recover)(struct ptlrpc_client *); 
+        int (*cli_recover)(struct ptlrpc_client *); 
 
         struct recovd_obd *cli_recovd;
+        char *cli_name;
 };
 
 /* packet types */
@@ -124,16 +124,18 @@ struct ptlrpc_client {
 
 /* state flags of requests */
 #define PTL_RPC_FL_INTR      (1 << 0)
-#define PTL_RPC_FL_REPLY     (1 << 1)
+#define PTL_RPC_FL_REPLIED   (1 << 1)  /* reply was received */
 #define PTL_RPC_FL_SENT      (1 << 2)
 #define PTL_BULK_FL_SENT     (1 << 3)
 #define PTL_BULK_FL_RCVD     (1 << 4)
 #define PTL_RPC_FL_ERR       (1 << 5)
 #define PTL_RPC_FL_TIMEOUT   (1 << 6)
 #define PTL_RPC_FL_RESEND    (1 << 7)
-#define PTL_RPC_FL_COMMITTED (1 << 8)
+#define PTL_RPC_FL_RECOVERY  (1 << 8)  /* retransmission for recovery */
 #define PTL_RPC_FL_FINISHED  (1 << 9)
-#define PTL_RPC_FL_RETAIN    (1 << 10)
+#define PTL_RPC_FL_RETAIN    (1 << 10) /* retain for replay after reply */
+#define PTL_RPC_FL_REPLAY    (1 << 11) /* replay upon recovery */
+#define PTL_RPC_FL_ALLOCREP  (1 << 12) /* reply buffer allocated */
 
 struct ptlrpc_request { 
         int rq_type; /* one of PTL_RPC_REQUEST, PTL_RPC_REPLY, PTL_RPC_BULK */
@@ -150,10 +152,12 @@ struct ptlrpc_request {
         int rq_replen;
         struct lustre_msg *rq_repmsg;
         __u64 rq_transno;
+        __u64 rq_xid;
 
         char *rq_bulkbuf;
         int rq_bulklen;
 
+        int rq_level;
         time_t rq_time;
         time_t rq_timeout;
         //        void * rq_reply_handle;
@@ -233,6 +237,7 @@ typedef int (*svc_handler_t)(struct obd_device *obddev,
                              struct ptlrpc_request *req);
 
 /* rpc/connection.c */
+void ptlrpc_readdress_connection(struct ptlrpc_connection *conn, char *uuid);
 struct ptlrpc_connection *ptlrpc_get_connection(struct lustre_peer *peer);
 int ptlrpc_put_connection(struct ptlrpc_connection *c);
 struct ptlrpc_connection *ptlrpc_connection_addref(struct ptlrpc_connection *);
@@ -252,13 +257,18 @@ void ptlrpc_link_svc_me(struct ptlrpc_service *service, int i);
 
 /* rpc/client.c */
 void ptlrpc_init_client(struct recovd_obd *, 
-                        void (*recover)(struct ptlrpc_client *),
+                        int (*recover)(struct ptlrpc_client *),
                         int req_portal, int rep_portal,
                         struct ptlrpc_client *);
 void ptlrpc_cleanup_client(struct ptlrpc_client *cli);
 __u8 *ptlrpc_req_to_uuid(struct ptlrpc_request *req);
 struct ptlrpc_connection *ptlrpc_uuid_to_connection(char *uuid);
+
 int ptlrpc_queue_wait(struct ptlrpc_request *req);
+void ptlrpc_continue_req(struct ptlrpc_request *req);
+int ptlrpc_replay_req(struct ptlrpc_request *req);
+void ptlrpc_restart_req(struct ptlrpc_request *req);
+
 struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
                                        struct ptlrpc_connection *u, int opcode,
                                        int count, int *lengths, char **bufs);
index 46adb57..b3d9ec1 100644 (file)
@@ -55,7 +55,8 @@ extern unsigned long obd_fail_loc;
 #define OBD_FAIL_MDS_CLOSE_NET           0x115
 #define OBD_FAIL_MDS_CLOSE_PACK          0x116
 #define OBD_FAIL_MDS_CONNECT_NET         0x117
-#define OBD_FAIL_MDS_CONNECT_PACK       0x118
+#define OBD_FAIL_MDS_CONNECT_PACK        0x118
+#define OBD_FAIL_MDS_REINT_NET_REP       0x119
 
 #define OBD_FAIL_OST                     0x200
 #define OBD_FAIL_OST_CONNECT_NET         0x201
index 340f283..2f49c34 100644 (file)
@@ -297,6 +297,7 @@ static update_unpacker mds_unpackers[REINT_MAX + 1] = {
         [REINT_LINK] mds_link_unpack,
         [REINT_UNLINK] mds_unlink_unpack,
         [REINT_RENAME] mds_rename_unpack,
+        [REINT_RECREATE] mds_create_unpack,
 };
 
 int mds_update_unpack(struct ptlrpc_request *req, struct mds_update_record *rec)
index fed2b3d..3bb7820 100644 (file)
@@ -84,8 +84,6 @@ static int ll_commitcbd_main(void *arg)
                         EXIT;
                         break;
                 }
-                if (!list_empty(&sbi->ll_mds_client.cli_replied_head))
-                        CERROR("** clean up committed reqs here **\n"); 
 
                 schedule_timeout(sbi->ll_commitcbd_timeout);
                 CERROR("commit callback daemon woken up - FIXME\n"); 
index 4be688a..8026454 100644 (file)
@@ -51,7 +51,8 @@ static int ll_file_open(struct inode *inode, struct file *file)
         memset(fd, 0, sizeof(*fd));
 
         rc = mdc_open(&sbi->ll_mds_client, sbi->ll_mds_conn, inode->i_ino,
-                      S_IFREG, file->f_flags, &fd->fd_mdshandle, &req);
+                      S_IFREG, file->f_flags, (__u64)(unsigned long)file, 
+                      &fd->fd_mdshandle, &req); 
         fd->fd_req = req;
         ptlrpc_req_finished(req);
         if (rc) {
@@ -110,12 +111,19 @@ static int ll_file_release(struct inode *inode, struct file *file)
         if (rc)
                 GOTO(out, abs(rc));
 
-        iattr.ia_valid = ATTR_SIZE;
-        iattr.ia_size = inode->i_size;
-        rc = ll_inode_setattr(inode, &iattr, 0);
-        if (rc) {
-                CERROR("failed - %d.\n", rc);
-                rc = -EIO; /* XXX - GOTO(out)? -phil */
+        if (file->f_flags & O_WRONLY) {
+                struct iattr attr;
+                attr.ia_valid = ATTR_MTIME | ATTR_CTIME | ATTR_ATIME | ATTR_SIZE;
+                attr.ia_mtime = inode->i_mtime;
+                attr.ia_ctime = inode->i_ctime;
+                attr.ia_atime = inode->i_atime;
+                iattr.ia_size = inode->i_size;
+
+                rc = ll_inode_setattr(inode, &iattr, 0);
+                if (rc) {
+                        CERROR("failed - %d.\n", rc);
+                        rc = -EIO; /* XXX - GOTO(out)? -phil */
+                }
         }
 
         rc = mdc_close(&sbi->ll_mds_client, sbi->ll_mds_conn, inode->i_ino,
@@ -239,14 +247,6 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
 
         retval = generic_file_write(file, buf, count, ppos);
 
-        /* update mtime/ctime/atime here, NOT size */
-        if (retval > 0) {
-                struct iattr attr;
-                attr.ia_valid = ATTR_MTIME | ATTR_CTIME | ATTR_ATIME;
-                attr.ia_mtime = attr.ia_ctime = attr.ia_atime =
-                        CURRENT_TIME;
-                ll_setattr(file->f_dentry, &attr);
-        }
 
 #if 0
         err = obd_cancel(&sbi->ll_conn, LCK_PW, &lockh);
index 115e229..bad0c4a 100644 (file)
 #include <linux/lustre_lite.h>
 #include <linux/lustre_ha.h>
 
-void ll_recover(struct ptlrpc_client *cli)
+static int ll_reconnect(struct ll_sb_info *sbi)
+{       /* Re-establish the MDS connection of this superblock: re-address
+         * the connection, connect through the connection manager, then
+         * redo the MDS connect.  Returns 0 on success; both failure paths
+         * return -ENOTCONN regardless of the underlying error code. */
+        struct ll_fid rootfid;
+        __u64 last_committed, last_rcvd;
+        __u32 last_xid;
+        int err;
+        struct ptlrpc_request *request; 
+
+        ptlrpc_readdress_connection(sbi->ll_mds_conn, "mds");
+
+        err = connmgr_connect(ptlrpc_connmgr, sbi->ll_mds_conn);
+        if (err) {
+                CERROR("cannot connect to MDS: rc = %d\n", err);
+                ptlrpc_put_connection(sbi->ll_mds_conn); /* NOTE(review): sbi keeps this pointer after the put -- confirm refcounting */
+                GOTO(out_disc, err = -ENOTCONN);
+        }
+        sbi->ll_mds_conn->c_level = LUSTRE_CONN_CON; /* mdc_connect issues its request at LUSTRE_CONN_CON level */
+
+        /* XXX: need to store the last_* values somewhere */
+        err = mdc_connect(&sbi->ll_mds_client, sbi->ll_mds_conn,
+                          &rootfid, &last_committed, 
+                          &last_rcvd,
+                          &last_xid,
+                          &request); /* NOTE(review): request is not released in this function -- confirm ownership */
+        if (err) {
+                CERROR("cannot mds_connect: rc = %d\n", err);
+                GOTO(out_disc, err = -ENOTCONN);
+        }
+        sbi->ll_mds_client.cli_last_rcvd = last_xid; /* ll_recover compares each request's rq_xid against this value */
+        sbi->ll_mds_conn->c_level = LUSTRE_CONN_RECOVD;
+
+ out_disc:
+        return err;
+}
+
+
+/*
+ * Recovery routine for the MDS client (signature matches cli_recover):
+ * reconnect to the MDS, then walk cli_sending_head and replay, skip,
+ * restart or resend each request according to its flags and to whether
+ * its xid is covered by cli_last_rcvd.  Afterwards the connection level
+ * is raised to LUSTRE_CONN_FULL and requests parked on cli_delayed_head
+ * are continued.
+ *
+ * Returns 0 on success, the ll_reconnect() error if the reconnect fails
+ * (nothing is replayed in that case), or the first ptlrpc_replay_req()
+ * error.
+ */
+int ll_recover(struct ptlrpc_client *cli)
 {
         struct ptlrpc_request *req;
         struct list_head *tmp, *pos;
+        struct ll_sb_info *sbi = cli->cli_data;
+        int rc = 0;
         ENTRY;
 
+        /* 1. reconnect; without a connection there is nothing to replay
+         * over, and the level must not be raised to FULL below */
+        rc = ll_reconnect(sbi);
+        if (rc) {
+                CERROR("recovery reconnect error %d\n", rc);
+                RETURN(rc);
+        }
+
+        /* 2. walk the request list */
         spin_lock(&cli->cli_lock);
-        /* first shot at this: resend the request */ 
-        list_for_each_safe(tmp, pos, &cli->cli_sent_head) { 
+        list_for_each_safe(tmp, pos, &cli->cli_sending_head) { 
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                CDEBUG(D_INODE, "replaying request %p\n", req); 
-                list_del(&req->rq_list);
-                ptlrpc_resend_req(req); 
+
+                /* replay what needs to be replayed */
+                if (req->rq_flags & PTL_RPC_FL_REPLAY) {
+                        CDEBUG(D_INODE, "req %Ld needs replay [last rcvd %Ld]\n", 
+                               req->rq_xid, cli->cli_last_rcvd);
+                        rc = ptlrpc_replay_req(req); 
+                        if (rc) { 
+                                CERROR("recovery replay error %d for request %Ld\n", 
+                                       rc, req->rq_xid); 
+                                GOTO(out, rc);
+                        }
+                }
+
+                /* server has seen req, we have reply: skip */
+                if ((req->rq_flags & PTL_RPC_FL_REPLIED)  &&
+                    req->rq_xid <= cli->cli_last_rcvd) { 
+                        CDEBUG(D_INODE, "req %Ld was complete: skip [last rcvd %Ld]\n", 
+                               req->rq_xid, cli->cli_last_rcvd);
+                        continue;
+                }
+
+                /* server has lost req, we have reply: resend, ign reply */
+                if ((req->rq_flags & PTL_RPC_FL_REPLIED)  &&
+                    req->rq_xid > cli->cli_last_rcvd) { 
+                        CDEBUG(D_INODE, "lost req %Ld have rep: replay [last rcvd %Ld]\n", 
+                               req->rq_xid, cli->cli_last_rcvd);
+                        rc = ptlrpc_replay_req(req); 
+                        if (rc) {
+                                CERROR("request resend error %d for request %Ld\n", 
+                                       rc, req->rq_xid); 
+                                GOTO(out, rc);
+                        }
+                }
+
+                /* server has seen req, we have lost reply: -ERESTARTSYS */
+                if ( !(req->rq_flags & PTL_RPC_FL_REPLIED)  &&
+                     req->rq_xid <= cli->cli_last_rcvd) { 
+                        CDEBUG(D_INODE, "lost rep %Ld srv did req: restart [last rcvd %Ld]\n", 
+                               req->rq_xid, cli->cli_last_rcvd);
+                        ptlrpc_restart_req(req);
+                }
+
+                /* service has not seen req, no reply: resend */
+                if ( !(req->rq_flags & PTL_RPC_FL_REPLIED)  &&
+                     req->rq_xid > cli->cli_last_rcvd) {
+                        CDEBUG(D_INODE, "lost rep/req %Ld: resend [last rcvd %Ld]\n", 
+                               req->rq_xid, cli->cli_last_rcvd);
+                        ptlrpc_resend_req(req);
+                }
+
         }
 
+        sbi->ll_mds_conn->c_level = LUSTRE_CONN_FULL;
         recovd_cli_fixed(cli);
-        spin_unlock(&cli->cli_lock);
+
+        /* Finally, continue what we delayed since recovery started */
+        list_for_each_safe(tmp, pos, &cli->cli_delayed_head) { 
+                req = list_entry(tmp, struct ptlrpc_request, rq_list);
+                ptlrpc_continue_req(req);
+        }
 
         EXIT;
+ out:
+        spin_unlock(&cli->cli_lock);
+        return rc;
 }
index 4e9d655..2ad813a 100644 (file)
@@ -11,6 +11,7 @@
 #include <linux/mm.h>
 #include <linux/string.h>
 #include <linux/stat.h>
+#include <linux/iobuf.h>
 #include <linux/errno.h>
 #include <linux/locks.h>
 #include <linux/unistd.h>
@@ -349,9 +350,64 @@ void ll_truncate(struct inode *inode)
         return;
 } /* ll_truncate */
 
+/*
+ * Direct (0-copy) I/O on the pages mapped in @iobuf: builds one
+ * count/offset/flags entry per page (PAGE_SIZE bytes at the page's index,
+ * flagged OBD_BRW_CREATE) and submits them in a single obd_brw() call.
+ *
+ * Returns the number of bytes covered (pages * PAGE_SIZE) when obd_brw()
+ * returns 0, the obd_brw() error otherwise, or -ENOMEM if an allocation
+ * fails.  @blocknr and @blocksize are not used.
+ */
+int ll_direct_IO(int rw, struct inode * inode, struct kiobuf * iobuf, unsigned long blocknr, int blocksize)
+{
+        int i;
+        obd_count        num_obdo = 1;
+        obd_count        bufs_per_obdo = iobuf->nr_pages;
+        struct obdo     *oa = NULL;
+        obd_size         *count = NULL;
+        obd_off          *offset = NULL;
+        obd_flag         *flags = NULL;
+        int              err = 0;
+
+        ENTRY;
+
+        OBD_ALLOC(count, sizeof(obd_size) * bufs_per_obdo);
+        if (!count)
+                GOTO(out, err=-ENOMEM);
+
+        OBD_ALLOC(offset, sizeof(obd_off) * bufs_per_obdo);
+        if (!offset)
+                GOTO(out, err=-ENOMEM);
+
+        OBD_ALLOC(flags, sizeof(obd_flag) * bufs_per_obdo);
+        if (!flags)
+                GOTO(out, err=-ENOMEM);
+
+        for (i = 0 ; i < bufs_per_obdo ; i++) {
+                count[i] = PAGE_SIZE;
+                offset[i] = ((obd_off)(iobuf->maplist[i])->index) << PAGE_SHIFT;
+                flags[i] = OBD_BRW_CREATE;
+        }
+
+        oa = ll_oa_from_inode(inode, OBD_MD_FLNOTOBD);
+        if (!oa)
+                /* go through the common exit so the three arrays are freed */
+                GOTO(out, err=-ENOMEM);
+
+        err = obd_brw(rw, ll_i2obdconn(inode), num_obdo, &oa, &bufs_per_obdo,
+                      iobuf->maplist, count, offset, flags);
+        if (err == 0)
+                /* each entry was submitted with count[i] == PAGE_SIZE */
+                err = bufs_per_obdo * PAGE_SIZE;
+
+ out:
+        if (oa)
+                obdo_free(oa);
+        if (flags)
+                OBD_FREE(flags, sizeof(obd_flag) * bufs_per_obdo);
+        if (count)
+                /* size must match the OBD_ALLOC above (obd_size, not obd_count) */
+                OBD_FREE(count, sizeof(obd_size) * bufs_per_obdo);
+        if (offset)
+                OBD_FREE(offset, sizeof(obd_off) * bufs_per_obdo);
+        RETURN(err);
+}
+
+
+
 struct address_space_operations ll_aops = {
         readpage: ll_readpage,
         writepage: ll_writepage,
+        direct_IO: ll_direct_IO,
         sync_page: block_sync_page,
         prepare_write: ll_prepare_write, 
         commit_write: ll_commit_write,
index 49b19ad..9735456 100644 (file)
@@ -26,7 +26,7 @@ extern struct address_space_operations ll_aops;
 extern struct address_space_operations ll_dir_aops;
 struct super_operations ll_super_operations;
 
-extern void ll_recover(struct ptlrpc_client *);
+extern int ll_recover(struct ptlrpc_client *);
 extern int ll_commitcbd_setup(struct ll_sb_info *);
 extern int ll_commitcbd_cleanup(struct ll_sb_info *);
 
@@ -130,6 +130,8 @@ static struct super_block * ll_read_super(struct super_block *sb,
                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
                            &sbi->ll_mds_client);
 
+        sbi->ll_mds_client.cli_data = sbi;
+        sbi->ll_mds_client.cli_name = "mdc";
         sbi->ll_mds_conn = ptlrpc_uuid_to_connection("mds");
         if (!sbi->ll_mds_conn) {
                 CERROR("cannot find MDS\n");
index 7d38319..842f54a 100644 (file)
 #include <linux/obd_class.h>
 #include <linux/lustre_mds.h>
 
-static int mdc_reint(struct ptlrpc_client *cl, struct ptlrpc_request *request)
+/* Send a reintegration request at connection level @level: sets rq_level,
+ * waits in ptlrpc_queue_wait() and passes the result through
+ * ptlrpc_check_status().  Returns that status; it is logged only when
+ * non-zero so successful requests do not emit an error message. */
+static int mdc_reint(struct ptlrpc_client *cl, struct ptlrpc_request *request, int level)
 {
         int rc;
+        request->rq_level = level;
 
         rc = ptlrpc_queue_wait(request);
         rc = ptlrpc_check_status(request, rc);
         if (rc)
                 CERROR("error in handling %d\n", rc);
 
         return rc;
 }
@@ -62,8 +63,10 @@ int mdc_setattr(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         size = sizeof(struct mds_body);
         req->rq_replen = lustre_msg_size(1, &size);
 
-        rc = mdc_reint(cl, req);
+        rc = mdc_reint(cl, req, LUSTRE_CONN_FULL);
         *request = req;
+        if (rc == -ERESTARTSYS )
+                rc = 0;
 
         RETURN(rc);
 }
@@ -77,6 +80,7 @@ int mdc_create(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         struct ptlrpc_request *req;
         int rc, size[3] = {sizeof(*rec), namelen + 1, tgtlen + 1};
         char *tmp;
+        int level;
         ENTRY;
 
         req = ptlrpc_prep_req(cl, conn, MDS_REINT, 3, size, NULL);
@@ -97,9 +101,19 @@ int mdc_create(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         size[0] = sizeof(struct mds_body);
         req->rq_replen = lustre_msg_size(1, size);
 
-        rc = mdc_reint(cl, req);
-        *request = req;
+        level = LUSTRE_CONN_FULL;
+ resend:
+        rc = mdc_reint(cl, req, level);
+        if (rc == -ERESTARTSYS ) { 
+                struct mds_update_record_hdr *hdr = lustre_msg_buf(req->rq_reqmsg, 0);
+                level = LUSTRE_CONN_RECOVD;
+                CERROR("Lost reply: re-create rep.\n"); 
+                req->rq_flags = 0;
+                hdr->ur_opcode = NTOH__u32(REINT_RECREATE);
+                goto resend;
+        }
 
+        *request = req;
         RETURN(rc);
 }
 
@@ -126,8 +140,10 @@ int mdc_unlink(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         size[0] = sizeof(struct mds_body);
         req->rq_replen = lustre_msg_size(1, size);
 
-        rc = mdc_reint(cl, req);
+        rc = mdc_reint(cl, req, LUSTRE_CONN_FULL);
         *request = req;
+        if (rc == -ERESTARTSYS )
+                rc = 0;
 
         RETURN(rc);
 }
@@ -155,8 +171,10 @@ int mdc_link(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         size[0] = sizeof(struct mds_body);
         req->rq_replen = lustre_msg_size(1, size);
 
-        rc = mdc_reint(cl, req);
+        rc = mdc_reint(cl, req, LUSTRE_CONN_FULL);
         *request = req;
+        if (rc == -ERESTARTSYS )
+                rc = 0;
 
         RETURN(rc);
 }
@@ -190,8 +208,10 @@ int mdc_rename(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         size[0] = sizeof(struct mds_body);
         req->rq_replen = lustre_msg_size(1, size);
 
-        rc = mdc_reint(cl, req);
+        rc = mdc_reint(cl, req, LUSTRE_CONN_FULL);
         *request = req;
+        if (rc == -ERESTARTSYS )
+                rc = 0;
 
         RETURN(rc);
 }
index 2e18b6d..8c9eb8c 100644 (file)
@@ -48,6 +48,7 @@ int mdc_connect(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
                 GOTO(out, rc = -ENOMEM);
 
         body = lustre_msg_buf(req->rq_reqmsg, 0);
+        req->rq_level = LUSTRE_CONN_CON;
         req->rq_replen = lustre_msg_size(1, &size);
 
         mds_pack_req_body(req);
@@ -95,6 +96,7 @@ int mdc_getattr(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         body->valid = valid;
 
         req->rq_replen = lustre_msg_size(1, &size);
+        req->rq_level = LUSTRE_CONN_FULL;
 
         rc = ptlrpc_queue_wait(req);
         rc = ptlrpc_check_status(req, rc);
@@ -112,7 +114,7 @@ int mdc_getattr(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
 }
 
 int mdc_open(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
-             ino_t ino, int type, int flags, __u64 *fh,
+             ino_t ino, int type, int flags, __u64 cookie, __u64 *fh,
              struct ptlrpc_request **request)
 {
         struct mds_body *body;
@@ -123,10 +125,12 @@ int mdc_open(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
-        req->rq_flags |= PTL_RPC_FL_RETAIN;
+        req->rq_flags |= PTL_RPC_FL_REPLAY;
+        req->rq_level = LUSTRE_CONN_FULL;
         body = lustre_msg_buf(req->rq_reqmsg, 0);
         ll_ino2fid(&body->fid1, ino, 0, type);
         body->flags = HTON__u32(flags);
+        body->objid = cookie; 
 
         req->rq_replen = lustre_msg_size(1, &size);
 
@@ -160,6 +164,7 @@ int mdc_close(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         ll_ino2fid(&body->fid1, ino, 0, type);
         body->objid = fh;
 
+        req->rq_level = LUSTRE_CONN_FULL;
         req->rq_replen = lustre_msg_size(0, NULL);
 
         rc = ptlrpc_queue_wait(req);
@@ -214,7 +219,7 @@ int mdc_readpage(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         body->size = offset;
 
         req->rq_replen = lustre_msg_size(1, size);
-
+        req->rq_level = LUSTRE_CONN_FULL;
         rc = ptlrpc_queue_wait(req);
         if (rc) {
                 CERROR("error in handling %d\n", rc);
@@ -323,7 +328,7 @@ static int request_ioctl(struct inode *inode, struct file *file,
                 __u64 fh, ino;
                 copy_from_user(&ino, (__u64 *)arg, sizeof(ino));
                 CERROR("-- opening ino %llu\n", (unsigned long long)ino);
-                err = mdc_open(&cl, conn, ino, S_IFDIR, O_RDONLY, &fh,
+                err = mdc_open(&cl, conn, ino, S_IFDIR, O_RDONLY, 4711, &fh, 
                                &request);
                 copy_to_user((__u64 *)arg, &fh, sizeof(fh));
                 CERROR("-- done err %d (fh=%Lu)\n", err,
index f7510bb..d4eef8a 100644 (file)
@@ -163,6 +163,7 @@ int mds_client_add(struct mds_obd *mds, struct mds_client_data *mcd, int cl_off)
                 CERROR("no memory for MDS client info\n");
                 RETURN(-ENOMEM);
         }
+        INIT_LIST_HEAD(&mci->mci_open_head);
 
         CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n",
                cl_off, mcd->mcd_uuid);
@@ -296,7 +297,9 @@ int mds_connect(struct ptlrpc_request *req)
                 CDEBUG(D_INFO, "found existing data for UUID '%s' at #%d\n",
                        mcd->mcd_uuid, mci->mci_off);
         }
-        body->last_xid = HTON__u32(mcd->mcd_last_xid);
+        /* mcd_last_xid is stored in little endian on the disk and
+           mds_pack_rep_body converts it to network order */
+        body->last_xid = le32_to_cpu(mcd->mcd_last_xid);
         mds_pack_rep_body(req);
         RETURN(0);
 }
@@ -349,10 +352,14 @@ int mds_open(struct ptlrpc_request *req)
         struct mds_body *body;
         struct file *file;
         struct vfsmount *mnt;
+        struct mds_client_info *mci;
         __u32 flags;
+        struct list_head *tmp;
+        struct mds_file_data *mfd;
         int rc, size = sizeof(*body);
         ENTRY;
 
+
         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
                 CERROR("mds: out of memory\n");
@@ -360,19 +367,53 @@ int mds_open(struct ptlrpc_request *req)
                 RETURN(0);
         }
 
+
+        mci = mds_uuid_to_mci(mds, ptlrpc_req_to_uuid(req));
+        if (!mci) { 
+                CERROR("mds: no mci!\n");
+                req->rq_status = -ENOTCONN;
+                RETURN(0);
+        }
+
         body = lustre_msg_buf(req->rq_reqmsg, 0);
+
+        /* was this animal open already? */
+        list_for_each(tmp, &mci->mci_open_head) { 
+                struct mds_file_data *fd;
+                fd = list_entry(tmp, struct mds_file_data, mfd_list);
+                if (body->objid == fd->mfd_clientfd && 
+                    body->fid1.id == fd->mfd_file->f_dentry->d_inode->i_ino) { 
+                        CERROR("Re opening %Ld\n", body->fid1.id);
+                        RETURN(0);
+                }
+        }
+
+        OBD_ALLOC(mfd, sizeof(*mfd));
+        if (!mfd) { 
+                CERROR("mds: out of memory\n");
+                req->rq_status = -ENOMEM;
+                RETURN(0);
+        }
+
         de = mds_fid2dentry(mds, &body->fid1, &mnt);
         if (IS_ERR(de)) {
                 req->rq_status = -ENOENT;
                 RETURN(0);
         }
+
         flags = body->flags;
         file = dentry_open(de, mnt, flags);
         if (!file || IS_ERR(file)) {
                 req->rq_status = -EINVAL;
+                OBD_FREE(mfd, sizeof(*mfd));
                 RETURN(0);
         }
 
+        file->private_data = mfd;
+        mfd->mfd_file = file;
+        mfd->mfd_clientfd = body->objid;
+        list_add(&mfd->mfd_list, &mci->mci_open_head); 
+
         body = lustre_msg_buf(req->rq_repmsg, 0);
         body->objid = (__u64) (unsigned long)file;
         RETURN(0);
@@ -384,6 +425,7 @@ int mds_close(struct ptlrpc_request *req)
         struct mds_body *body;
         struct file *file;
         struct vfsmount *mnt;
+        struct mds_file_data *mfd;
         int rc;
         ENTRY;
 
@@ -402,8 +444,13 @@ int mds_close(struct ptlrpc_request *req)
         }
 
         file = (struct file *)(unsigned long)body->objid;
-        req->rq_status = filp_close(file, 0);
+        if (!file->f_dentry) 
+                LBUG();
+        mfd = (struct mds_file_data *)file->private_data;
+        list_del(&mfd->mfd_list);
+        OBD_FREE(mfd, sizeof(*mfd));
 
+        req->rq_status = filp_close(file, 0);
         l_dput(de);
         mntput(mnt);
 
@@ -521,6 +568,7 @@ int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
                 CDEBUG(D_INODE, "reint\n");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
                 rc = mds_reint(req);
+                OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET_REP, 0);
                 break;
 
         case MDS_OPEN:
@@ -549,9 +597,10 @@ out:
          */
         req->rq_repmsg->last_rcvd = HTON__u64(mds->mds_last_rcvd);
         req->rq_repmsg->last_committed = HTON__u64(mds->mds_last_committed);
-        CDEBUG(D_INFO, "last_rcvd %Lu, last_committed %Lu\n",
+        CDEBUG(D_INFO, "last_rcvd %Lu, last_committed %Lu, xid %d\n",
                (unsigned long long)mds->mds_last_rcvd,
-               (unsigned long long)mds->mds_last_committed);
+               (unsigned long long)mds->mds_last_committed, 
+               cpu_to_le32(req->rq_reqmsg->xid));
         if (rc) {
                 ptlrpc_error(svc, req);
         } else {
index 79ed8a2..0140c56 100644 (file)
@@ -53,7 +53,7 @@ struct mds_client_info *mds_uuid_to_mci(struct mds_obd *mds, __u8 *uuid)
                              sizeof(mci->mci_mcd->mcd_uuid)))
                         return mci;
         }
-        CDEBUG(D_INFO, "no client UUID found for '%s'\n", uuid);
+        CDEBUG(D_INFO, "no mds client info found for  UUID '%s'\n", uuid);
         return NULL;
 }
 
@@ -135,6 +135,54 @@ out_setattr:
         return(0);
 }
 
+/*
+ * Reint handler registered for REINT_RECREATE.  Nothing is created here:
+ * the child rec->ur_name is looked up under the parent identified by
+ * rec->ur_fid1 and, when it already has an inode, that inode's number and
+ * generation are copied into buffer 0 of the reply message.
+ *
+ * The outcome (0, -ENOENT or -ESTALE) is stored in req->rq_status; the
+ * function itself always returns 0.  Every failure path also calls LBUG().
+ */
+static int mds_reint_recreate(struct mds_update_record *rec,
+                            struct ptlrpc_request *req)
+{
+        struct dentry *de = NULL;
+        struct mds_obd *mds = &req->rq_obd->u.mds;
+        struct dentry *dchild = NULL;
+        struct inode *dir;
+        int rc = 0;
+        ENTRY;
+
+        de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
+        if (IS_ERR(de) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE)) {
+                LBUG();
+                GOTO(out_create_de, rc = -ESTALE);
+        }
+        dir = de->d_inode;
+        CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
+
+        down(&dir->i_sem);
+        dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
+        if (IS_ERR(dchild)) {
+                CERROR("child lookup error %ld\n", PTR_ERR(dchild));
+                LBUG();
+                /* Keep i_sem held: out_create_dchild performs the single
+                 * up().  Releasing it here as well would up() it twice. */
+                GOTO(out_create_dchild, rc = -ESTALE);
+        }
+
+        if (dchild->d_inode) {
+                struct mds_body *body;
+                rc = 0;
+                body = lustre_msg_buf(req->rq_repmsg, 0);
+                body->ino = dchild->d_inode->i_ino;
+                body->generation = dchild->d_inode->i_generation;
+        } else { 
+                CERROR("child doesn't exist (dir %ld, name %s)\n",
+                       dir->i_ino, rec->ur_name);
+                rc = -ENOENT;
+                LBUG();
+        }
+
+out_create_dchild:
+        l_dput(dchild);
+        up(&dir->i_sem);
+out_create_de:
+        l_dput(de);
+        req->rq_status = rc;
+        return 0;
+}
 static int mds_reint_create(struct mds_update_record *rec,
                             struct ptlrpc_request *req)
 {
@@ -152,7 +200,8 @@ static int mds_reint_create(struct mds_update_record *rec,
                 GOTO(out_create_de, rc = -ESTALE);
         }
         dir = de->d_inode;
-        CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
+        CDEBUG(D_INODE, "parent ino %ld name %s mode %o\n", 
+               dir->i_ino, rec->ur_name, rec->ur_mode);
 
         down(&dir->i_sem);
         dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
@@ -478,6 +527,7 @@ static mds_reinter reinters[REINT_MAX+1] = {
         [REINT_UNLINK]    mds_reint_unlink,
         [REINT_LINK]      mds_reint_link,
         [REINT_RENAME]    mds_reint_rename,
+        [REINT_RECREATE]  mds_reint_recreate,
 };
 
 int mds_reint_rec(struct mds_update_record *rec, struct ptlrpc_request *req)
index 8f0b85c..70a6260 100644 (file)
@@ -643,6 +643,8 @@ static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
                            osc->osc_client);
         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
                            osc->osc_ldlm_client);
+        osc->osc_client->cli_name = "osc";
+        osc->osc_ldlm_client->cli_name = "ldlm";
 
         MOD_INC_USE_COUNT;
         RETURN(0);
index 9523a21..167e67d 100644 (file)
@@ -27,7 +27,7 @@
 #include <linux/lustre_ha.h>
 
 void ptlrpc_init_client(struct recovd_obd *recovd, 
-                        void (*recover)(struct ptlrpc_client *recover),
+                        int (*recover)(struct ptlrpc_client *recover),
                         int req_portal,
                         int rep_portal, struct ptlrpc_client *cl)
 {
@@ -39,10 +39,8 @@ void ptlrpc_init_client(struct recovd_obd *recovd,
         cl->cli_obd = NULL;
         cl->cli_request_portal = req_portal;
         cl->cli_reply_portal = rep_portal;
+        INIT_LIST_HEAD(&cl->cli_delayed_head);
         INIT_LIST_HEAD(&cl->cli_sending_head);
-        INIT_LIST_HEAD(&cl->cli_sent_head);
-        INIT_LIST_HEAD(&cl->cli_replied_head);
-        INIT_LIST_HEAD(&cl->cli_replay_head);
         INIT_LIST_HEAD(&cl->cli_dying_head);
         spin_lock_init(&cl->cli_lock);
         sema_init(&cl->cli_rpc_sem, 32);
@@ -72,6 +70,21 @@ struct ptlrpc_connection *ptlrpc_uuid_to_connection(char *uuid)
         return c;
 }
 
+/*
+ * Point an existing connection at the peer that 'uuid' resolves to.
+ * If the lookup fails, an error is logged and conn->c_peer is left as it
+ * was.
+ */
+void ptlrpc_readdress_connection(struct ptlrpc_connection *conn, char *uuid)
+{
+        struct lustre_peer resolved;
+
+        if (kportal_uuid_to_peer(uuid, &resolved) != 0) {
+                CERROR("cannot find peer %s!\n", uuid);
+                return;
+        }
+
+        memcpy(&conn->c_peer, &resolved, sizeof(resolved));
+}
+
 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *conn)
 {
         struct ptlrpc_bulk_desc *bulk;
@@ -135,6 +148,7 @@ struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
 
         spin_lock(&conn->c_lock);
         request->rq_reqmsg->xid = HTON__u32(++conn->c_xid_out);
+        request->rq_xid = conn->c_xid_out;
         spin_unlock(&conn->c_lock);
 
         request->rq_client = cl;
@@ -150,6 +164,7 @@ void ptlrpc_req_finished(struct ptlrpc_request *request)
         if (request->rq_repmsg != NULL) { 
                 OBD_FREE(request->rq_repmsg, request->rq_replen);
                 request->rq_repmsg = NULL;
+                request->rq_reply_md.start = NULL; 
         }
 
         if (atomic_dec_and_test(&request->rq_refcount))
@@ -168,7 +183,7 @@ void ptlrpc_free_req(struct ptlrpc_request *request)
 
         if (request->rq_client) {
                 spin_lock(&request->rq_client->cli_lock);
-                list_del(&request->rq_list);
+                list_del_init(&request->rq_list);
                 spin_unlock(&request->rq_client->cli_lock);
         }
 
@@ -183,24 +198,35 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
 
         if (req->rq_repmsg != NULL) {
                 req->rq_transno = NTOH__u64(req->rq_repmsg->transno);
-                req->rq_flags |= PTL_RPC_FL_REPLY;
+                req->rq_flags |= PTL_RPC_FL_REPLIED;
                 GOTO(out, rc = 1);
         }
 
         if (req->rq_flags & PTL_RPC_FL_RESEND) { 
                 CERROR("-- RESEND --\n");
-                req->rq_status = -EAGAIN;
                 GOTO(out, rc = 1);
         }
 
+        if (req->rq_flags & PTL_RPC_FL_RECOVERY) { 
+                CERROR("-- RESTART --\n");
+                GOTO(out, rc = 1);
+        }
+
+
         if (CURRENT_TIME - req->rq_time >= req->rq_timeout) {
                 CERROR("-- REQ TIMEOUT --\n");
                 /* clear the timeout */
                 req->rq_timeout = 0;
+                req->rq_connection->c_level = LUSTRE_CONN_RECOVD;
                 req->rq_flags |= PTL_RPC_FL_TIMEOUT;
                 if (req->rq_client && req->rq_client->cli_recovd)
                         recovd_cli_fail(req->rq_client);
-                GOTO(out, rc = 0);
+                if (req->rq_level < LUSTRE_CONN_FULL)
+                        rc = -ETIMEDOUT;
+                else 
+                        rc = 0;
+
+                GOTO(out, rc);
         }
 
         if (req->rq_timeout) { 
@@ -277,27 +303,32 @@ void ptlrpc_free_committed(struct ptlrpc_client *cli)
         struct list_head *tmp, *saved;
         struct ptlrpc_request *req;
 
-        list_for_each_safe(tmp, saved, &cli->cli_replied_head) {
+        list_for_each_safe(tmp, saved, &cli->cli_sending_head) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
 
+                if ( (req->rq_flags & PTL_RPC_FL_REPLAY) ) { 
+                        CDEBUG(D_INFO, "Retaining request %Ld for replay\n",
+                               req->rq_xid);
+                        continue;
+                }
+                        
                 /* not yet committed */ 
-                if (req->rq_transno > cli->cli_last_committed)
+                if (!req->rq_transno ||
+                    req->rq_transno > cli->cli_last_committed)
                         break; 
 
-                /* retain for replay if flagged */
-                list_del(&req->rq_list);
-                if (req->rq_flags & PTL_RPC_FL_RETAIN) {
-                        list_add(&req->rq_list, &cli->cli_replay_head);
+                CDEBUG(D_INFO, "Marking request %Ld as committed ("
+                       "transno=%Lu, last_committed=%Lu\n", 
+                       req->rq_xid, req->rq_transno, 
+                       cli->cli_last_committed);
+                if (atomic_dec_and_test(&req->rq_refcount)) {
+                        /* we do this to prevent free_req deadlock */
+                        list_del_init(&req->rq_list); 
+                        req->rq_client = NULL;
+                        ptlrpc_free_req(req);
                 } else {
-                        CDEBUG(D_INFO, "Marking request %p as committed ("
-                               "transno=%Lu, last_committed=%Lu\n", req,
-                               req->rq_transno, cli->cli_last_committed);
-                        if (atomic_dec_and_test(&req->rq_refcount)) {
-                                /* we do this to prevent free_req deadlock */
-                                req->rq_client = NULL;
-                                ptlrpc_free_req(req);
-                        } else
-                                list_add(&req->rq_list, &cli->cli_dying_head);
+                        list_del_init(&req->rq_list);
+                        list_add(&req->rq_list, &cli->cli_dying_head);
                 }
         }
 
@@ -312,66 +343,107 @@ void ptlrpc_cleanup_client(struct ptlrpc_client *cli)
         ENTRY;
 
         spin_lock(&cli->cli_lock);
-        list_for_each_safe(tmp, saved, &cli->cli_replied_head) {
-                req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                /* We do this to prevent ptlrpc_free_req from taking cli_lock */
-                CDEBUG(D_INFO, "Cleaning req %p from replied list.\n", req);
-                list_del(&req->rq_list);
-                req->rq_client = NULL;
-                ptlrpc_free_req(req); 
-        }
-        list_for_each_safe(tmp, saved, &cli->cli_sent_head) {
-                req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                CDEBUG(D_INFO, "Cleaning req %p from sent list.\n", req);
-                list_del(&req->rq_list);
-                req->rq_client = NULL;
-                ptlrpc_free_req(req); 
-        }
-        list_for_each_safe(tmp, saved, &cli->cli_replay_head) {
-                req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                CERROR("Request %p is on the replay list at cleanup!\n", req);
-                list_del(&req->rq_list);
-                req->rq_client = NULL;
-                ptlrpc_free_req(req); 
-        }
         list_for_each_safe(tmp, saved, &cli->cli_sending_head) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
                 CDEBUG(D_INFO, "Cleaning req %p from sending list.\n", req);
-                list_del(&req->rq_list);
+                list_del_init(&req->rq_list);
                 req->rq_client = NULL;
                 ptlrpc_free_req(req); 
         }
         list_for_each_safe(tmp, saved, &cli->cli_dying_head) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
                 CERROR("Request %p is on the dying list at cleanup!\n", req);
-                list_del(&req->rq_list);
+                list_del_init(&req->rq_list);
                 req->rq_client = NULL;
                 ptlrpc_free_req(req); 
         }
         spin_unlock(&cli->cli_lock);
+
         EXIT;
         return;
 }
 
+/*
+ * Wake whoever sleeps on req->rq_wait_for_rep so that a delayed request
+ * can go on.  No request state is modified here.
+ */
+void ptlrpc_continue_req(struct ptlrpc_request *req)
+{
+        ENTRY;
+
+        CDEBUG(D_INODE, "continue delayed request %Ld opc %d\n",
+               req->rq_xid,
+               req->rq_reqmsg->opc);
+
+        wake_up_interruptible(&req->rq_wait_for_rep);
+
+        EXIT;
+}
+
+/*
+ * Mark a request for retransmission and wake its waiter: the status becomes
+ * -EAGAIN, the level is set to LUSTRE_CONN_RECOVD, PTL_RPC_FL_RESEND is set
+ * and PTL_RPC_FL_TIMEOUT is cleared.
+ */
+void ptlrpc_resend_req(struct ptlrpc_request *req)
+{
+        ENTRY;
+
+        CDEBUG(D_INODE, "resend request %Ld, opc %d\n",
+               req->rq_xid, req->rq_reqmsg->opc);
+
+        req->rq_status = -EAGAIN;
+        req->rq_level = LUSTRE_CONN_RECOVD;
+        /* set RESEND first, then drop TIMEOUT, as two steps folded in one */
+        req->rq_flags = (req->rq_flags | PTL_RPC_FL_RESEND) &
+                        ~PTL_RPC_FL_TIMEOUT;
+
+        wake_up_interruptible(&req->rq_wait_for_rep);
+        EXIT;
+}
+
+/*
+ * Mark a request as restarted and wake its waiter: the status becomes
+ * -ERESTARTSYS, PTL_RPC_FL_RECOVERY is set and PTL_RPC_FL_TIMEOUT is
+ * cleared.
+ */
+void ptlrpc_restart_req(struct ptlrpc_request *req)
+{
+        ENTRY;
+
+        CDEBUG(D_INODE, "restart completed request %Ld, opc %d\n",
+               req->rq_xid, req->rq_reqmsg->opc);
+
+        req->rq_status = -ERESTARTSYS;
+        /* set RECOVERY first, then drop TIMEOUT, as two steps folded in one */
+        req->rq_flags = (req->rq_flags | PTL_RPC_FL_RECOVERY) &
+                        ~PTL_RPC_FL_TIMEOUT;
+
+        wake_up_interruptible(&req->rq_wait_for_rep);
+        EXIT;
+}
+
+/*
+ * Send a request and sleep until ptlrpc_check_reply() reports an outcome.
+ *
+ * If the request's level is above the connection's level, the request is
+ * first parked at the tail of cli_delayed_head until the connection level
+ * has caught up.  Once sent it is moved to the tail of cli_sending_head.
+ * A request flagged PTL_RPC_FL_RESEND loops back to the send.  When a reply
+ * arrived, it is unpacked, last_rcvd/last_committed are recorded on the
+ * client and ptlrpc_free_committed() is run under cli_lock.
+ *
+ * Returns 0 on success, otherwise an error code (always negative on the
+ * send-failure path).
+ */
 int ptlrpc_queue_wait(struct ptlrpc_request *req)
 {
         int rc = 0;
+        struct ptlrpc_client *cli = req->rq_client;
         ENTRY;
 
         init_waitqueue_head(&req->rq_wait_for_rep);
+        CERROR("subsys: %s req %Ld opc %d level %d, conn level %d\n", 
+               cli->cli_name, req->rq_xid, req->rq_reqmsg->opc, req->rq_level,
+               req->rq_connection->c_level);
+
+        /* XXX probably both an import and connection level are needed */
+        if (req->rq_level > req->rq_connection->c_level) { 
+                CERROR("process %d waiting for recovery\n", current->pid);
+                spin_lock(&cli->cli_lock);
+                list_del_init(&req->rq_list);
+                list_add(&req->rq_list, cli->cli_delayed_head.prev); 
+                spin_unlock(&cli->cli_lock);
+                wait_event_interruptible
+                        (req->rq_wait_for_rep, 
+                         req->rq_level <= req->rq_connection->c_level);
+                spin_lock(&cli->cli_lock);
+                list_del_init(&req->rq_list);
+                spin_unlock(&cli->cli_lock);
+                CERROR("process %d resumed\n", current->pid);
+        }
  resend:
         req->rq_time = CURRENT_TIME;
         req->rq_timeout = 30;
         rc = ptl_send_rpc(req);
         if (rc) {
                 CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
+                /* Make rc positive whatever its sign, so that the
+                 * RETURN(-rc) below always hands back a negative error.
+                 * Negating a positive rc here would be undone by that
+                 * RETURN and leak a positive value to the caller. */
+                if ( rc < 0 ) 
+                        rc = -rc;
                 ptlrpc_cleanup_request_buf(req);
-                up(&req->rq_client->cli_rpc_sem);
+                up(&cli->cli_rpc_sem);
                 RETURN(-rc);
         }
 
+        spin_lock(&cli->cli_lock);
+        list_del_init(&req->rq_list);
+        list_add(&req->rq_list, cli->cli_sending_head.prev);
+        spin_unlock(&cli->cli_lock);
+
         CDEBUG(D_OTHER, "-- sleeping\n");
-        wait_event_interruptible(req->rq_wait_for_rep, ptlrpc_check_reply(req));
+        wait_event_interruptible(req->rq_wait_for_rep, 
+                                 ptlrpc_check_reply(req));
         CDEBUG(D_OTHER, "-- done\n");
 
         if (req->rq_flags & PTL_RPC_FL_RESEND) {
@@ -379,15 +451,67 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                 goto resend;
         }
 
-        //ptlrpc_cleanup_request_buf(req);
-        up(&req->rq_client->cli_rpc_sem);
+        up(&cli->cli_rpc_sem);
         if (req->rq_flags & PTL_RPC_FL_INTR) {
                 /* Clean up the dangling reply buffers */
                 ptlrpc_abort(req);
                 GOTO(out, rc = -EINTR);
         }
 
-        if (! (req->rq_flags & PTL_RPC_FL_REPLY)) {
+        if (! (req->rq_flags & PTL_RPC_FL_REPLIED)) {
+                GOTO(out, rc = req->rq_status);
+        }
+
+        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
+        if (rc) {
+                CERROR("unpack_rep failed: %d\n", rc);
+                GOTO(out, rc);
+        }
+        CDEBUG(D_NET, "got rep %d\n", req->rq_repmsg->xid);
+        if (req->rq_repmsg->status == 0)
+                CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
+                       req->rq_replen, req->rq_repmsg->status);
+
+        spin_lock(&cli->cli_lock);
+        cli->cli_last_rcvd = req->rq_repmsg->last_rcvd;
+        cli->cli_last_committed = req->rq_repmsg->last_committed;
+        ptlrpc_free_committed(cli); 
+        spin_unlock(&cli->cli_lock);
+
+        EXIT;
+ out:
+        return rc;
+}
+
+int ptlrpc_replay_req(struct ptlrpc_request *req)
+{
+        int rc = 0;
+        struct ptlrpc_client *cli = req->rq_client;
+        ENTRY;
+
+        init_waitqueue_head(&req->rq_wait_for_rep);
+        CERROR("req %Ld opc %d level %d, conn level %d\n", 
+               req->rq_xid, req->rq_reqmsg->opc, req->rq_level,
+               req->rq_connection->c_level);
+
+        req->rq_time = CURRENT_TIME;
+        req->rq_timeout = 3;
+        rc = ptl_send_rpc(req);
+        if (rc) {
+                CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
+                ptlrpc_cleanup_request_buf(req);
+                up(&cli->cli_rpc_sem);
+                RETURN(-rc);
+        }
+
+        CDEBUG(D_OTHER, "-- sleeping\n");
+        wait_event_interruptible(req->rq_wait_for_rep, 
+                                 ptlrpc_check_reply(req));
+        CDEBUG(D_OTHER, "-- done\n");
+
+        up(&cli->cli_rpc_sem);
+
+        if (!(req->rq_flags & PTL_RPC_FL_REPLIED)) {
                 CERROR("Unknown reason for wakeup\n");
                 /* XXX Phil - I end up here when I kill obdctl */
                 ptlrpc_abort(req);
@@ -399,22 +523,19 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                 CERROR("unpack_rep failed: %d\n", rc);
                 GOTO(out, rc);
         }
+
         CDEBUG(D_NET, "got rep %d\n", req->rq_repmsg->xid);
         if (req->rq_repmsg->status == 0)
                 CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
                        req->rq_replen, req->rq_repmsg->status);
+        else {
+                CERROR("recovery failed: "); 
+                CERROR("req %Ld opc %d level %d, conn level %d\n", 
+                       req->rq_xid, req->rq_reqmsg->opc, req->rq_level,
+                       req->rq_connection->c_level);
+                LBUG();
+        }
 
-        spin_lock(&req->rq_client->cli_lock);
-        /* add to the tail of the replied head */
-        list_del(&req->rq_list);
-        list_add(&req->rq_list, req->rq_client->cli_replied_head.prev); 
-
-        req->rq_client->cli_last_rcvd = req->rq_repmsg->last_rcvd;
-        req->rq_client->cli_last_committed = req->rq_repmsg->last_committed;
-        ptlrpc_free_committed(req->rq_client); 
-        spin_unlock(&req->rq_client->cli_lock);
-
-        EXIT;
  out:
-        return rc;
+        RETURN(rc);
 }
index c93c17d..5ca8be2 100644 (file)
@@ -59,6 +59,7 @@ int connmgr_connect(struct recovd_obd *recovd, struct ptlrpc_connection *conn)
         strncpy(body->conn_uuid, conn->c_local_uuid, sizeof(body->conn_uuid));
 
         req->rq_replen = lustre_msg_size(1, &size);
+        req->rq_level = LUSTRE_CONN_NEW;
 
         rc = ptlrpc_queue_wait(req);
         rc = ptlrpc_check_status(req, rc);
index 819e4f1..5c0930d 100644 (file)
@@ -35,17 +35,9 @@ static const ptl_handle_ni_t *socknal_nip = NULL, *qswnal_nip = NULL;
  */
 static int request_out_callback(ptl_event_t *ev, void *data)
 {
-        struct ptlrpc_request *req = ev->mem_desc.user_ptr;
-        struct ptlrpc_client *cl = req->rq_client;
-
         ENTRY;
 
-        if (ev->type == PTL_EVENT_SENT) {
-                spin_lock(&req->rq_client->cli_lock);
-                list_del(&req->rq_list);
-                list_add(&req->rq_list, &cl->cli_sent_head);
-                spin_unlock(&req->rq_client->cli_lock);
-        } else {
+        if (ev->type != PTL_EVENT_SENT) {
                 // XXX make sure we understand all events, including ACK's
                 CERROR("Unknown event %d\n", ev->type);
                 LBUG();
index e3386ec..755032d 100644 (file)
@@ -94,14 +94,14 @@ static int ptl_send_buf(struct ptlrpc_request *request,
         remote_id.nid = conn->c_peer.peer_nid;
         remote_id.pid = 0;
 
-        CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %d\n",
-               request->rq_req_md.length, portal, request->rq_reqmsg->xid);
+        CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %Ld\n",
+               request->rq_req_md.length, portal, request->rq_xid);
 
         rc = PtlPut(md_h, ack, remote_id, portal, 0, request->rq_reqmsg->xid,
                     0, 0);
         if (rc != PTL_OK) {
-                CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid,
-                       portal, request->rq_reqmsg->xid, rc);
+                CERROR("PtlPut(%d, %d, %Ld) failed: %d\n", remote_id.nid,
+                       portal, request->rq_xid, rc);
                 PtlMDUnlink(md_h);
         }
 
@@ -228,14 +228,6 @@ int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req)
         RETURN(rc);
 }
 
-void ptlrpc_resend_req(struct ptlrpc_request *req)
-{
-        ENTRY;
-        req->rq_flags |= PTL_RPC_FL_RESEND;
-        req->rq_flags &= ~PTL_RPC_FL_TIMEOUT;
-        wake_up_interruptible(&req->rq_wait_for_rep);
-        EXIT;
-}
 
 int ptl_send_rpc(struct ptlrpc_request *request)
 {
@@ -249,18 +241,21 @@ int ptl_send_rpc(struct ptlrpc_request *request)
                 CERROR("wrong packet type sent %d\n",
                        NTOH__u32(request->rq_reqmsg->type));
                 LBUG();
-                RETURN(-EINVAL);
+                RETURN(EINVAL);
         }
         if (request->rq_replen == 0) {
                 CERROR("request->rq_replen is 0!\n");
-                RETURN(-EINVAL);
+                RETURN(EINVAL);
         }
 
         /* request->rq_repmsg is set only when the reply comes in, in
          * client_packet_callback() */
+        if (request->rq_reply_md.start)
+                OBD_FREE(request->rq_reply_md.start, request->rq_replen);
+
         OBD_ALLOC(repbuf, request->rq_replen);
         if (!repbuf)
-                RETURN(-ENOMEM);
+                RETURN(ENOMEM);
 
         local_id.nid = PTL_ID_ANY;
         local_id.pid = PTL_ID_ANY;
@@ -293,14 +288,10 @@ int ptl_send_rpc(struct ptlrpc_request *request)
                 GOTO(cleanup2, rc);
         }
 
-        CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid %u, portal %u\n",
-               request->rq_replen, request->rq_reqmsg->xid,
+        CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid %Lu, portal %u\n",
+               request->rq_replen, request->rq_xid,
                request->rq_client->cli_reply_portal);
 
-        spin_lock(&request->rq_client->cli_lock);
-        list_add(&request->rq_list, &request->rq_client->cli_sending_head);
-        spin_unlock(&request->rq_client->cli_lock);
-
         rc = ptl_send_buf(request, request->rq_connection,
                           request->rq_client->cli_request_portal);
         RETURN(rc);
index b110462..3b377bc 100644 (file)
@@ -88,8 +88,8 @@ int lustre_unpack_msg(struct lustre_msg *m, int len)
         m->type = NTOH__u32(m->type);
         m->connid = NTOH__u32(m->connid);
         m->bufcount = NTOH__u32(m->bufcount);
-        m->last_rcvd = NTOH__u32(m->last_rcvd);
-        m->last_committed = NTOH__u32(m->last_committed);
+        m->last_rcvd = NTOH__u64(m->last_rcvd);
+        m->last_committed = NTOH__u64(m->last_committed);
 
         required_len += m->bufcount * sizeof(__u32);
         if (len < required_len)
index 7e8e9c4..a3370da 100644 (file)
@@ -128,7 +128,6 @@ static int recovd_handle_event(struct recovd_obd *recovd)
 
         if (recovd->recovd_flags & RECOVD_UPCALL_ANSWER) { 
                 CERROR("UPCALL_WAITING: upcall answer\n");
-                CERROR("** fill me in with recovery\n");
 
                 while (!list_empty(&recovd->recovd_troubled_lh)) {
                         struct ptlrpc_client *cli =
index d21a480..862f6dc 100644 (file)
@@ -57,6 +57,7 @@ int connmgr_setup(struct obd_device *obddev, obd_count len, void *buf)
 
         ptlrpc_init_client(NULL, NULL, CONNMGR_REQUEST_PORTAL, 
                            CONNMGR_REPLY_PORTAL, recovd->recovd_client);
+        recovd->recovd_client->cli_name = "connmgr"; 
 
         err = ptlrpc_start_thread(obddev, recovd->recovd_service, "lustre_connmgr");
         if (err) {
index 16335a7..8bf5424 100644 (file)
@@ -3,12 +3,15 @@ CPPFLAGS := -I. -I$(PORTALS)/include -I$(top_srcdir)/include -D_LARGEFILE64_SOUR
 CFLAGS := -g -Wall
 # LDADD = -lldap
 # LDADD := -lreadline -ltermcap # -lefence
-bin_PROGRAMS = openunlink testreq truncate #ldaptest 
+bin_PROGRAMS = openunlink testreq truncate directio openme writeme #ldaptest 
 
 # ldaptest_SOURCES = ldaptest.c
 testreq_SOURCES = testreq.c
 truncate_SOURCES = truncate.c
+directio_SOURCES = directio.c
 openunlink_SOURCES = openunlink.c
+openme_SOURCES = openme.c
+writeme_SOURCES = writeme.c
 
 include $(top_srcdir)/Rules
 
index a4b46a5..cc36136 100755 (executable)
 #!/bin/sh
+SRCDIR=.
 
-echo `date` creating /mnt/lustre/foo
+. common.sh
+
+# Feed the commands "name2dev RPCDEV", "newconn" and "quit" to $OBDCTL on
+# stdin.  $OBDCTL is not set in this script; presumably common.sh (sourced
+# above) defines it -- TODO confirm.
+reconnect () { 
+
+$OBDCTL <<EOF
+name2dev RPCDEV
+newconn
+quit
+EOF
+
+}
+
+echo 
+echo "Test 5 reopen a file:" `date` "creating and writing/mnt/lustre/foo"
+echo
+rm -rf /mnt/lustre/*
+./openme /mnt/lustre/foo3 & 
+./writeme /mnt/lustre/iogoeson & 
+sleep 1
+ls -l /mnt/lustre
+echo 0x80000107 > /proc/sys/lustre/fail_loc
+mknod /mnt/lustre/dev c 10 240 &
+echo "MDS dropped create request -- sleep 4 secs - watch for timeout"
+sleep 4
+reconnect
+sleep 1
+echo "did things recover? check for file foo, bar, check log for reopen."
+ls -l /mnt/lustre
+echo "Test 5 done"
+
+exit
+
+echo 
+echo "Test 1 drop request:" `date` "creating /mnt/lustre/foo"
+echo
+rm -rf /mnt/lustre/*
 echo 0x80000107 > /proc/sys/lustre/fail_loc
 touch /mnt/lustre/foo &
 ps axww | grep touch
-echo "touch program suspended and hanging -- sleeping 5 secs"
-sleep 5
-ls -l /mnt/lustre/foo
+echo "MDS dropped create request -- sleep 4 secs - watch for timeout"
+sleep 4
+reconnect
+sleep 1
+echo "did things recover? check for file foo."
+ls -l /mnt/lustre
+
 
+echo
+echo "Test 2 test delay queue:" `date` "creating /mnt/lustre/foo"
+echo
+rm -rf /mnt/lustre/*
+mkdir /mnt/lustre/a
+echo 0x80000107 > /proc/sys/lustre/fail_loc
+touch /mnt/lustre/foo &
+ps axww | grep touch
+echo "MDS dropped create request -- sleep 4 secs - watch for timeout"
+sleep 4
+touch /mnt/lustre/a/f &
+reconnect
+sleep 1
+echo "did things recover? check for file foo and a/f"
+ls -l /mnt/lustre
+ls -l /mnt/lustre/a
+
+echo
+echo "Test 4 dropped reply:" `date` "creating /mnt/lustre/foo2"
+echo
+rm -rf /mnt/lustre/*
+echo 0x80000119 > /proc/sys/lustre/fail_loc
+touch /mnt/lustre/foo2 &
+ps axww | grep touch
+echo "MDS dropped create request -- sleep 4 secs - watch for timeout"
+sleep 4
+reconnect
+echo failure cleared
+sleep 1
+echo "did things recover? check for file foo2"
+ls -l /mnt/lustre
+
+
+
+exit
+
+echo
+echo "Test 3: Multiple failures"
+echo
 echo 0x0000107 > /proc/sys/lustre/fail_loc
 touch /mnt/lustre/bar &
 ps axww | grep touch
 echo "touch program will have repeated failures sleeping 10"
-
 sleep 10
-
 echo 0 > /proc/sys/lustre/fail_loc
+reconnect
+sleep 1
 echo "failure cleared"
-sleep 5
+echo "did things recover? Check for file bar"
 ls -l /mnt/lustre/bar
+
+
index 832b274..0f737f5 100755 (executable)
@@ -1,13 +1,5 @@
 #!/bin/sh
 
-echo -n `date` >> /tmp/halog
-echo "- please supply a new mds" >> /tmp/halog
+echo primary `date` >> /tmp/halog
 
-echo "- suppose we have a new one" >> /tmp/halog
-sleep 1
-
-/usr/src/obd/utils/obdctl  <<EOF
-name2dev RPCDEV
-newconn
-EOF