Whamcloud - gitweb
Remove (false) distinction between llog_catalog_hdr and llog_object_hdr -
authoradilger <adilger>
Tue, 27 May 2003 09:08:49 +0000 (09:08 +0000)
committeradilger <adilger>
Tue, 27 May 2003 09:08:49 +0000 (09:08 +0000)
  they have to be the same for LOV-nested logfiles to work.
Change llog_current_log() to have an explicit pointer to the current log for
  that catalog, so we don't get confused by LOV-nested logfiles on the list.
Mark the LOV catalogs initialized so we don't create a new log each time.
Use the right message for the unlink cookies being sent to the OSTs.
Set the MDS unlink record type.

lustre/include/linux/lustre_log.h
lustre/lib/recov_log.c

index c2f7cfb..38da625 100644 (file)
@@ -50,6 +50,7 @@ struct llog_handle {
         void                   *lgh_hdr;
         struct file            *lgh_file;
         struct obd_uuid        *lgh_tgtuuid;
+        struct llog_handle     *lgh_current;
         struct llog_handle     *(*lgh_log_create)(struct obd_device *obd);
         struct llog_handle     *(*lgh_log_open)(struct obd_device *obd,
                                                 struct llog_cookie *logcookie);
index e0ab122..673acdf 100644 (file)
@@ -74,36 +74,35 @@ struct llog_handle *llog_new_log(struct llog_handle *cathandle,
                                  struct obd_uuid *tgtuuid)
 {
         struct llog_handle *loghandle;
-        struct llog_catalog_hdr *lch;
-        struct llog_object_hdr *loh;
+        struct llog_object_hdr *llh;
         loff_t offset;
         int rc, index, bitmap_size, i;
         ENTRY;
 
-        LASSERT(sizeof(*loh) == LLOG_CHUNK_SIZE);
+        LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE);
 
         loghandle = cathandle->lgh_log_create(cathandle->lgh_obd);
         if (IS_ERR(loghandle))
-                GOTO(out, rc = PTR_ERR(loghandle));
-
-        loh = loghandle->lgh_hdr;
-        loh->loh_hdr.lth_type = LLOG_OBJECT_MAGIC;
-        loh->loh_hdr.lth_len = loh->loh_hdr_end_len = sizeof(*loh);
-        loh->loh_timestamp = CURRENT_TIME;
-        loh->loh_bitmap_offset = offsetof(struct llog_object_hdr, loh_bitmap);
-        memcpy(&loh->loh_tgtuuid, tgtuuid, sizeof(loh->loh_tgtuuid));
-        loghandle->lgh_tgtuuid = &loh->loh_tgtuuid;
-
-        lch = cathandle->lgh_hdr;
-        bitmap_size = sizeof(lch->lch_bitmap) * 8;
+                RETURN(loghandle);
+
+        llh = loghandle->lgh_hdr;
+        llh->llh_hdr.lth_type = LLOG_OBJECT_MAGIC;
+        llh->llh_hdr.lth_len = llh->llh_hdr_end_len = sizeof(*llh);
+        llh->llh_timestamp = CURRENT_TIME;
+        llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
+        memcpy(&llh->llh_tgtuuid, tgtuuid, sizeof(llh->llh_tgtuuid));
+        loghandle->lgh_tgtuuid = &llh->llh_tgtuuid;
+
+        llh = cathandle->lgh_hdr;
+        bitmap_size = sizeof(llh->llh_bitmap) * 8;
         /* This should basically always find the first entry free */
-        for (i = 0, index = lch->lch_index; i < bitmap_size; i++, index++) {
+        for (i = 0, index = llh->llh_count; i < bitmap_size; i++, index++) {
                 index %= bitmap_size;
-                if (ext2_set_bit(index, lch->lch_bitmap))
+                if (ext2_set_bit(index, llh->llh_bitmap))
                         /* XXX This should trigger log clean up or similar */
                         CERROR("catalog index %d is still in use\n", index);
                 else {
-                        lch->lch_index = (index + 1) % bitmap_size;
+                        llh->llh_count = (index + 1) % bitmap_size;
                         break;
                 }
         }
@@ -115,11 +114,12 @@ struct llog_handle *llog_new_log(struct llog_handle *cathandle,
                loghandle->lgh_cookie.lgc_lgl.lgl_ogen, index);
         loghandle->lgh_cookie.lgc_index = index;
 
-        offset = sizeof(*lch) + index * sizeof(loghandle->lgh_cookie);
+        offset = sizeof(*llh) + index * sizeof(loghandle->lgh_cookie);
 
         /* XXX Hmm, what to do if the catalog update fails?  Under normal
          *     operations we would clean this handle up anyways, and at
-         *     worst we leak some objects.
+         *     worst we leak some objects, but there is little point in
+         *     doing the logging in that case...
          *
          *     We don't want to mark a catalog in-use if it wasn't written.
          *     The only danger is if the OST crashes - the log is lost.
@@ -132,57 +132,50 @@ struct llog_handle *llog_new_log(struct llog_handle *cathandle,
                 rc = rc < 0 ? : -ENOSPC;
         } else {
                 offset = 0;
-                rc = lustre_fwrite(cathandle->lgh_file, lch, sizeof(*lch),
+                rc = lustre_fwrite(cathandle->lgh_file, llh, sizeof(*llh),
                                    &offset);
-                if (rc != sizeof(*lch)) {
+                if (rc != sizeof(*llh)) {
                         CERROR("error marking catalog entry %d in use: rc %d\n",
                                index, rc);
                         rc = rc < 0 ? : -ENOSPC;
                 }
         }
-        loghandle->lgh_log_create = cathandle->lgh_log_create;
-        loghandle->lgh_log_open = cathandle->lgh_log_open;
-        loghandle->lgh_log_close = cathandle->lgh_log_close;
+        cathandle->lgh_current = loghandle;
         list_add_tail(&loghandle->lgh_list, &cathandle->lgh_list);
 
         RETURN(loghandle);
-
-        llog_free_handle(loghandle);
-out:
-        RETURN(ERR_PTR(rc));
 }
 
 /* Assumes caller has already pushed us into the kernel context. */
 int llog_init_catalog(struct llog_handle *cathandle, struct obd_uuid *tgtuuid)
 {
-        struct llog_catalog_hdr *lch;
+        struct llog_object_hdr *llh;
         loff_t offset = 0;
         int rc = 0;
         ENTRY;
 
-        LASSERT(sizeof(*lch) == LLOG_CHUNK_SIZE);
+        LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE);
 
         down(&cathandle->lgh_lock);
-        lch = cathandle->lgh_hdr;
+        llh = cathandle->lgh_hdr;
 
         if (cathandle->lgh_file->f_dentry->d_inode->i_size == 0) {
-write_hdr:      lch->lch_hdr.lth_type = LLOG_CATALOG_MAGIC;
-                lch->lch_hdr.lth_len = lch->lch_hdr_end_len = LLOG_CHUNK_SIZE;
-                lch->lch_timestamp = CURRENT_TIME;
-                lch->lch_bitmap_offset = offsetof(struct llog_catalog_hdr,
-                                                  lch_bitmap);
-                memcpy(&lch->lch_tgtuuid, tgtuuid, sizeof(lch->lch_tgtuuid));
-                rc = lustre_fwrite(cathandle->lgh_file, lch, LLOG_CHUNK_SIZE,
+write_hdr:      llh->llh_hdr.lth_type = LLOG_CATALOG_MAGIC;
+                llh->llh_hdr.lth_len = llh->llh_hdr_end_len = LLOG_CHUNK_SIZE;
+                llh->llh_timestamp = CURRENT_TIME;
+                llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
+                memcpy(&llh->llh_tgtuuid, tgtuuid, sizeof(llh->llh_tgtuuid));
+                rc = lustre_fwrite(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
                                    &offset);
                 if (rc != LLOG_CHUNK_SIZE) {
                         CERROR("error writing catalog header: rc %d\n", rc);
-                        OBD_FREE(lch, sizeof(*lch));
+                        OBD_FREE(llh, sizeof(*llh));
                         if (rc >= 0)
                                 rc = -ENOSPC;
                 } else
                         rc = 0;
         } else {
-                rc = lustre_fread(cathandle->lgh_file, lch, LLOG_CHUNK_SIZE,
+                rc = lustre_fread(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
                                   &offset);
                 if (rc != LLOG_CHUNK_SIZE) {
                         CERROR("error reading catalog header: rc %d\n", rc);
@@ -192,7 +185,7 @@ write_hdr:      lch->lch_hdr.lth_type = LLOG_CATALOG_MAGIC;
                         rc = 0;
         }
 
-        cathandle->lgh_tgtuuid = &lch->lch_tgtuuid;
+        cathandle->lgh_tgtuuid = &llh->llh_tgtuuid;
         up(&cathandle->lgh_lock);
         RETURN(rc);
 }
@@ -207,17 +200,13 @@ write_hdr:      lch->lch_hdr.lth_type = LLOG_CATALOG_MAGIC;
  */
 struct llog_handle *llog_current_log(struct llog_handle *cathandle, int reclen)
 {
-        struct list_head *loglist = &cathandle->lgh_list;
         struct llog_handle *loghandle = NULL;
         ENTRY;
 
-        if (!list_empty(loglist)) {
-                struct llog_object_hdr *loh;
-
-                loghandle = list_entry(loglist->prev, struct llog_handle,
-                                       lgh_list);
-                loh = loghandle->lgh_hdr;
-                if (loh->loh_numrec < sizeof(loh->loh_bitmap) * 8)
+        loghandle = cathandle->lgh_current;
+        if (loghandle) {
+                struct llog_object_hdr *llh = loghandle->lgh_hdr;
+                if (llh->llh_count < sizeof(llh->llh_bitmap) * 8)
                         GOTO(out, loghandle);
         }
 
@@ -238,7 +227,7 @@ int llog_add_record(struct llog_handle *cathandle, struct llog_trans_hdr *rec,
                     struct llog_cookie *logcookies)
 {
         struct llog_handle *loghandle;
-        struct llog_object_hdr *loh;
+        struct llog_object_hdr *llh;
         int reclen = rec->lth_len;
         struct file *file;
         loff_t offset;
@@ -257,7 +246,7 @@ int llog_add_record(struct llog_handle *cathandle, struct llog_trans_hdr *rec,
         down(&loghandle->lgh_lock);
         up(&cathandle->lgh_lock);
 
-        loh = loghandle->lgh_hdr;
+        llh = loghandle->lgh_hdr;
         file = loghandle->lgh_file;
 
         /* Make sure that records don't cross a chunk boundary, so we can
@@ -304,15 +293,15 @@ int llog_add_record(struct llog_handle *cathandle, struct llog_trans_hdr *rec,
         }
 
         index = loghandle->lgh_index++;
-        if (ext2_set_bit(index, loh->loh_bitmap)) {
+        if (ext2_set_bit(index, llh->llh_bitmap)) {
                 CERROR("argh, index %u already set in log bitmap?\n", index);
                 LBUG(); /* should never happen */
         }
-        loh->loh_numrec++;
+        llh->llh_count++;
 
         offset = 0;
-        rc = lustre_fwrite(loghandle->lgh_file, loh, sizeof(*loh), &offset);
-        if (rc != sizeof(*loh)) {
+        rc = lustre_fwrite(loghandle->lgh_file, llh, sizeof(*llh), &offset);
+        if (rc != sizeof(*llh)) {
                 CERROR("error writing log header: rc %d\n", rc);
                 GOTO(out, rc < 0 ? rc : -EIO);
         }
@@ -324,7 +313,7 @@ int llog_add_record(struct llog_handle *cathandle, struct llog_trans_hdr *rec,
                 GOTO(out, rc < 0 ? rc : -ENOSPC);
         }
 
-        CDEBUG(D_HA, "added cookie "LPX64":%x+%u, %u bytes\n",
+        CDEBUG(D_HA, "added record "LPX64":%x+%u, %u bytes\n",
                loghandle->lgh_cookie.lgc_lgl.lgl_oid,
                loghandle->lgh_cookie.lgc_lgl.lgl_ogen, index, rec->lth_len);
         *logcookies = loghandle->lgh_cookie;
@@ -343,7 +332,7 @@ int llog_delete_log(struct llog_handle *cathandle,struct llog_handle *loghandle)
 {
         struct llog_cookie *lgc = &loghandle->lgh_cookie;
         int catindex = lgc->lgc_index;
-        struct llog_catalog_hdr *lch = cathandle->lgh_hdr;
+        struct llog_object_hdr *llh = cathandle->lgh_hdr;
         loff_t offset = 0;
         int rc = 0;
         ENTRY;
@@ -351,13 +340,14 @@ int llog_delete_log(struct llog_handle *cathandle,struct llog_handle *loghandle)
         CDEBUG(D_HA, "log "LPX64":%x empty, closing\n",
                lgc->lgc_lgl.lgl_oid, lgc->lgc_lgl.lgl_ogen);
 
-        if (ext2_clear_bit(catindex, lch->lch_bitmap)) {
+        if (ext2_clear_bit(catindex, llh->llh_bitmap)) {
                 CERROR("catalog index %u already clear?\n", catindex);
+                LBUG();
         } else {
-                rc = lustre_fwrite(cathandle->lgh_file, lch, sizeof(*lch),
+                rc = lustre_fwrite(cathandle->lgh_file, llh, sizeof(*llh),
                                    &offset);
 
-                if (rc != sizeof(*lch)) {
+                if (rc != sizeof(*llh)) {
                         CERROR("log %u cancel error: rc %d\n", catindex, rc);
                         if (rc >= 0)
                                 rc = -EIO;
@@ -424,7 +414,7 @@ int llog_cancel_records(struct llog_handle *cathandle, int count,
         down(&cathandle->lgh_lock);
         for (i = 0; i < count; i++, cookies++) {
                 struct llog_handle *loghandle;
-                struct llog_object_hdr *loh;
+                struct llog_object_hdr *llh;
                 struct llog_logid *lgl = &cookies->lgc_lgl;
 
                 loghandle = llog_id2handle(cathandle, cookies);
@@ -435,22 +425,22 @@ int llog_cancel_records(struct llog_handle *cathandle, int count,
                 }
 
                 down(&loghandle->lgh_lock);
-                loh = loghandle->lgh_hdr;
+                llh = loghandle->lgh_hdr;
                 CDEBUG(D_HA, "cancelling "LPX64" index %u: %u\n",
                        lgl->lgl_oid, cookies->lgc_index,
-                        ext2_test_bit(cookies->lgc_index, loh->loh_bitmap));
-                if (!ext2_clear_bit(cookies->lgc_index, loh->loh_bitmap)) {
+                        ext2_test_bit(cookies->lgc_index, llh->llh_bitmap));
+                if (!ext2_clear_bit(cookies->lgc_index, llh->llh_bitmap)) {
                         CERROR("log index %u in "LPX64":%x already clear?\n",
                                cookies->lgc_index, lgl->lgl_oid, lgl->lgl_ogen);
-                } else if (--loh->loh_numrec == 0 &&
+                } else if (--llh->llh_count == 0 &&
                            loghandle != llog_current_log(cathandle, 0)) {
                         loghandle->lgh_log_close(cathandle, loghandle);
                 } else {
                         loff_t offset = 0;
-                        int ret = lustre_fwrite(loghandle->lgh_file, loh,
-                                                sizeof(*loh), &offset);
+                        int ret = lustre_fwrite(loghandle->lgh_file, llh,
+                                                sizeof(*llh), &offset);
 
-                        if (ret != sizeof(*loh)) {
+                        if (ret != sizeof(*llh)) {
                                 CERROR("error cancelling index %u: rc %d\n",
                                        cookies->lgc_index, ret);
                                 /* XXX mark handle bad? */