Whamcloud - gitweb
landing b_cmobd_merge on HEAD
[fs/lustre-release.git] / lustre / lvfs / llog_cat.c
similarity index 57%
rename from lustre/obdclass/llog_cat.c
rename to lustre/lvfs/llog_cat.c
index d4fa370..c05774f 100644 (file)
@@ -38,7 +38,6 @@
 #include <liblustre.h>
 #endif
 
-#include <linux/obd_class.h>
 #include <linux/lustre_log.h>
 #include <portals/list.h>
 
@@ -51,7 +50,7 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
 {
         struct llog_handle *loghandle;
         struct llog_log_hdr *llh;
-        struct llog_logid_rec rec = { { 0 }, };
+        struct llog_logid_rec rec;
         int rc, index, bitmap_size;
         ENTRY;
 
@@ -60,8 +59,8 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
 
         index = (cathandle->lgh_last_idx + 1) % bitmap_size;
 
-        /* maximum number of available slots in catlog is bitmap_size - 2 */
-        if (llh->llh_cat_idx == index) {
+        /* maximum number of available slots in catalog is bitmap_size - 2 */
+        if (llh->llh_cat_idx == cpu_to_le32(index)) {
                 CERROR("no free catalog slots for log...\n");
                 RETURN(ERR_PTR(-ENOSPC));
         } else {
@@ -73,13 +72,15 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
                         LBUG(); /* should never happen */
                 }
                 cathandle->lgh_last_idx = index;
-                llh->llh_count++;
-                llh->llh_tail.lrt_index = index;
+                llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
+                llh->llh_tail.lrt_index = cpu_to_le32(index);
         }
 
         rc = llog_create(cathandle->lgh_ctxt, &loghandle, NULL, NULL);
-        if (rc)
+        if (rc) {
+                CERROR("cannot create new log, error = %d\n", rc);
                 RETURN(ERR_PTR(rc));
+        }
 
         rc = llog_init_handle(loghandle,
                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
@@ -91,21 +92,20 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
                LPX64"\n", loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen,
                index, cathandle->lgh_id.lgl_oid);
         /* build the record for this log in the catalog */
-        rec.lid_hdr.lrh_len = sizeof(rec);
-        rec.lid_hdr.lrh_index = index;
-        rec.lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
+        rec.lid_hdr.lrh_len = cpu_to_le32(sizeof(rec));
+        rec.lid_hdr.lrh_index = cpu_to_le32(index);
+        rec.lid_hdr.lrh_type = cpu_to_le32(LLOG_LOGID_MAGIC);
         rec.lid_id = loghandle->lgh_id;
-        rec.lid_tail.lrt_len = sizeof(rec);
-        rec.lid_tail.lrt_index = index;
+        rec.lid_tail.lrt_len = cpu_to_le32(sizeof(rec));
+        rec.lid_tail.lrt_index = cpu_to_le32(index);
 
         /* update the catalog: header and record */
         rc = llog_write_rec(cathandle, &rec.lid_hdr,
                             &loghandle->u.phd.phd_cookie, 1, NULL, index);
-        if (rc < 0) {
+        if (rc < 0)
                 GOTO(out_destroy, rc);
-        }
 
-        loghandle->lgh_hdr->llh_cat_idx = index;
+        loghandle->lgh_hdr->llh_cat_idx = cpu_to_le32(index);
         cathandle->u.chd.chd_current_log = loghandle;
         LASSERT(list_empty(&loghandle->u.phd.phd_entry));
         list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
@@ -116,7 +116,6 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
 
         RETURN(loghandle);
 }
-EXPORT_SYMBOL(llog_cat_new_log);
 
 /* Open an existent log handle and add it to the open list.
  * This log handle will be closed when all of the records in it are removed.
@@ -125,7 +124,7 @@ EXPORT_SYMBOL(llog_cat_new_log);
  * We return a lock on the handle to ensure nobody yanks it from us.
  */
 int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
-                       struct llog_logid *logid)
+                              struct llog_logid *logid)
 {
         struct llog_handle *loghandle;
         int rc = 0;
@@ -163,14 +162,15 @@ int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
         if (!rc) {
                 loghandle->u.phd.phd_cat_handle = cathandle;
                 loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
-                loghandle->u.phd.phd_cookie.lgc_index = 
-                        loghandle->lgh_hdr->llh_cat_idx;
+                loghandle->u.phd.phd_cookie.lgc_index =
+                        le32_to_cpu(loghandle->lgh_hdr->llh_cat_idx);
         }
 
 out:
         *res = loghandle;
         RETURN(rc);
 }
+EXPORT_SYMBOL(llog_cat_id2handle);
 
 int llog_cat_put(struct llog_handle *cathandle)
 {
@@ -257,21 +257,13 @@ int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
         int rc;
         ENTRY;
 
-        LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE);
+        LASSERT(le32_to_cpu(rec->lrh_len) <= LLOG_CHUNK_SIZE);
         loghandle = llog_cat_current_log(cathandle, 1);
         if (IS_ERR(loghandle))
                 RETURN(PTR_ERR(loghandle));
         /* loghandle is already locked by llog_cat_current_log() for us */
         rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
         up_write(&loghandle->lgh_lock);
-        if (rc == -ENOSPC) {
-                /* to create a new plain log */
-                loghandle = llog_cat_current_log(cathandle, 1);
-                if (IS_ERR(loghandle))
-                        RETURN(PTR_ERR(loghandle));
-                rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
-                up_write(&loghandle->lgh_lock);
-        }
 
         RETURN(rc);
 }
@@ -287,7 +279,7 @@ EXPORT_SYMBOL(llog_cat_add_rec);
  * Assumes caller has already pushed us into the kernel context.
  */
 int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
-                        struct llog_cookie *cookies)
+                            struct llog_cookie *cookies)
 {
         int i, index, rc = 0;
         ENTRY;
@@ -328,21 +320,21 @@ int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
 }
 EXPORT_SYMBOL(llog_cat_cancel_records);
 
-int llog_cat_process_cb(struct llog_handle *cat_llh, struct llog_rec_hdr *rec,
-                        void *data)
+static int llog_cat_process_cb(struct llog_handle *cat_llh, 
+                               struct llog_rec_hdr *rec, void *data)
 {
         struct llog_process_data *d = data;
         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
         struct llog_handle *llh;
         int rc;
 
-        if (rec->lrh_type != LLOG_LOGID_MAGIC) {
+        if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
                 CERROR("invalid record in catalog\n");
                 RETURN(-EINVAL);
         }
         CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
-               rec->lrh_index, cat_llh->lgh_id.lgl_oid);
+               le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
 
         rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
         if (rc) {
@@ -363,15 +355,15 @@ int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data)
         int rc;
         ENTRY;
 
-        LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
+        LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT));
         d.lpd_data = data;
         d.lpd_cb = cb;
 
         if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
-                CWARN("catlog "LPX64" crosses index zero\n",
+                CWARN("catalog "LPX64" crosses index zero\n",
                       cat_llh->lgh_id.lgl_oid);
 
-                cd.first_idx = llh->llh_cat_idx;
+                cd.first_idx = le32_to_cpu(llh->llh_cat_idx);
                 cd.last_idx = 0;
                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
                 if (rc != 0)
@@ -388,6 +380,70 @@ int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data)
 }
 EXPORT_SYMBOL(llog_cat_process);
 
+static int llog_cat_reverse_process_cb(struct llog_handle *cat_llh, 
+                                       struct llog_rec_hdr *rec, void *data)
+{
+        struct llog_process_data *d = data;
+        struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
+        struct llog_handle *llh;
+        int rc;
+
+        if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
+                CERROR("invalid record in catalog\n");
+                RETURN(-EINVAL);
+        }
+        CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
+               lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
+               le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
+
+        rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
+        if (rc) {
+                CERROR("Cannot find handle for log "LPX64"\n",
+                       lir->lid_id.lgl_oid);
+                RETURN(rc);
+        }
+
+        rc = llog_reverse_process(llh, d->lpd_cb, d->lpd_data, NULL);
+        RETURN(rc);
+}
+
+int llog_cat_reverse_process(struct llog_handle *cat_llh,
+                             llog_cb_t cb, void *data)
+{
+        struct llog_process_data d;
+        struct llog_process_cat_data cd;
+        struct llog_log_hdr *llh = cat_llh->lgh_hdr;
+        int rc;
+        ENTRY;
+
+        LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT));
+        d.lpd_data = data;
+        d.lpd_cb = cb;
+
+        if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
+                CWARN("catalog "LPX64" crosses index zero\n",
+                      cat_llh->lgh_id.lgl_oid);
+
+                cd.first_idx = 0;
+                cd.last_idx = cat_llh->lgh_last_idx;
+                rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
+                                          &d, &cd);
+                if (rc != 0)
+                        RETURN(rc);
+
+                cd.first_idx = le32_to_cpu(llh->llh_cat_idx);
+                cd.last_idx = 0;
+                rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
+                                          &d, &cd);
+        } else {
+                rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
+                                          &d, NULL);
+        }
+
+        RETURN(rc);
+}
+EXPORT_SYMBOL(llog_cat_reverse_process);
+
 int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
 {
         struct llog_log_hdr *llh = cathandle->lgh_hdr;
@@ -395,17 +451,17 @@ int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
         ENTRY;
 
         bitmap_size = sizeof(llh->llh_bitmap) * 8;
-        if (llh->llh_cat_idx == (index - 1)) {
-                idx = llh->llh_cat_idx + 1;
-                llh->llh_cat_idx = idx;
+        if (llh->llh_cat_idx == cpu_to_le32(index - 1)) {
+                idx = le32_to_cpu(llh->llh_cat_idx) + 1;
+                llh->llh_cat_idx = cpu_to_le32(idx);
                 if (idx == cathandle->lgh_last_idx)
                         goto out;
                 for (i = (index + 1) % bitmap_size;
                      i != cathandle->lgh_last_idx;
                      i = (i + 1) % bitmap_size) {
                         if (!ext2_test_bit(i, llh->llh_bitmap)) {
-                                idx = llh->llh_cat_idx + 1;
-                                llh->llh_cat_idx = idx;
+                                idx = le32_to_cpu(llh->llh_cat_idx) + 1;
+                                llh->llh_cat_idx = cpu_to_le32(idx);
                         } else if (i == 0) {
                                 llh->llh_cat_idx = 0;
                         } else {
@@ -413,55 +469,169 @@ int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
                         }
                 }
 out:
-                CDEBUG(D_HA, "set catlog "LPX64" first idx %u\n",
-                       cathandle->lgh_id.lgl_oid, llh->llh_cat_idx);
+                CDEBUG(D_HA, "set catalog "LPX64" first idx %u\n",
+                       cathandle->lgh_id.lgl_oid,le32_to_cpu(llh->llh_cat_idx));
         }
 
         RETURN(0);
 }
+EXPORT_SYMBOL(llog_cat_set_first_idx);
 
-#if 0
-/* Assumes caller has already pushed us into the kernel context. */
-int llog_cat_init(struct llog_handle *cathandle, struct obd_uuid *tgtuuid)
+int llog_catalog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, 
+                     void *buf, struct llog_cookie *logcookies, 
+                     int numcookies, void *data)
 {
-        struct llog_log_hdr *llh;
-        loff_t offset = 0;
-        int rc = 0;
+        struct llog_handle *cathandle;
+        int rc;
         ENTRY;
+        
+        cathandle = ctxt->loc_handle;
+        LASSERT(cathandle != NULL);
+        
+        rc = llog_cat_add_rec(cathandle, rec, logcookies, buf);
+        if (rc != 1)
+                CERROR("write one catalog record failed: %d\n", rc);
+        RETURN(rc);
+}
+EXPORT_SYMBOL(llog_catalog_add);
 
-        LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE);
+int llog_catalog_cancel(struct llog_ctxt *ctxt, int count,
+                        struct llog_cookie *cookies, int flags, void *data)
+{
+        struct llog_handle *cathandle;
+        int rc;
+        ENTRY;
 
-        down(&cathandle->lgh_lock);
-        llh = cathandle->lgh_hdr;
+        if (cookies == NULL || count == 0)
+                RETURN(-EINVAL);
+        cathandle = ctxt->loc_handle;
+        LASSERT(cathandle != NULL);
+        rc = llog_cat_cancel_records(cathandle, count, cookies);
+        RETURN(rc);
+}
+EXPORT_SYMBOL(llog_catalog_cancel);
+
+int llog_catalog_setup(struct llog_ctxt **res, char *name,
+                       struct obd_export *exp, 
+                       struct lvfs_run_ctxt *lvfs_ctxt,
+                       struct fsfilt_operations *fsops,
+                       struct dentry *logs_de, 
+                       struct dentry *objects_de)
+{
+        struct llog_ctxt *ctxt;
+        struct llog_catid catid;
+        struct llog_handle *handle;
+        int rc;
+        
+        ENTRY;
 
-        if (cathandle->lgh_file->f_dentry->d_inode->i_size == 0) {
-                llog_write_rec(cathandle, &llh->llh_hdr, NULL, 0, NULL, 0);
-
-write_hdr:
-                rc = lustre_fwrite(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
-                                   &offset);
-                if (rc != LLOG_CHUNK_SIZE) {
-                        CERROR("error writing catalog header: rc %d\n", rc);
-                        OBD_FREE(llh, sizeof(*llh));
-                        if (rc >= 0)
-                                rc = -ENOSPC;
-                } else
-                        rc = 0;
-        } else {
-                rc = lustre_fread(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
-                                  &offset);
-                if (rc != LLOG_CHUNK_SIZE) {
-                        CERROR("error reading catalog header: rc %d\n", rc);
-                        /* Can we do much else if the header is bad? */
-                        goto write_hdr;
-                } else
-                        rc = 0;
+        OBD_ALLOC(ctxt, sizeof(*ctxt));
+        if (!ctxt)
+                RETURN(-ENOMEM);
+
+        *res = ctxt;
+
+        /* marking this ctxt alone. */
+        ctxt->loc_alone = 1;
+        ctxt->loc_fsops = fsops;
+        ctxt->loc_lvfs_ctxt = lvfs_ctxt;
+        ctxt->loc_exp = exp;
+        ctxt->loc_logs_dir = logs_de;
+        ctxt->loc_objects_dir = objects_de;
+        ctxt->loc_logops = &llog_lvfs_ops; 
+        ctxt->loc_logops->lop_add = llog_catalog_add;
+        ctxt->loc_logops->lop_cancel = llog_catalog_cancel;
+
+        memset(&catid, 0, sizeof(struct llog_catid));
+        rc = llog_get_cat_list(lvfs_ctxt, fsops, name, 1, &catid);
+        if (rc) {
+                CERROR("error llog_get_cat_list rc: %d\n", rc);
+                RETURN(rc);
+        }
+        if (catid.lci_logid.lgl_oid)
+                rc = llog_create(ctxt, &handle, &catid.lci_logid, 0);
+        else {
+                rc = llog_create(ctxt, &handle, NULL, NULL);
+                if (!rc)
+                        catid.lci_logid = handle->lgh_id;
         }
+        if (rc)
+                GOTO(out, rc);
 
-        cathandle->lgh_tgtuuid = &llh->llh_tgtuuid;
-        up(&cathandle->lgh_lock);
+        ctxt->loc_handle = handle;
+        rc = llog_init_handle(handle, LLOG_F_IS_CAT, NULL);
+        if (rc)
+                GOTO(out, rc);
+
+        rc = llog_put_cat_list(lvfs_ctxt, fsops, name, 1, &catid);
+        if (rc)
+                CERROR("error llog_get_cat_list rc: %d\n", rc);
+out:
+        if (ctxt && rc)
+                OBD_FREE(ctxt, sizeof(*ctxt));
         RETURN(rc);
 }
-EXPORT_SYMBOL(llog_cat_init);
+EXPORT_SYMBOL(llog_catalog_setup);
 
-#endif
+int llog_catalog_cleanup(struct llog_ctxt *ctxt)
+{
+        struct llog_handle *cathandle, *n, *loghandle;
+        struct llog_log_hdr *llh;
+        int rc, index;
+        ENTRY;
+                                                                                                                             
+        if (!ctxt)
+                return 0;
+
+        cathandle = ctxt->loc_handle;
+        if (cathandle) {
+                list_for_each_entry_safe(loghandle, n,
+                                         &cathandle->u.chd.chd_head,
+                                         u.phd.phd_entry) {
+                        llh = loghandle->lgh_hdr;
+                        if ((le32_to_cpu(llh->llh_flags) &
+                                LLOG_F_ZAP_WHEN_EMPTY) &&
+                            (le32_to_cpu(llh->llh_count) == 1)) {
+                                rc = llog_destroy(loghandle);
+                                if (rc)
+                                        CERROR("failure destroying log during "
+                                               "cleanup: %d\n", rc);
+                                LASSERT(rc == 0);
+                                index = loghandle->u.phd.phd_cookie.lgc_index;
+                                llog_free_handle(loghandle);
+                                                                                                                             
+                                LASSERT(index);
+                                llog_cat_set_first_idx(cathandle, index);
+                                rc = llog_cancel_rec(cathandle, index);
+                                if (rc == 0)
+                                        CDEBUG(D_HA, "cancel plain log at index"
+                                               " %u of catalog "LPX64"\n",
+                                               index,cathandle->lgh_id.lgl_oid);
+                        }
+                }
+                llog_cat_put(ctxt->loc_handle);
+        }
+        return 0;
+}
+EXPORT_SYMBOL(llog_catalog_cleanup);
+
+int llog_cat_half_bottom(struct llog_cookie *cookie, struct llog_handle *handle)
+{
+        struct llog_handle *loghandle;
+        struct llog_logid *lgl = &cookie->lgc_lgl;
+        int rc;
+                                                                                                                             
+        down_read(&handle->lgh_lock);
+        rc = llog_cat_id2handle(handle, &loghandle, lgl);
+        if (rc)
+                GOTO(out, rc);
+        if (2 * loghandle->lgh_hdr->llh_cat_idx <=
+            handle->lgh_last_idx + handle->lgh_hdr->llh_cat_idx + 1)
+                rc = 1;
+        else
+                rc = 0;
+out:
+        up_read(&handle->lgh_lock);
+        RETURN(rc);
+}
+EXPORT_SYMBOL(llog_cat_half_bottom);