Whamcloud - gitweb
More work on the logging code.
authoradilger <adilger>
Thu, 24 Apr 2003 08:06:39 +0000 (08:06 +0000)
committeradilger <adilger>
Thu, 24 Apr 2003 08:06:39 +0000 (08:06 +0000)
Move "desc_private" journal handle into obd_trans_info (already passed to
many of the OBD API methods) so that we can do nested transactions when
calling OBD API methods.

lustre/include/linux/lustre_log.h
lustre/lib/recov_log.c

index 49a2d17..fb24084 100644 (file)
@@ -53,17 +53,16 @@ struct llog_logid {
 #define LLOG_HEADER_SIZE        (4096)     /* <= PAGE_SIZE */
 #define LLOG_HDR_RSVD_U32       (16)
 #define LLOG_HDR_DATA_SIZE      (LLOG_HDR_RSVD_U32 * sizeof(__u32))
-#define LLOG_BITMAP_SIZE        (LLOG_HEADER_SIZE - LLOG_HDR_DATA_SIZE)
+#define LLOG_BITMAP_BYTES       (LLOG_HEADER_SIZE - LLOG_HDR_DATA_SIZE)
 
-#define LLOG_LOGLIST_MAGIC      0x6d50e67d
-struct llog_catalog_header {
+#define LLOG_CATALOG_MAGIC      0x6d50e67d
+struct llog_catalog_hdr {
         __u32                   lch_size;
         __u32                   lch_magic;
         __u32                   lch_numrec;
         __u32                   lch_reserved[LLOG_HDR_RSVD_U32 - 4];
-        __u32                   lch_bitmap[LLOG_BITMAP_SIZE / sizeof(__u32)];
+        __u32                   lch_bitmap[LLOG_BITMAP_BYTES / sizeof(__u32)];
         __u32                   lch_size_end;
-        struct llog_logid       lch_logs[0];
 };
 
 
@@ -109,15 +108,13 @@ struct llog_object_hdr {
         /* This first chunk should be exactly 4096 bytes in size */
         __u32                   loh_size;
         __u32                   loh_magic;
-        __u32                   loh_numrec;
+        __u32                   loh_maxrec;
         __u32                   loh_reserved[LLOG_HDR_RSVD_U32 - 4];
-        __u32                   loh_bitmap[LLOG_BITMAP_SIZE / sizeof(__u32)];
+        __u32                   loh_bitmap[LLOG_BITMAP_BYTES / sizeof(__u32)];
         __u32                   loh_size_end;
-
-        struct llog_trans_rec   loh_records[0];
 };
 
-static inline llog_log_swabbed(struct llog_object_hdr *hdr)
+static inline int llog_log_swabbed(struct llog_object_hdr *hdr)
 {
         if (hdr->loh_magic == __swab32(LLOG_OBJECT_MAGIC))
                 return 1;
@@ -126,11 +123,12 @@ static inline llog_log_swabbed(struct llog_object_hdr *hdr)
         return -EINVAL;
 }
 
-/* In-memory descriptor for a log object */
+/* In-memory descriptor for a log object or log catalog */
 struct llog_handle {
         struct list_head        lgh_list;
         struct llog_logid       lgh_lid;
         struct brw_page         lgh_pga[2];
+        struct obdo            *lgh_oa;
         struct lov_stripe_md   *lgh_lsm;
 };
 
@@ -142,14 +140,18 @@ struct llog_cookie {
 };
 
 /* exported api prototypes */
-int llog_add_record(struct llog_handle **, void *recbuf, int reclen,
-                    struct llog_cookie *cookie);
-int llog_clear_records(int count, struct llog_cookie **cookies);
-int llog_clear_record(struct llog_handle *handle, __u32 recno);
-int llog_delete(struct llog_logid *id);
+extern int llog_add_record(struct lustre_handle *conn,
+                           struct llog_trans_hdr *rec,
+                           struct llog_cookie *logcookie,
+                           struct obd_trans_info *oti);
+extern int llog_clear_records(struct lustre_handle *conn, int count,
+                              struct llog_cookie *cookies);
+extern int llog_clear_record(struct lustre_handle *conn, __u32 recno);
+extern int llog_delete(struct lustre_handle *conn, struct llog_logid *id);
 
 /* internal api */
-int llog_id2handle(struct llog_logid *logid);
+extern struct llog_handle *llog_id2handle(struct lustre_handle *conn,
+                                          struct llog_logid *logid);
 
 #endif
 
index b06eb00..de95ee2 100644 (file)
 #include <linux/obd.h>
 #include <linux/lustre_log.h>
 
-/* Create a new log handle and add it to the open list.
- * This log handle will be closed when all of the records in it are removed.
- */
-static int llog_new_log(struct lustre_handle *conn, struct list_head *loglist,
-                        void *transhandle)
+/* Create a new log or catalog handle */
+static struct log_handle *llog_new_handle(struct lustre_handle *conn,
+                                          struct obd_trans_info *oti)
 {
-        struct obd_device *obd = class_conn2obd(conn);
         struct llog_handle *loghandle;
-        struct llog_object_hdr *loh;
-        struct obdo *oa;
-        void *addr;
+        int rc;
         ENTRY;
 
-        if (list_empty(loglist)) {
-                XXX do stuff to allocate log_catalog;
-        }
-
         OBD_ALLOC(loghandle, sizeof(*loghandle));
         if (loghandle == NULL)
-                RETURN(-ENOMEM);
+                GOTO(out, rc = -ENOMEM);
 
         loghandle->lgh_pga[0].pg = alloc_page(GFP_KERNEL);
         if (loghandle->lgh_pga[0].pg == NULL)
                 GOTO(out_handle, rc = -ENOMEM);
         loghandle->lgh_pga[0].count = LLOG_HEADER_SIZE;
 
-        loh = kmap(loghandle->lgh_pga[0].pg);
-        clear_page(loh);
-        loh->loh_size = loh->loh_size_end = LLOG_HEADER_SIZE;
-        loh->loh_magic = LLOG_OBJECT_MAGIC;
-        kunmap(loghandle->lgh_pga[0].pg);
-
         loghandle->lgh_pga[1].pg = alloc_page(GFP_KERNEL);
         if (loghandle->lgh_pga[1].pg == NULL)
                 GOTO(out_pga1, rc = -ENOMEM);
         loghandle->lgh_pga[0].off = LLOG_HEADER_SIZE;
 
-        obdo_alloc(oa);
-        rc = obd_create(conn, oa, &loghandle->lsm, NULL)
+        obdo_alloc(loghandle->lgh_oa);
+        if (!loghandle->lgh_oa)
+                GOTO(out_pga2, rc = -ENOMEM);
+
+        rc = obd_create(conn, loghandle->lgh_oa, loghandle->lgh_lsm, oti)
         if (rc) {
-                obdo_free(oa);
-                GOTO(out_pga2, rc);
+                CERROR("couldn't create new log object: rc %d\n", rc);
+                GOTO(out_oa, rc);
         }
 
-retry:
-        lch = kmap(obd->u.
-        index = ext2_find_first_zero_bit(lch->lch_bitmap, LLOG_BITMAP_SIZE * 8);
-        if (ext2_set_bit(index, lch->lch_bitmap)) {
-                CERROR("log catalog bit %u changed under us!\n", index);
-                goto retry;
+        rc = obd_open(conn, loghandle->lgh_oa, loghandle->lgh_lsm, oti, NULL);
+        if (rc) {
+                CERROR("couldn't open new log object "LPX64": rc %d\n",
+                       loghandle->lgh_oa->o_id, rc);
+                GOTO(out_destroy, rc);
         }
-        if (index > lch->lch_numrec
-        rc = obd_brw(OBD_BRW_WRITE, conn,
-        list_add_tail(&loghandle->lgh_list, loglist);
+        LIST_HEAD_INIT(&loghandle->lgh_list);
         loghandle->lgh_lid.lid_oid = oa->o_id;
         //loghandle->lgh_lid.lid_bootcount = ????;
 
+        RETURN(loghandle);
+
+out_destroy:
+        obd_destroy(conn, loghandle->lgh_oa, loghandle->lgh_lsm, oti);
+out_oa:
+        obd_free(loghandle->lgh_oa);
 out_pga2:
         __free_page(loghandle->lgh_pga[1].pg);
 out_pga1:
         __free_page(loghandle->lgh_pga[0].pg);
+out:
+        RETURN(ERR_PTR(rc));
+}
+
+/* Create a new log handle and add it to the open list.
+ * This log handle will be closed when all of the records in it are removed.
+ */
+static struct llog_handle *llog_new_log(struct lustre_handle *conn,
+                                        struct obd_trans_info *oti)
+{
+        struct obd_device *obd = class_conn2obd(conn);
+        struct llog_handle *loghandle, *cathandle;
+        struct llog_object_hdr *loh;
+        struct llog_logid *lid;
+        int num_pga = 2;
+        ENTRY;
+
+        cathandle = obd->obd_catalog;
+        loghandle = llog_new_handle(conn, oti);
+        if (IS_ERR(loghandle))
+                RETURN(loghandle);
+
+        loh = kmap(loghandle->lgh_pga[0].pg);
+        clear_page(loh);
+        loh->loh_size = loh->loh_size_end = LLOG_HEADER_SIZE;
+        loh->loh_magic = LLOG_OBJECT_MAGIC;
+        kunmap(loghandle->lgh_pga[0].pg);
+
+        lch = kmap(cathandle->lgh_pga[0].pg);
+retry:
+        index = ext2_find_first_zero_bit(lch->lch_bitmap, LLOG_BITMAP_BYTES*8);
+        /* Not much we can do here - we already leaked a few thousandd logs */
+        LASSERT(index < LLOG_BITMAP_BYTES*8);
+
+        if (ext2_set_bit(index, lch->lch_bitmap)) {
+                CERROR("log catalog bit %u changed under us!!?\n", index);
+                goto retry;
+        }
+        if (index >= lch->lch_maxrec)
+                lch->lch_maxrec = index + 1;
+
+        offset = LLOG_HEADER_SIZE + index * sizeof(*loh->loh_lid);
+#if PAGE_SIZE > LLOG_HEADER_SIZE
+        if (offset + sizeof(*loh->loh_lid) < PAGE_SIZE) {
+                num_pga = 1;
+                lid = (void *)lch + offset;
+                *lid = loghandle->lgh_lid;
+                cathandle->lgh_pga[0].len = offset+sizeof(lch->lch_lids[index]);
+                kunmap(lch);
+        } else
+#endif
+        {
+                void *addr;
+
+#if PAGE_SIZE > LLOG_HEADER_SIZE
+                cathandle->lgh_pga[0].len = LLOG_HEADER_SIZE;
+#endif
+                kunmap(lch);
+
+                cathandle->lgh_pga[1].off = offset;
+                cathandle->lgh_pga[1].len = sizeof(*lid);
+                addr = kmap(cathandle->lgh_pga[1].pg);
+                lid = addr + (offset & ~PAGE_MASK);
+                *lid = loghandle->lgh_lid;
+                kunmap(cathandle->lgh_pga[1].pg);
+        }
+
+        rc = obd_brw(OBD_BRW_WRITE, conn, cathandle->lgh_lsm, num_pga,
+                     cathandle->lgh_pga, NULL, oti);
+        if (rc) {
+        list_add_tail(&loghandle->lgh_list, &cathandle->lgh_list);
+
+        RETURN(0);
+
 out_handle:
         OBD_FREE(loghandle, sizeof(*loghandle));
 
         RETURN(rc);
 }
 
+int llog_init_catalog(struct lustre_handle *conn, struct obd_trans_info *oti)
+{
+        struct obd_device *obd = class_conn2obd(conn);
+        struct llog_handle *cathandle;
+        struct llog_catalog_hdr *lch;
+        ENTRY;
+
+        if (obd->obd_catalog != NULL)
+                RETURN(0);
+
+        cathandle = llog_new_handle(conn, oti);
+        if (IS_ERR(cathandle))
+                RETURN(ERR_PTR(cathandle));
+        obd->obd_catalog = cathandle;
+
+        lch = kmap(cathandle->lgh_pga[0].pg);
+        clear_page(lch);
+        lch->lch_size = lch->lch_size_end = LLOG_HEADER_SIZE;
+        lcg->lcg_magic = LLOG_CATALOG_MAGIC;
+        kunmap(cathandle->lgh_pga[0].pg);
+
+        RETURN(0);
+}
+
 /* We start a new log object here if needed, either because no log has been
  * started, or because the current log cannot fit the new record.
  */
-int llog_get_log(conn, struct list_head *loglist, int reclen, void *transhandle)
+int llog_get_log(struct lustre_handle *conn, int reclen,
+                 struct obd_trans_info *oti)
 {
+        struct obd_device *obd = class_conn2obd(conn);
+        struct list_head *loglist = &obd->obd_catalog->lgh_list;
+
         if (list_empty(loglist)) {
-                loghandle = llog_new_log(conn, loglist, transhandle);
+                loghandle = llog_new_log(conn, oti);
                 if (IS_ERR(loghandle))
                         RETURN(rc = PTR_ERR(loghandle));
         } else {
@@ -107,7 +200,7 @@ int llog_get_log(conn, struct list_head *loglist, int reclen, void *transhandle)
                 if (loghandle->lgh_pga[1].off + reclen >= LLOG_MAX_LOG_SIZE) {
                         __free_page(loghandle->lgh_pga[1].pg);
                         loghandle->lgh_pga[1].pg = NULL;
-                        loghandle = llog_new_log(conn, loglist, transhandle);
+                        loghandle = llog_new_log(conn, oti);
                         if (IS_ERR(loghandle))
                                 RETURN(rc = PTR_ERR(loghandle));
                 }
@@ -115,52 +208,57 @@ int llog_get_log(conn, struct list_head *loglist, int reclen, void *transhandle)
 }
 
 /* Add a single record to the recovery log.  */
-int llog_add_record(struct lustre_handle *conn, struct list_head *loglist,
-                    llog_trans_hdr *rec, struct llog_cookie *logcookie,
-                    void *transhandle)
+int llog_add_record(struct lustre_handle *conn, struct llog_trans_hdr *rec,
+                    struct llog_cookie *logcookie, struct obd_trans_info *oti)
 {
         struct llog_handle *loghandle;
         struct llog_object_hdr *loh;
         int reclen = rec->lgh_len;
+        int offset;
+        int index;
         int num_pga = 2;
         int rc;
         ENTRY;
 
-        loghandle = llog_get_log(conn, loglist, reclen, transhandle);
+        loghandle = llog_get_log(conn, reclen, transhandle);
+
+        offset = loghandle->lgh_pga[1].off;
+
+        loh = kmap(loghandle->lgc_pga[0].pg);
+        index = loh->loh_numrec++;
+        ext2_set_bit(index, loh->loh_bitmap);
 
 #if PAGE_SIZE > LLOG_HEADER_SIZE
         /* It is possible we are still writing in the first page */
-        if (loghandle->lgh_pga[1].off < PAGE_SIZE) {
-                memcpy(kmap(loghandle->lgh_pga[0]->page) +
-                       loghandle->lgh_pga[1].off, rec, reclen);
-                loghandle->lgh_pga[0].count = loghandle->lgh_pga[1].off+reclen;
+        if (offset < PAGE_SIZE) {
+                memcpy(loh + offset, rec, reclen);
+                loghandle->lgh_pga[0].count = offset + reclen;
+                kunmap(loghandle->lgh_pga[0]->pg);
                 num_pga = 1;
         } else
 #endif
         {
-                memcpy(kmap(loghandle->lgh_pga[1]->page) +
-                       loghandle->lgh_pga[1].off, rec, reclen);
 #if PAGE_SIZE > LLOG_HEADER_SIZE
                 loghandle->lgh_pga[0].count = LLOG_HEADER_SIZE;
 #endif
+                kunmap(loghandle->lgh_pga[0]->pg);
+
+                memcpy(kmap(loghandle->lgh_pga[1]->pg) + (offset & ~PAGE_MASK),
+                       rec, reclen);
                 loghandle->lgh_pga[1].count = reclen;
+                kunmap(loghandle->lgh_pga[1]->pg);
         }
-        kunmap(loghandle->lgh_pga->page);
         rc = obd_brw(OBD_BRW_WRITE, conn, loghandle->lgh_lsm, num_pga,
-                     loghandle->lgh_pga, NULL, NULL);
-
+                     loghandle->lgh_pga, NULL, oti);
         if (rc)
                 RETURN(rc);
 
-        loh = kmap(logcookie->lgc_pga[0].pg);
-        logcookie->lgc_lid = loghandle->lgh_lid;
-        logcookie->lgc_offset = loghandle->lgh_pga[1].off;
-        logcookie->lgc_index = loh->loh_numrec++;
-        ext2_set_bit(logcookie->lgc_index, loh->loh_bitmap);
-        kunmap(logcookie->lgc_pga[0].pg);
-
         loghandle->lgh_pga[1].off += reclen;
 
+        logcookie->lgc_lid = loghandle->lgh_lid;
+        logcookie->lgc_index = index;
+        logcookie->lgc_offset = offset;
+
         RETURN(0);
 }