Whamcloud - gitweb
Send last_rcvd values around when talking to the MDS. The MDC gets the
authoradilger <adilger>
Sat, 27 Apr 2002 08:33:54 +0000 (08:33 +0000)
committeradilger <adilger>
Sat, 27 Apr 2002 08:33:54 +0000 (08:33 +0000)
last_{rcvd,committed,xid} values on mdc_connect, but doesn't yet do
anything with this new data except print it to the debug logs.

A select number of MDS operations get last_{rcvd,committed} values sent
in the reply (mds_body) - create, getattr, open.  It is not totally
clear to me how to add in the mds_body to an RPC reply if it doesn't
already exist, so there is a little more work to do there.

At connect and reint time, client "UUIDs" are looked up and handled
appropriately for new and existing clients.  Currently, since the
RPCs don't actually contain any UUID values, all updates go to UUID "",
which is enough for testing, and should "just work" when UUIDs appear.

lustre/include/linux/lustre_mds.h
lustre/include/linux/obd.h
lustre/lib/mds_updates.c
lustre/llite/super.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/mds/mds_reint.c
lustre/ptlrpc/recovd.c
lustre/ptlrpc/service.c

index 82b2beb..698eeb3 100644 (file)
@@ -80,7 +80,7 @@ struct mds_client_data {
         __u8 uuid_padding[3];   /* unused */
         __u64 mcd_last_rcvd;    /* last completed transaction ID */
         __u64 mcd_mount_count;  /* MDS incarnation number */
-        __u32 mcd_xid;          /* client RPC xid for the last transaction */
+        __u32 mcd_last_xid;     /* client RPC xid for the last transaction */
         __u8 padding[MDS_LR_SIZE - 60];
 };
 
@@ -88,11 +88,12 @@ struct mds_client_data {
 struct mds_client_info {
         struct list_head mci_list;
         struct mds_client_data *mci_mcd;
-        loff_t mci_off;
+        int mci_off;
 };
 
 /* mds/mds_reint.c  */
-int mds_reint_rec(struct mds_update_record *r, struct ptlrpc_request *req); 
+int mds_reint_rec(struct mds_update_record *r, struct ptlrpc_request *req);
+struct mds_client_info *mds_uuid_to_mci(struct mds_obd *mds, __u8 *uuid);
 
 /* lib/mds_updates.c */
 void mds_pack_req_body(struct ptlrpc_request *);
@@ -115,7 +116,8 @@ struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vf
 
 /* mdc/mdc_request.c */
 int mdc_connect(struct ptlrpc_client *, struct ptlrpc_connection *,
-                struct ll_fid *rootfid, struct ptlrpc_request **);
+                struct ll_fid *rootfid, __u64 *last_committed, __u64 *last_rcvd,
+                __u32 *last_xid, struct ptlrpc_request **);
 int mdc_getattr(struct ptlrpc_client *, struct ptlrpc_connection *, ino_t ino,
                 int type, unsigned long valid, struct ptlrpc_request **);
 int mdc_setattr(struct ptlrpc_client *, struct ptlrpc_connection *,
index f48f608..b4e07c1 100644 (file)
@@ -84,7 +84,7 @@ struct mds_obd {
         __u64 mds_mount_count;
         struct ll_fid mds_rootfid;
         int mds_client_count;
-        struct mds_client_info *mds_client_info;
+        struct list_head mds_client_info;
         struct mds_server_data *mds_server_data;
 };
 
index 0285002..734d6e8 100644 (file)
@@ -65,6 +65,9 @@ static void mds_pack_body(struct mds_body *b)
         b->ino = HTON__u32(b->ino);
         b->nlink = HTON__u32(b->nlink);
         b->generation = HTON__u32(b->generation);
+        b->last_xid = HTON__u32(b->last_xid);
+        b->last_committed = HTON__u64(b->last_committed);
+        b->last_rcvd = HTON__u64(b->last_rcvd);
 }
 
 void mds_pack_req_body(struct ptlrpc_request *req) 
@@ -165,6 +168,9 @@ static void mds_unpack_body(struct mds_body *b)
         b->ino = NTOH__u32(b->ino);
         b->nlink = NTOH__u32(b->nlink);
         b->generation = NTOH__u32(b->generation);
+        b->last_xid = NTOH__u32(b->last_xid);
+        b->last_rcvd = NTOH__u64(b->last_rcvd);
+        b->last_committed = NTOH__u64(b->last_committed);
 }
 
 
index 63104aa..7ff3176 100644 (file)
@@ -84,6 +84,8 @@ static struct super_block * ll_read_super(struct super_block *sb,
         int devno;
         int err;
         struct ll_fid rootfid;
+        __u64 last_committed, last_rcvd;
+        __u32 last_xid;
         struct ptlrpc_request *request = NULL;
 
         ENTRY;
@@ -135,13 +137,15 @@ static struct super_block * ll_read_super(struct super_block *sb,
 
         sbi->ll_mds_conn->c_level = LUSTRE_CONN_FULL;
 
+        /* XXX: need to store the last_* values somewhere */
         err = mdc_connect(&sbi->ll_mds_client, sbi->ll_mds_conn,
-                          &rootfid, &request);
+                          &rootfid, &last_committed, &last_rcvd, &last_xid,
+                          &request);
         if (err) {
                 CERROR("cannot mds_connect: rc = %d\n", err);
                 GOTO(out_disc, sb = NULL);
         }
-        CERROR("rootfid %Ld\n", rootfid.id);
+        CERROR("rootfid %ld\n", (unsigned long)rootfid.id);
         sbi->ll_rootino = rootfid.id;
 
         sb->s_maxbytes = 1ULL << 36;
index b292bee..029646f 100644 (file)
@@ -35,7 +35,8 @@
 extern int mds_queue_req(struct ptlrpc_request *);
 
 int mdc_connect(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
-                struct ll_fid *rootfid, struct ptlrpc_request **request)
+                struct ll_fid *rootfid, __u64 *last_committed, __u64 *last_rcvd,
+                __u32 *last_xid, struct ptlrpc_request **request)
 {
         struct ptlrpc_request *req;
         struct mds_body *body;
@@ -49,17 +50,24 @@ int mdc_connect(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         body = lustre_msg_buf(req->rq_reqmsg, 0);
         req->rq_replen = lustre_msg_size(1, &size);
 
-        mds_pack_req_body(req); 
+        mds_pack_req_body(req);
         rc = ptlrpc_queue_wait(req);
         rc = ptlrpc_check_status(req, rc);
 
         if (!rc) {
                 mds_unpack_rep_body(req);
                 body = lustre_msg_buf(req->rq_repmsg, 0);
-                memcpy(rootfid, &body->fid1, sizeof(*rootfid)); 
-
-                CDEBUG(D_NET, "root ino: %Ld\n", rootfid->id);
-                
+                memcpy(rootfid, &body->fid1, sizeof(*rootfid));
+                *last_committed = body->last_committed;
+                *last_rcvd = body->last_rcvd;
+                *last_xid = body->last_xid;
+
+                CDEBUG(D_NET, "root ino=%ld, last_committed=%ld, last_rcvd=%ld,"
+                       " last_xid=%d\n",
+                       (unsigned long)rootfid->id,
+                       (unsigned long)body->last_committed,
+                       (unsigned long)body->last_rcvd,
+                       body->last_xid);
         }
 
         EXIT;
index e7ed9c3..4af1f85 100644 (file)
@@ -155,26 +155,176 @@ struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
         return result;
 }
 
+#define MDS_MAX_CLIENTS 1024
+#define MDS_MAX_CLIENT_WORDS (MDS_MAX_CLIENTS / sizeof(unsigned long))
+
+static unsigned long last_rcvd_slots[MDS_MAX_CLIENT_WORDS];
+
+/* Add client data to the MDS.  The in-memory storage will be a hash at some
+ * point.  We use a bitmap to locate a free space in the last_rcvd file if
+ * cl_off is -1 (i.e. a new client).  Otherwise, we have just read the data
+ * from the last_rcvd file and we know its offset.
+ */
+int mds_client_add(struct mds_obd *mds, struct mds_client_data *mcd, int cl_off)
+{
+        struct mds_client_info *mci;
+
+        OBD_ALLOC(mci, sizeof(*mci));
+        if (!mci) {
+                CERROR("no memory for MDS client info\n");
+                RETURN(-ENOMEM);
+        }
+
+        CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n",
+               cl_off, mcd->mcd_uuid);
+
+        if (cl_off == -1) {
+                unsigned long *word;
+                int bit;
+
+        repeat:
+                word = last_rcvd_slots;
+                while(*word == ~0UL)
+                        ++word;
+                if (word - last_rcvd_slots >= MDS_MAX_CLIENT_WORDS) {
+                        CERROR("no room in client MDS bitmap - fix code\n");
+                        return -ENOMEM;
+                }
+                bit = ffz(*word);
+                if (test_and_set_bit(bit, word)) {
+                        CERROR("found bit %d set for word %d - fix code\n",
+                               bit, word - last_rcvd_slots);
+                        goto repeat;
+                }
+                cl_off = word - last_rcvd_slots + bit;
+        } else {
+                unsigned long *word;
+                int bit;
+
+                word = last_rcvd_slots + cl_off / sizeof(unsigned long);
+                bit = cl_off % sizeof(unsigned long);
+
+                if (test_and_set_bit(bit, word)) {
+                        CERROR("bit %d already set in word %d - bad bad\n",
+                               bit, word - last_rcvd_slots);
+                        LBUG();
+                }
+        }
+
+        mci->mci_mcd = mcd;
+        mci->mci_off = cl_off;
+
+        /* For now we just put the clients in a list, not a hashed list */
+        list_add_tail(&mci->mci_list, &mds->mds_client_info);
+
+        mds->mds_client_count++;
+
+        return 0;
+}
+
+void mds_client_del(struct mds_obd *mds, struct mds_client_info *mci)
+{
+        unsigned long *word;
+        int bit;
+
+        word = last_rcvd_slots + mci->mci_off / sizeof(unsigned long);
+        bit = mci->mci_off % sizeof(unsigned long);
+
+        if (!test_and_clear_bit(bit, word)) {
+                CERROR("bit %d already clear in word %d - bad bad\n",
+                       bit, word - last_rcvd_slots);
+                LBUG();
+        }
+
+        --mds->mds_client_count;
+        list_del(&mci->mci_list);
+        OBD_FREE(mci->mci_mcd, sizeof(*mci->mci_mcd));
+        OBD_FREE(mci, sizeof (*mci));
+}
+
+int mds_client_free_all(struct mds_obd *mds)
+{
+        struct list_head *p, *n;
+
+        list_for_each_safe(p, n, &mds->mds_client_info) {
+                struct mds_client_info *mci;
+
+                mci = list_entry(p, struct mds_client_info, mci_list);
+                mds_client_del(mds, mci);
+        }
+
+        return 0;
+}
+
+int mds_server_free_data(struct mds_obd *mds)
+{
+        OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
+        mds->mds_server_data = NULL;
+
+        return 0;
+}
+
 int mds_connect(struct ptlrpc_request *req)
 {
         struct mds_body *body;
         struct mds_obd *mds = &req->rq_obd->u.mds;
+        struct mds_client_info *mci;
+        struct mds_client_data *mcd;
         int rc, size = sizeof(*body);
         ENTRY;
 
+        CDEBUG(D_INFO, "MDS connect from UUID '%s'\n", ptlrpc_req_to_uuid(req));
         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CONNECT_PACK)) {
-                CERROR("mds: out of memory\n");
+                CERROR("mds: out of memory for message: size=%d\n", size);
                 req->rq_status = -ENOMEM;
                 RETURN(0);
         }
 
         body = lustre_msg_buf(req->rq_reqmsg, 0);
-        mds_unpack_req_body(req); 
+        mds_unpack_req_body(req);
         /* Anything we need to do here with the client's trans no or so? */
 
         body = lustre_msg_buf(req->rq_repmsg, 0);
-        memcpy(&body->fid1, &mds->mds_rootfid  , sizeof(body->fid1)); 
+        memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
+
+        mci = mds_uuid_to_mci(mds, ptlrpc_req_to_uuid(req));
+        if (!mci) {
+                /* We don't have any old connection data for this client */
+                int rc;
+
+                CDEBUG(D_INFO, "allocating new client data for UUID '%s'",
+                       ptlrpc_req_to_uuid(req));
+
+                OBD_ALLOC(mcd, sizeof(*mcd));
+                if (!mcd) {
+                        CERROR("mds: out of memory for client data\n");
+                        req->rq_status = -ENOMEM;
+                        RETURN(0);
+                }
+                rc = mds_client_add(mds, mcd, -1);
+                if (rc) {
+                        req->rq_status = rc;
+                        RETURN(0);
+                }
+        } else {
+                /* We have old connection data for this client... */
+                mcd = mci->mci_mcd;
+                CDEBUG(D_INFO, "found existing data for UUID '%s' at #%d\n",
+                       mcd->mcd_uuid, mci->mci_off);
+        }
+        /* Still not 100% sure whether we should reply with the server
+         * last_rcvd or that of this client.  I'm not sure it even makes
+         * a difference on a per-client basis, because last_rcvd is global
+         * and we are not supposed to allow transactions while in recovery.
+         */
+        body->last_xid = le32_to_cpu(mcd->mcd_last_xid);
+        body->last_rcvd = le64_to_cpu(mcd->mcd_last_rcvd);
+        //body->last_rcvd = mds->mds_last_rcvd;
+        body->last_committed = mds->mds_last_committed;
+        CDEBUG(D_INFO, "last_rcvd %ld, last_committed %ld, last_xid %d\n",
+               (unsigned long)body->last_rcvd,
+               (unsigned long)body->last_committed, body->last_xid);
         mds_pack_rep_body(req);
         RETURN(0);
 }
@@ -215,6 +365,7 @@ int mds_getattr(struct ptlrpc_request *req)
         body->mode = inode->i_mode;
         body->nlink = inode->i_nlink;
         body->valid = ~0;
+        body->last_committed = mds->mds_last_committed;
         mds_fs_get_objid(mds, inode, &body->objid);
         l_dput(de);
         RETURN(0);
@@ -222,6 +373,7 @@ int mds_getattr(struct ptlrpc_request *req)
 
 int mds_open(struct ptlrpc_request *req)
 {
+        struct mds_obd *mds = &req->rq_obd->u.mds;
         struct dentry *de;
         struct mds_body *body;
         struct file *file;
@@ -238,7 +390,7 @@ int mds_open(struct ptlrpc_request *req)
         }
 
         body = lustre_msg_buf(req->rq_reqmsg, 0);
-        de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
+        de = mds_fid2dentry(mds, &body->fid1, &mnt);
         if (IS_ERR(de)) {
                 req->rq_status = -ENOENT;
                 RETURN(0);
@@ -252,6 +404,7 @@ int mds_open(struct ptlrpc_request *req)
 
         body = lustre_msg_buf(req->rq_repmsg, 0);
         body->objid = (__u64) (unsigned long)file;
+        body->last_committed = mds->mds_last_committed;
         RETURN(0);
 }
 
@@ -430,61 +583,7 @@ out:
 /* This will be a hash table at some point. */
 int mds_init_client_data(struct mds_obd *mds)
 {
-        if (mds->mds_client_info)
-                LBUG();
-
-        OBD_ALLOC(mds->mds_client_info,
-                  MDS_CLIENT_SLOTS * sizeof(struct mds_client_info));
-
-        if (!mds->mds_client_info)
-                return -ENOMEM;
-
-        return 0;
-}
-
-/* Add client data to the MDS.  This will be a hash at some point. */
-int mds_add_client(struct mds_obd *mds, struct mds_client_data *mcd, loff_t off)
-{
-        int num = mds->mds_client_count;
-
-        if (num >= MDS_CLIENT_SLOTS) {
-                CERROR("too many clients for current MDS config - fix code\n");
-                return -ENOMEM;
-        }
-
-        /* For now we cop-out and just put the clients in a list */
-        mds->mds_client_info[num].mci_mcd = mcd;
-        mds->mds_client_info[num].mci_off = off; /* in last_rcvd on disk */
-
-        mds->mds_client_count++;
-
-        return 0;
-}
-
-int mds_free_client_data(struct mds_obd *mds)
-{
-        struct mds_client_info *mci = mds->mds_client_info;
-        int i;
-
-        if (!mci)
-                RETURN(0);
-
-        for (i = 0; i < mds->mds_client_count; i++, mci++) {
-                OBD_FREE(mci->mci_mcd, sizeof(*mci->mci_mcd));
-                mci->mci_mcd = NULL;
-        }
-
-        OBD_FREE(mds->mds_client_info, MDS_CLIENT_SLOTS * sizeof(*mci));
-        mds->mds_client_info = NULL;
-
-        return 0;
-}
-
-int mds_free_server_data(struct mds_obd *mds)
-{
-        OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
-        mds->mds_server_data = NULL;
-
+        INIT_LIST_HEAD(&mds->mds_client_info);
         return 0;
 }
 
@@ -495,7 +594,8 @@ int mds_read_last_rcvd(struct mds_obd *mds, struct file *f)
         struct mds_server_data *msd;
         struct mds_client_data *mcd = NULL;
         loff_t fsize = f->f_dentry->d_inode->i_size;
-        loff_t off = 0, cl_off;
+        loff_t off = 0;
+        int cl_off;
         __u64 last_rcvd = 0;
         __u64 last_mount;
         int rc = 0;
@@ -517,6 +617,11 @@ int mds_read_last_rcvd(struct mds_obd *mds, struct file *f)
                 GOTO(err_msd, rc);
         }
 
+        /*
+         * When we do a clean MDS shutdown, we save the last_rcvd into
+         * the header.  If we find clients with higher last_rcvd values
+         * then those clients may need recovery done.
+         */
         last_rcvd = le64_to_cpu(msd->msd_last_rcvd);
         mds->mds_last_rcvd = last_rcvd;
         CDEBUG(D_INODE, "got %Ld for server last_rcvd value\n",
@@ -525,11 +630,11 @@ int mds_read_last_rcvd(struct mds_obd *mds, struct file *f)
         last_mount = le64_to_cpu(msd->msd_mount_count);
         mds->mds_mount_count = last_mount;
         CDEBUG(D_INODE, "got %Ld for server last_mount value\n",
-               (unsigned long long)last_rcvd);
+               (unsigned long long)last_mount);
 
-        for (off = cl_off = MDS_LR_CLIENT, rc = sizeof(*mcd);
+        for (off = MDS_LR_CLIENT, cl_off = 0, rc = sizeof(*mcd);
              off <= fsize - sizeof(*mcd) && rc == sizeof(*mcd);
-             off = cl_off + MDS_LR_SIZE) {
+             off = MDS_LR_CLIENT + ++cl_off * MDS_LR_SIZE) {
                 if (!mcd)
                         OBD_ALLOC(mcd, sizeof(*mcd));
                 if (!mcd)
@@ -537,8 +642,8 @@ int mds_read_last_rcvd(struct mds_obd *mds, struct file *f)
 
                 rc = lustre_fread(f, (char *)mcd, sizeof(*mcd), &off);
                 if (rc != sizeof(*mcd)) {
-                        CERROR("error reading MDS %s offset %Ld: rc = %d\n",
-                               LAST_RCVD, (unsigned long long)cl_off, rc);
+                        CERROR("error reading MDS %s offset %d: rc = %d\n",
+                               LAST_RCVD, cl_off, rc);
                         if (rc > 0)
                                 rc = -EIO;
                         break;
@@ -549,29 +654,35 @@ int mds_read_last_rcvd(struct mds_obd *mds, struct file *f)
 
                 if (last_rcvd &&
                     last_mount - mcd->mcd_mount_count < MDS_MOUNT_RECOV) {
-                        rc = mds_add_client(mds, mcd, cl_off);
+                        rc = mds_client_add(mds, mcd, cl_off);
                         if (rc) {
                                 rc = 0;
                                 break;
                         }
                         mcd = NULL;
+                } else {
+                        CDEBUG(D_INFO,
+                               "client at offset %d with UUID '%s' ignored\n",
+                               cl_off, mcd->mcd_uuid);
                 }
 
                 if (last_rcvd > mds->mds_last_rcvd) {
                         CDEBUG(D_OTHER,
-                               "client at offset %Ld has last_rcvd = %Ld\n",
-                               (unsigned long long)cl_off,
-                               (unsigned long long)last_rcvd);
+                               "client at offset %d has last_rcvd = %Ld\n",
+                               cl_off, (unsigned long long)last_rcvd);
                         mds->mds_last_rcvd = last_rcvd;
                 }
         }
         CDEBUG(D_INODE, "got %Ld for highest last_rcvd value, %d clients\n",
                (unsigned long long)mds->mds_last_rcvd, mds->mds_client_count);
 
+        /* After recovery, there can be no local uncommitted transactions */
+        mds->mds_last_committed = mds->mds_last_rcvd;
+
         return 0;
 
 err_msd:
-        mds_free_server_data(mds);
+        mds_server_free_data(mds);
         return rc;
 }
 
@@ -629,7 +740,7 @@ static int mds_prep(struct obd_device *obddev)
                 GOTO(err_pop, rc = -ENOENT);
         }
 
-        rc = mds_fs_journal_data(mds, f->f_dentry->d_inode, f);
+        rc = mds_fs_journal_data(mds, f);
         if (rc) {
                 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
                 GOTO(err_filp, rc);
@@ -687,7 +798,7 @@ err_svc:
         rpc_unregister_service(mds->mds_service);
         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
 err_client:
-        mds_free_client_data(mds);
+        mds_client_free_all(mds);
 err_filp:
         if (filp_close(f, 0))
                 CERROR("can't close %s after error\n", LAST_RCVD);
@@ -697,28 +808,37 @@ err_pop:
         return rc;
 }
 
-/* Update the server data on disk. */
+/* Update the server data on disk.  This stores the new mount_count and
+ * also the last_rcvd value to disk.  If we don't have a clean shutdown,
+ * then the server last_rcvd value may be less than that of the clients.
+ * This will alert us that we may need to do client recovery.
+ */
 int mds_update_server_data(struct mds_obd *mds)
 {
         struct obd_run_ctxt saved;
         struct mds_server_data *msd = mds->mds_server_data;
+        struct file *filp = mds->mds_rcvd_filp;
         loff_t off = 0;
         int rc;
 
         msd->msd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
         msd->msd_mount_count = cpu_to_le64(mds->mds_mount_count);
 
-        CDEBUG(D_SUPER, "MDS mount_count is %Ld\n",
-               (unsigned long long)mds->mds_mount_count);
+        CDEBUG(D_SUPER, "MDS mount_count is %Ld, last_rcvd is %Ld\n",
+               (unsigned long long)mds->mds_mount_count,
+               (unsigned long long)mds->mds_last_rcvd);
         push_ctxt(&saved, &mds->mds_ctxt);
-        rc = lustre_fwrite(mds->mds_rcvd_filp, (char *)msd, sizeof(*msd), &off);
-        pop_ctxt(&saved);
+        rc = lustre_fwrite(filp, (char *)msd, sizeof(*msd), &off);
         if (rc != sizeof(*msd)) {
                 CERROR("error writing MDS server data: rc = %d\n", rc);
                 if (rc > 0)
                         RETURN(-EIO);
                 RETURN(rc);
         }
+        rc = fsync_dev(filp->f_dentry->d_inode->i_rdev);
+        pop_ctxt(&saved);
+        if (rc)
+                CERROR("error flushing MDS server data: rc = %d\n", rc);
 
         return 0;
 }
@@ -830,9 +950,9 @@ static int mds_cleanup(struct obd_device * obddev)
         if (!mds->mds_sb)
                 RETURN(0);
 
-        mds_free_client_data(mds);
+        mds_client_free_all(mds);
         mds_update_server_data(mds);
-        mds_free_server_data(mds);
+        mds_server_free_data(mds);
 
         if (mds->mds_rcvd_filp) {
                 int rc = filp_close(mds->mds_rcvd_filp, 0);
index 09cfdc9..3255b7e 100644 (file)
 #include <linux/lustre_mds.h>
 #include <linux/obd_class.h>
 
+struct mds_client_info *mds_uuid_to_mci(struct mds_obd *mds, __u8 *uuid)
+{
+        struct list_head *p;
+
+        if (!uuid)
+                return NULL;
+
+        list_for_each(p, &mds->mds_client_info) {
+                struct mds_client_info *mci;
+
+                mci = list_entry(p, struct mds_client_info, mci_list);
+                CDEBUG(D_INFO, "checking client UUID '%s'\n",
+                       mci->mci_mcd->mcd_uuid);
+                if (!strncmp(mci->mci_mcd->mcd_uuid, uuid,
+                             sizeof(mci->mci_mcd->mcd_uuid)))
+                        return mci;
+        }
+        CDEBUG(D_INFO, "no client UUID found for '%s'\n", uuid);
+        return NULL;
+}
+
 int mds_update_last_rcvd(struct mds_obd *mds, void *handle,
                          struct ptlrpc_request *req)
 {
         /* get from req->rq_connection-> or req->rq_client */
-        struct mds_client_info mci_data, *mci = &mci_data;
-        struct mds_client_data mcd_data, *mcd = &mcd_data;
+        struct mds_client_info *mci;
         loff_t off;
         int rc;
 
-        /* Just a placeholder until more gets committed */
-        memset(mcd, 0, sizeof(*mcd));
-        mci->mci_mcd = mcd;
-        mci->mci_off = off = MDS_LR_CLIENT;
+        mci = mds_uuid_to_mci(mds, req->rq_connection->c_remote_uuid);
+        if (!mci) {
+                CERROR("unable to locate MDS client data for UUID '%s'\n",
+                       ptlrpc_req_to_uuid(req));
+                /* This will be a real error once everything is working */
+                //LBUG();
+                RETURN(0);
+        }
+
+        off = MDS_LR_CLIENT + mci->mci_off * MDS_LR_SIZE;
 
         ++mds->mds_last_rcvd;   /* lock this, or make it an LDLM function? */
         mci->mci_mcd->mcd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
         mci->mci_mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
-        mci->mci_mcd->mcd_xid = cpu_to_le32(req->rq_connection->c_xid_in);
+        mci->mci_mcd->mcd_last_xid = cpu_to_le32(req->rq_reqmsg->xid);
 
         mds_fs_set_last_rcvd(mds, handle);
         rc = lustre_fwrite(mds->mds_rcvd_filp, (char *)mci->mci_mcd,
                            sizeof(*mci->mci_mcd), &off);
-        CDEBUG(D_INODE, "wrote trans #%Ld for client '%s' at %Ld: rc = %d\n",
+        CDEBUG(D_INODE, "wrote trans #%Ld for client '%s' at #%d: rc = %d\n",
                mds->mds_last_rcvd, mci->mci_mcd->mcd_uuid, mci->mci_off, rc);
         // store new value and last committed value in req struct
 
@@ -95,6 +121,7 @@ static int mds_reint_setattr(struct mds_update_record *rec,
 
         if (!rc)
                 rc = mds_update_last_rcvd(mds, handle, req);
+        /* FIXME: need to return last_rcvd, last_committed */
 
         EXIT;
 
@@ -124,7 +151,7 @@ static int mds_reint_create(struct mds_update_record *rec,
                 GOTO(out_create_de, rc = -ESTALE);
         }
         dir = de->d_inode;
-        CDEBUG(D_INODE, "ino %ld\n", dir->i_ino);
+        CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
 
         down(&dir->i_sem);
         dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
@@ -136,8 +163,8 @@ static int mds_reint_create(struct mds_update_record *rec,
         }
 
         if (dchild->d_inode) {
-                CERROR("child exists (dir %ld, name %s)\n",
-                       dir->i_ino, rec->ur_name);
+                CERROR("child exists (dir %ld, name %s, ino %ld)\n",
+                       dir->i_ino, rec->ur_name, dchild->d_inode->i_ino);
                 LBUG();
                 GOTO(out_create_dchild, rc = -EEXIST);
         }
@@ -195,6 +222,7 @@ static int mds_reint_create(struct mds_update_record *rec,
                 struct inode *inode = dchild->d_inode;
                 struct mds_body *body;
 
+                CDEBUG(D_INODE, "created ino %ld\n", dchild->d_inode->i_ino);
                 if (type == S_IFREG) {
                         rc = mds_fs_set_objid(mds, inode, handle, rec->ur_id);
                         if (rc)
@@ -213,14 +241,16 @@ static int mds_reint_create(struct mds_update_record *rec,
                 rc = mds_fs_setattr(mds, dchild, handle, &iattr);
                 /* XXX should we abort here in case of error? */
 
+                //if (!rc)
+                rc = mds_update_last_rcvd(mds, handle, req);
+
                 body = lustre_msg_buf(req->rq_repmsg, 0);
                 body->ino = inode->i_ino;
                 body->generation = inode->i_generation;
+                body->last_rcvd = mds->mds_last_rcvd;
+                body->last_committed = mds->mds_last_committed;
         }
 
-        if (!rc)
-                rc = mds_update_last_rcvd(mds, handle, req);
-
 out_create_commit:
         /* FIXME: keep rc intact */
         rc = mds_fs_commit(mds, dir, handle);
@@ -250,7 +280,7 @@ static int mds_reint_unlink(struct mds_update_record *rec,
                 GOTO(out_unlink, rc = -ESTALE);
         }
         dir = de->d_inode;
-        CDEBUG(D_INODE, "ino %ld\n", dir->i_ino);
+        CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
 
         down(&dir->i_sem);
         dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
@@ -300,6 +330,7 @@ static int mds_reint_unlink(struct mds_update_record *rec,
 
         if (!rc)
                 rc = mds_update_last_rcvd(mds, handle, req);
+        /* FIXME: need to return last_rcvd, last_committed */
         /* FIXME: keep rc intact */
         rc = mds_fs_commit(mds, dir, handle);
 
@@ -360,6 +391,7 @@ static int mds_reint_link(struct mds_update_record *rec,
         if (!rc)
                 rc = mds_update_last_rcvd(mds, handle, req);
 
+        /* FIXME: need to return last_rcvd, last_committed */
         /* FIXME: keep rc intact */
         rc = mds_fs_commit(mds, de_tgt_dir->d_inode, handle);
         EXIT;
@@ -421,6 +453,7 @@ static int mds_reint_rename(struct mds_update_record *rec,
         if (!rc)
                 rc = mds_update_last_rcvd(mds, handle, req);
 
+        /* FIXME: need to return last_rcvd, last_committed */
         /* FIXME: keep rc intact */
         rc = mds_fs_commit(mds, de_tgtdir->d_inode, handle);
         EXIT;
index 579b122..4541cc2 100644 (file)
@@ -96,6 +96,7 @@ int connmgr_connect(struct recovd_obd *recovd, struct ptlrpc_connection *conn)
         body->generation = HTON__u32(conn->c_generation);
         body->conn = (__u64)(unsigned long)conn;
         body->conn_token = conn->c_token;
+        strncpy(body->conn_uuid, conn->c_local_uuid, sizeof(body->conn_uuid));
 
         req->rq_replen = lustre_msg_size(1, &size);
 
@@ -110,6 +111,8 @@ int connmgr_connect(struct recovd_obd *recovd, struct ptlrpc_connection *conn)
                 conn->c_level = LUSTRE_CONN_CON;
                 conn->c_remote_conn = body->conn;
                 conn->c_remote_token = body->conn_token;
+                strncpy(conn->c_remote_uuid, body->conn_uuid,
+                        sizeof(conn->c_remote_uuid));
         }
 
 out_free:
@@ -140,12 +143,16 @@ static int connmgr_handle_connect(struct ptlrpc_request *req)
 
         req->rq_connection->c_remote_conn = body->conn;
         req->rq_connection->c_remote_token = body->conn_token;
+        strncpy(req->rq_connection->c_remote_uuid, body->conn_uuid,
+                sizeof(req->rq_connection->c_remote_uuid));
 
         CERROR("incoming generation %d\n", body->generation);
         body = lustre_msg_buf(req->rq_repmsg, 0);
         body->generation = 4711;
         body->conn = (__u64)(unsigned long)req->rq_connection;
         body->conn_token = req->rq_connection->c_token;
+        strncpy(body->conn_uuid, req->rq_connection->c_local_uuid,
+                sizeof(body->conn_uuid));
 
         req->rq_connection->c_level = LUSTRE_CONN_CON;
         RETURN(0);
index c85db78..6754fb7 100644 (file)
@@ -97,7 +97,7 @@ ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid,
 
         err = kportal_uuid_to_peer(uuid, &service->srv_self);
         if (err) {
-                CERROR("cannot get peer for uuid %s", uuid);
+                CERROR("cannot get peer for uuid '%s'", uuid);
                 GOTO(err_free, NULL);
         }