Whamcloud - gitweb
Let's even be specific about the fact that it's the script library that we
[fs/lustre-release.git] / lustre / lvfs / llog_cat.c
index 67a6f80..deadb3d 100644 (file)
 #endif
 
 #include <linux/lustre_log.h>
-#include <portals/list.h>
+#include <libcfs/list.h>
 
 /* Create a new log handle and add it to the open list.
  * This log handle will be closed when all of the records in it are removed.
  *
  * Assumes caller has already pushed us into the kernel context and is locking.
  */
-static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
+static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle,
+                                            struct llog_cookie *logcookie)
 {
         struct llog_handle *loghandle;
         struct llog_log_hdr *llh;
@@ -55,7 +56,7 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
         ENTRY;
 
         llh = cathandle->lgh_hdr;
-        bitmap_size = sizeof(llh->llh_bitmap) * 8;
+        bitmap_size = LLOG_BITMAP_SIZE(llh);
 
         index = (cathandle->lgh_last_idx + 1) % bitmap_size;
 
@@ -76,9 +77,16 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
                 llh->llh_tail.lrt_index = cpu_to_le32(index);
         }
 
-        rc = llog_create(cathandle->lgh_ctxt, &loghandle, NULL, NULL);
-        if (rc)
+        if (logcookie && llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY_NEW)
+                rc = llog_open(cathandle->lgh_ctxt, &loghandle,
+                               &logcookie->lgc_lgl, NULL, OBD_LLOG_FL_CREATE);
+        else
+                rc = llog_open(cathandle->lgh_ctxt, &loghandle, NULL, NULL,
+                               OBD_LLOG_FL_CREATE);
+        if (rc) {
+                CERROR("cannot create new log, error = %d\n", rc);
                 RETURN(ERR_PTR(rc));
+        }
 
         rc = llog_init_handle(loghandle,
                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
@@ -100,9 +108,8 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
         /* update the catalog: header and record */
         rc = llog_write_rec(cathandle, &rec.lid_hdr,
                             &loghandle->u.phd.phd_cookie, 1, NULL, index);
-        if (rc < 0) {
+        if (rc < 0)
                 GOTO(out_destroy, rc);
-        }
 
         loghandle->lgh_hdr->llh_cat_idx = cpu_to_le32(index);
         cathandle->u.chd.chd_current_log = loghandle;
@@ -110,8 +117,14 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
         list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
 
  out_destroy:
-        if (rc < 0)
+        if (rc < 0) 
                 llog_destroy(loghandle);
+        else if (logcookie) {
+                if (llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY_NEW)
+                        LASSERT(EQ_LOGID(loghandle->lgh_id, logcookie->lgc_lgl));
+                else
+                        llog_cookie_set_flags(logcookie, LLOG_COOKIE_REPLAY_NEW);
+        }
 
         RETURN(loghandle);
 }
@@ -147,7 +160,7 @@ int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
                 }
         }
 
-        rc = llog_create(cathandle->lgh_ctxt, &loghandle, logid, NULL);
+        rc = llog_open(cathandle->lgh_ctxt, &loghandle, logid, NULL, 0);
         if (rc) {
                 CERROR("error opening log id "LPX64":%x: rc %d\n",
                        logid->lgl_oid, logid->lgl_ogen, rc);
@@ -199,7 +212,9 @@ EXPORT_SYMBOL(llog_cat_put);
  * NOTE: loghandle is write-locked upon successful return
  */
 static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
-                                                int create)
+                                                int create,
+                                                struct llog_cookie *logcookie,
+                                                struct rw_semaphore **lock)
 {
         struct llog_handle *loghandle = NULL;
         ENTRY;
@@ -208,12 +223,22 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
         loghandle = cathandle->u.chd.chd_current_log;
         if (loghandle) {
                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
-                if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) {
-                        down_write(&loghandle->lgh_lock);
+                down_write(&loghandle->lgh_lock);
+                if (loghandle->lgh_last_idx < (LLOG_BITMAP_SIZE(llh) - 1) &&
+                    (!logcookie ||
+                     !(llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY) ||
+                     EQ_LOGID(loghandle->lgh_id, logcookie->lgc_lgl))) {
                         up_read(&cathandle->lgh_lock);
                         RETURN(loghandle);
+                } else {
+                        up_write(&loghandle->lgh_lock);
                 }
         }
+
+        LASSERT(!logcookie ||
+                !(llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY) ||
+                llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY_NEW);
+
         if (!create) {
                 if (loghandle)
                         down_write(&loghandle->lgh_lock);
@@ -229,17 +254,26 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
         loghandle = cathandle->u.chd.chd_current_log;
         if (loghandle) {
                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
-                if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) {
-                        down_write(&loghandle->lgh_lock);
+                down_write(&loghandle->lgh_lock);
+                if (loghandle->lgh_last_idx < (LLOG_BITMAP_SIZE(llh) - 1) &&
+                    (!logcookie ||
+                     !(llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY) ||
+                     EQ_LOGID(loghandle->lgh_id, logcookie->lgc_lgl))) {
                         up_write(&cathandle->lgh_lock);
                         RETURN(loghandle);
+                } else {
+                        up_write(&loghandle->lgh_lock);
                 }
         }
 
         CDEBUG(D_INODE, "creating new log\n");
-        loghandle = llog_cat_new_log(cathandle);
-        if (!IS_ERR(loghandle))
+        loghandle = llog_cat_new_log(cathandle, logcookie);
+        if (!IS_ERR(loghandle)) {
                 down_write(&loghandle->lgh_lock);
+                if (lock != NULL)
+                        *lock = &loghandle->lgh_lock;
+        }
+
         up_write(&cathandle->lgh_lock);
         RETURN(loghandle);
 }
@@ -250,19 +284,25 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
  * Assumes caller has already pushed us into the kernel context.
  */
 int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
-                     struct llog_cookie *reccookie, void *buf)
+                     struct llog_cookie *reccookie, void *buf,
+                     struct rw_semaphore **lock, int *lock_count)
 {
         struct llog_handle *loghandle;
         int rc;
         ENTRY;
 
         LASSERT(le32_to_cpu(rec->lrh_len) <= LLOG_CHUNK_SIZE);
-        loghandle = llog_cat_current_log(cathandle, 1);
+        loghandle = llog_cat_current_log(cathandle, 1, reccookie, lock);
         if (IS_ERR(loghandle))
                 RETURN(PTR_ERR(loghandle));
         /* loghandle is already locked by llog_cat_current_log() for us */
         rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
-        up_write(&loghandle->lgh_lock);
+        if (!lock || *lock == NULL) {
+                up_write(&loghandle->lgh_lock);
+        } else {
+                LASSERT(lock_count != NULL);
+                *lock_count += 1;
+        }
 
         RETURN(rc);
 }
@@ -331,7 +371,7 @@ static int llog_cat_process_cb(struct llog_handle *cat_llh,
                 CERROR("invalid record in catalog\n");
                 RETURN(-EINVAL);
         }
-        CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
+        CDEBUG(D_INFO, "processing log "LPX64":%x at index %u of catalog "LPX64"\n",
                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
                le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
 
@@ -449,7 +489,7 @@ int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
         int i, bitmap_size, idx;
         ENTRY;
 
-        bitmap_size = sizeof(llh->llh_bitmap) * 8;
+        bitmap_size = LLOG_BITMAP_SIZE(llh);
         if (llh->llh_cat_idx == cpu_to_le32(index - 1)) {
                 idx = le32_to_cpu(llh->llh_cat_idx) + 1;
                 llh->llh_cat_idx = cpu_to_le32(idx);
@@ -478,7 +518,8 @@ EXPORT_SYMBOL(llog_cat_set_first_idx);
 
 int llog_catalog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, 
                      void *buf, struct llog_cookie *logcookies, 
-                     int numcookies, void *data)
+                     int numcookies, void *data, 
+                     struct rw_semaphore **lock, int *lock_count)
 {
         struct llog_handle *cathandle;
         int rc;
@@ -487,7 +528,7 @@ int llog_catalog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
         cathandle = ctxt->loc_handle;
         LASSERT(cathandle != NULL);
         
-        rc = llog_cat_add_rec(cathandle, rec, logcookies, buf);
+        rc = llog_cat_add_rec(cathandle, rec, logcookies, buf, lock, lock_count);
         if (rc != 1)
                 CERROR("write one catalog record failed: %d\n", rc);
         RETURN(rc);
@@ -511,6 +552,7 @@ int llog_catalog_cancel(struct llog_ctxt *ctxt, int count,
 EXPORT_SYMBOL(llog_catalog_cancel);
 
 int llog_catalog_setup(struct llog_ctxt **res, char *name,
+                       struct obd_export *exp, 
                        struct lvfs_run_ctxt *lvfs_ctxt,
                        struct fsfilt_operations *fsops,
                        struct dentry *logs_de, 
@@ -529,8 +571,11 @@ int llog_catalog_setup(struct llog_ctxt **res, char *name,
 
         *res = ctxt;
 
+        /* marking this ctxt alone. */
+        ctxt->loc_alone = 1;
         ctxt->loc_fsops = fsops;
         ctxt->loc_lvfs_ctxt = lvfs_ctxt;
+        ctxt->loc_exp = exp;
         ctxt->loc_logs_dir = logs_de;
         ctxt->loc_objects_dir = objects_de;
         ctxt->loc_logops = &llog_lvfs_ops; 
@@ -544,9 +589,10 @@ int llog_catalog_setup(struct llog_ctxt **res, char *name,
                 RETURN(rc);
         }
         if (catid.lci_logid.lgl_oid)
-                rc = llog_create(ctxt, &handle, &catid.lci_logid, 0);
+                rc = llog_open(ctxt, &handle, &catid.lci_logid, NULL,
+                               OBD_LLOG_FL_CREATE);
         else {
-                rc = llog_create(ctxt, &handle, NULL, NULL);
+                rc = llog_open(ctxt, &handle, NULL, NULL, OBD_LLOG_FL_CREATE);
                 if (!rc)
                         catid.lci_logid = handle->lgh_id;
         }
@@ -570,43 +616,16 @@ EXPORT_SYMBOL(llog_catalog_setup);
 
 int llog_catalog_cleanup(struct llog_ctxt *ctxt)
 {
-        struct llog_handle *cathandle, *n, *loghandle;
-        struct llog_log_hdr *llh;
-        int rc, index;
+        struct llog_handle *cathandle;
         ENTRY;
-                                                                                                                             
+
         if (!ctxt)
                 return 0;
 
         cathandle = ctxt->loc_handle;
-        if (cathandle) {
-                list_for_each_entry_safe(loghandle, n,
-                                         &cathandle->u.chd.chd_head,
-                                         u.phd.phd_entry) {
-                        llh = loghandle->lgh_hdr;
-                        if ((le32_to_cpu(llh->llh_flags) &
-                                LLOG_F_ZAP_WHEN_EMPTY) &&
-                            (le32_to_cpu(llh->llh_count) == 1)) {
-                                rc = llog_destroy(loghandle);
-                                if (rc)
-                                        CERROR("failure destroying log during "
-                                               "cleanup: %d\n", rc);
-                                LASSERT(rc == 0);
-                                index = loghandle->u.phd.phd_cookie.lgc_index;
-                                llog_free_handle(loghandle);
-                                                                                                                             
-                                LASSERT(index);
-                                llog_cat_set_first_idx(cathandle, index);
-                                rc = llog_cancel_rec(cathandle, index);
-                                if (rc == 0)
-                                        CDEBUG(D_HA, "cancel plain log at index"
-                                               " %u of catalog "LPX64"\n",
-                                               index,cathandle->lgh_id.lgl_oid);
-                        }
-                }
+        if (cathandle)
                 llog_cat_put(ctxt->loc_handle);
-        }
-        OBD_FREE(ctxt, sizeof(*ctxt));
+
         return 0;
 }
 EXPORT_SYMBOL(llog_catalog_cleanup);
@@ -616,7 +635,7 @@ int llog_cat_half_bottom(struct llog_cookie *cookie, struct llog_handle *handle)
         struct llog_handle *loghandle;
         struct llog_logid *lgl = &cookie->lgc_lgl;
         int rc;
-                                                                                                                             
+
         down_read(&handle->lgh_lock);
         rc = llog_cat_id2handle(handle, &loghandle, lgl);
         if (rc)