X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flvfs%2Fllog_cat.c;fp=lustre%2Fobdclass%2Fllog_cat.c;h=67a6f80c1ca14818acfc0071fe1f78fe56157a86;hb=cd9c585e8c7bdd6cfd802be64ef277dfd466be17;hp=d4fa3702acb385bcc8c7f1b7b471efb3202090c9;hpb=076069a9d05fb6228d45745cde26c2a87f3580a6;p=fs%2Flustre-release.git diff --git a/lustre/obdclass/llog_cat.c b/lustre/lvfs/llog_cat.c similarity index 57% rename from lustre/obdclass/llog_cat.c rename to lustre/lvfs/llog_cat.c index d4fa370..67a6f80 100644 --- a/lustre/obdclass/llog_cat.c +++ b/lustre/lvfs/llog_cat.c @@ -38,7 +38,6 @@ #include #endif -#include #include #include @@ -51,7 +50,7 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle) { struct llog_handle *loghandle; struct llog_log_hdr *llh; - struct llog_logid_rec rec = { { 0 }, }; + struct llog_logid_rec rec; int rc, index, bitmap_size; ENTRY; @@ -60,8 +59,8 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle) index = (cathandle->lgh_last_idx + 1) % bitmap_size; - /* maximum number of available slots in catlog is bitmap_size - 2 */ - if (llh->llh_cat_idx == index) { + /* maximum number of available slots in catalog is bitmap_size - 2 */ + if (llh->llh_cat_idx == cpu_to_le32(index)) { CERROR("no free catalog slots for log...\n"); RETURN(ERR_PTR(-ENOSPC)); } else { @@ -73,8 +72,8 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle) LBUG(); /* should never happen */ } cathandle->lgh_last_idx = index; - llh->llh_count++; - llh->llh_tail.lrt_index = index; + llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1); + llh->llh_tail.lrt_index = cpu_to_le32(index); } rc = llog_create(cathandle->lgh_ctxt, &loghandle, NULL, NULL); @@ -91,12 +90,12 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle) LPX64"\n", loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen, index, cathandle->lgh_id.lgl_oid); /* build the record for this log in the catalog */ - rec.lid_hdr.lrh_len = sizeof(rec); - rec.lid_hdr.lrh_index = index; - rec.lid_hdr.lrh_type = LLOG_LOGID_MAGIC; + rec.lid_hdr.lrh_len = cpu_to_le32(sizeof(rec)); + rec.lid_hdr.lrh_index = cpu_to_le32(index); + rec.lid_hdr.lrh_type = cpu_to_le32(LLOG_LOGID_MAGIC); rec.lid_id = loghandle->lgh_id; - rec.lid_tail.lrt_len = sizeof(rec); - rec.lid_tail.lrt_index = index; + rec.lid_tail.lrt_len = cpu_to_le32(sizeof(rec)); + rec.lid_tail.lrt_index = cpu_to_le32(index); /* update the catalog: header and record */ rc = llog_write_rec(cathandle, &rec.lid_hdr, @@ -105,7 +104,7 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle) GOTO(out_destroy, rc); } - loghandle->lgh_hdr->llh_cat_idx = index; + loghandle->lgh_hdr->llh_cat_idx = cpu_to_le32(index); cathandle->u.chd.chd_current_log = loghandle; LASSERT(list_empty(&loghandle->u.phd.phd_entry)); list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head); @@ -116,7 +115,6 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle) RETURN(loghandle); } -EXPORT_SYMBOL(llog_cat_new_log); /* Open an existent log handle and add it to the open list. * This log handle will be closed when all of the records in it are removed. @@ -125,7 +123,7 @@ EXPORT_SYMBOL(llog_cat_new_log); * We return a lock on the handle to ensure nobody yanks it from us. */ int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res, - struct llog_logid *logid) + struct llog_logid *logid) { struct llog_handle *loghandle; int rc = 0; @@ -163,14 +161,15 @@ int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res, if (!rc) { loghandle->u.phd.phd_cat_handle = cathandle; loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id; - loghandle->u.phd.phd_cookie.lgc_index = - loghandle->lgh_hdr->llh_cat_idx; + loghandle->u.phd.phd_cookie.lgc_index = + le32_to_cpu(loghandle->lgh_hdr->llh_cat_idx); } out: *res = loghandle; RETURN(rc); } +EXPORT_SYMBOL(llog_cat_id2handle); int llog_cat_put(struct llog_handle *cathandle) { @@ -257,21 +256,13 @@ int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec, int rc; ENTRY; - LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE); + LASSERT(le32_to_cpu(rec->lrh_len) <= LLOG_CHUNK_SIZE); loghandle = llog_cat_current_log(cathandle, 1); if (IS_ERR(loghandle)) RETURN(PTR_ERR(loghandle)); /* loghandle is already locked by llog_cat_current_log() for us */ rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1); up_write(&loghandle->lgh_lock); - if (rc == -ENOSPC) { - /* to create a new plain log */ - loghandle = llog_cat_current_log(cathandle, 1); - if (IS_ERR(loghandle)) - RETURN(PTR_ERR(loghandle)); - rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1); - up_write(&loghandle->lgh_lock); - } RETURN(rc); } @@ -287,7 +278,7 @@ EXPORT_SYMBOL(llog_cat_add_rec); * Assumes caller has already pushed us into the kernel context. */ int llog_cat_cancel_records(struct llog_handle *cathandle, int count, - struct llog_cookie *cookies) + struct llog_cookie *cookies) { int i, index, rc = 0; ENTRY; @@ -328,21 +319,21 @@ int llog_cat_cancel_records(struct llog_handle *cathandle, int count, } EXPORT_SYMBOL(llog_cat_cancel_records); -int llog_cat_process_cb(struct llog_handle *cat_llh, struct llog_rec_hdr *rec, - void *data) +static int llog_cat_process_cb(struct llog_handle *cat_llh, + struct llog_rec_hdr *rec, void *data) { struct llog_process_data *d = data; struct llog_logid_rec *lir = (struct llog_logid_rec *)rec; struct llog_handle *llh; int rc; - if (rec->lrh_type != LLOG_LOGID_MAGIC) { + if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) { CERROR("invalid record in catalog\n"); RETURN(-EINVAL); } CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n", lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen, - rec->lrh_index, cat_llh->lgh_id.lgl_oid); + le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid); rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id); if (rc) { @@ -363,15 +354,15 @@ int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data) int rc; ENTRY; - LASSERT(llh->llh_flags & LLOG_F_IS_CAT); + LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT)); d.lpd_data = data; d.lpd_cb = cb; if (llh->llh_cat_idx > cat_llh->lgh_last_idx) { - CWARN("catlog "LPX64" crosses index zero\n", + CWARN("catalog "LPX64" crosses index zero\n", cat_llh->lgh_id.lgl_oid); - cd.first_idx = llh->llh_cat_idx; + cd.first_idx = le32_to_cpu(llh->llh_cat_idx); cd.last_idx = 0; rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd); if (rc != 0) @@ -388,6 +379,70 @@ int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data) } EXPORT_SYMBOL(llog_cat_process); +static int llog_cat_reverse_process_cb(struct llog_handle *cat_llh, + struct llog_rec_hdr *rec, void *data) +{ + struct llog_process_data *d = data; + struct llog_logid_rec *lir = (struct llog_logid_rec *)rec; + struct llog_handle *llh; + int rc; + + if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) { + CERROR("invalid record in catalog\n"); + RETURN(-EINVAL); + } + CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n", + lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen, + le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid); + + rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id); + if (rc) { + CERROR("Cannot find handle for log "LPX64"\n", + lir->lid_id.lgl_oid); + RETURN(rc); + } + + rc = llog_reverse_process(llh, d->lpd_cb, d->lpd_data, NULL); + RETURN(rc); +} + +int llog_cat_reverse_process(struct llog_handle *cat_llh, + llog_cb_t cb, void *data) +{ + struct llog_process_data d; + struct llog_process_cat_data cd; + struct llog_log_hdr *llh = cat_llh->lgh_hdr; + int rc; + ENTRY; + + LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT)); + d.lpd_data = data; + d.lpd_cb = cb; + + if (llh->llh_cat_idx > cat_llh->lgh_last_idx) { + CWARN("catalog "LPX64" crosses index zero\n", + cat_llh->lgh_id.lgl_oid); + + cd.first_idx = 0; + cd.last_idx = cat_llh->lgh_last_idx; + rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb, + &d, &cd); + if (rc != 0) + RETURN(rc); + + cd.first_idx = le32_to_cpu(llh->llh_cat_idx); + cd.last_idx = 0; + rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb, + &d, &cd); + } else { + rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb, + &d, NULL); + } + + RETURN(rc); +} +EXPORT_SYMBOL(llog_cat_reverse_process); + int llog_cat_set_first_idx(struct llog_handle *cathandle, int index) { struct llog_log_hdr *llh = cathandle->lgh_hdr; @@ -395,17 +450,17 @@ int llog_cat_set_first_idx(struct llog_handle *cathandle, int index) ENTRY; bitmap_size = sizeof(llh->llh_bitmap) * 8; - if (llh->llh_cat_idx == (index - 1)) { - idx = llh->llh_cat_idx + 1; - llh->llh_cat_idx = idx; + if (llh->llh_cat_idx == cpu_to_le32(index - 1)) { + idx = le32_to_cpu(llh->llh_cat_idx) + 1; + llh->llh_cat_idx = cpu_to_le32(idx); if (idx == cathandle->lgh_last_idx) goto out; for (i = (index + 1) % bitmap_size; i != cathandle->lgh_last_idx; i = (i + 1) % bitmap_size) { if (!ext2_test_bit(i, llh->llh_bitmap)) { - idx = llh->llh_cat_idx + 1; - llh->llh_cat_idx = idx; + idx = le32_to_cpu(llh->llh_cat_idx) + 1; + llh->llh_cat_idx = cpu_to_le32(idx); } else if (i == 0) { llh->llh_cat_idx = 0; } else { @@ -413,55 +468,166 @@ int llog_cat_set_first_idx(struct llog_handle *cathandle, int index) } } out: - CDEBUG(D_HA, "set catlog "LPX64" first idx %u\n", - cathandle->lgh_id.lgl_oid, llh->llh_cat_idx); + CDEBUG(D_HA, "set catalog "LPX64" first idx %u\n", + cathandle->lgh_id.lgl_oid,le32_to_cpu(llh->llh_cat_idx)); } RETURN(0); } +EXPORT_SYMBOL(llog_cat_set_first_idx); -#if 0 -/* Assumes caller has already pushed us into the kernel context. */ -int llog_cat_init(struct llog_handle *cathandle, struct obd_uuid *tgtuuid) +int llog_catalog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, + void *buf, struct llog_cookie *logcookies, + int numcookies, void *data) { - struct llog_log_hdr *llh; - loff_t offset = 0; - int rc = 0; + struct llog_handle *cathandle; + int rc; ENTRY; + + cathandle = ctxt->loc_handle; + LASSERT(cathandle != NULL); + + rc = llog_cat_add_rec(cathandle, rec, logcookies, buf); + if (rc != 1) + CERROR("write one catalog record failed: %d\n", rc); + RETURN(rc); +} +EXPORT_SYMBOL(llog_catalog_add); - LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE); +int llog_catalog_cancel(struct llog_ctxt *ctxt, int count, + struct llog_cookie *cookies, int flags, void *data) +{ + struct llog_handle *cathandle; + int rc; + ENTRY; - down(&cathandle->lgh_lock); - llh = cathandle->lgh_hdr; + if (cookies == NULL || count == 0) + RETURN(-EINVAL); + cathandle = ctxt->loc_handle; + LASSERT(cathandle != NULL); + rc = llog_cat_cancel_records(cathandle, count, cookies); + RETURN(rc); +} +EXPORT_SYMBOL(llog_catalog_cancel); - if (cathandle->lgh_file->f_dentry->d_inode->i_size == 0) { - llog_write_rec(cathandle, &llh->llh_hdr, NULL, 0, NULL, 0); - -write_hdr: - rc = lustre_fwrite(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE, - &offset); - if (rc != LLOG_CHUNK_SIZE) { - CERROR("error writing catalog header: rc %d\n", rc); - OBD_FREE(llh, sizeof(*llh)); - if (rc >= 0) - rc = -ENOSPC; - } else - rc = 0; - } else { - rc = lustre_fread(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE, - &offset); - if (rc != LLOG_CHUNK_SIZE) { - CERROR("error reading catalog header: rc %d\n", rc); - /* Can we do much else if the header is bad? */ - goto write_hdr; - } else - rc = 0; +int llog_catalog_setup(struct llog_ctxt **res, char *name, + struct lvfs_run_ctxt *lvfs_ctxt, + struct fsfilt_operations *fsops, + struct dentry *logs_de, + struct dentry *objects_de) +{ + struct llog_ctxt *ctxt; + struct llog_catid catid; + struct llog_handle *handle; + int rc; + + ENTRY; + + OBD_ALLOC(ctxt, sizeof(*ctxt)); + if (!ctxt) + RETURN(-ENOMEM); + + *res = ctxt; + + ctxt->loc_fsops = fsops; + ctxt->loc_lvfs_ctxt = lvfs_ctxt; + ctxt->loc_logs_dir = logs_de; + ctxt->loc_objects_dir = objects_de; + ctxt->loc_logops = &llog_lvfs_ops; + ctxt->loc_logops->lop_add = llog_catalog_add; + ctxt->loc_logops->lop_cancel = llog_catalog_cancel; + + memset(&catid, 0, sizeof(struct llog_catid)); + rc = llog_get_cat_list(lvfs_ctxt, fsops, name, 1, &catid); + if (rc) { + CERROR("error llog_get_cat_list rc: %d\n", rc); + RETURN(rc); + } + if (catid.lci_logid.lgl_oid) + rc = llog_create(ctxt, &handle, &catid.lci_logid, 0); + else { + rc = llog_create(ctxt, &handle, NULL, NULL); + if (!rc) + catid.lci_logid = handle->lgh_id; } + if (rc) + GOTO(out, rc); - cathandle->lgh_tgtuuid = &llh->llh_tgtuuid; - up(&cathandle->lgh_lock); + ctxt->loc_handle = handle; + rc = llog_init_handle(handle, LLOG_F_IS_CAT, NULL); + if (rc) + GOTO(out, rc); + + rc = llog_put_cat_list(lvfs_ctxt, fsops, name, 1, &catid); + if (rc) + CERROR("error llog_get_cat_list rc: %d\n", rc); +out: + if (ctxt && rc) + OBD_FREE(ctxt, sizeof(*ctxt)); RETURN(rc); } -EXPORT_SYMBOL(llog_cat_init); +EXPORT_SYMBOL(llog_catalog_setup); -#endif +int llog_catalog_cleanup(struct llog_ctxt *ctxt) +{ + struct llog_handle *cathandle, *n, *loghandle; + struct llog_log_hdr *llh; + int rc, index; + ENTRY; + + if (!ctxt) + return 0; + + cathandle = ctxt->loc_handle; + if (cathandle) { + list_for_each_entry_safe(loghandle, n, + &cathandle->u.chd.chd_head, + u.phd.phd_entry) { + llh = loghandle->lgh_hdr; + if ((le32_to_cpu(llh->llh_flags) & + LLOG_F_ZAP_WHEN_EMPTY) && + (le32_to_cpu(llh->llh_count) == 1)) { + rc = llog_destroy(loghandle); + if (rc) + CERROR("failure destroying log during " + "cleanup: %d\n", rc); + LASSERT(rc == 0); + index = loghandle->u.phd.phd_cookie.lgc_index; + llog_free_handle(loghandle); + + LASSERT(index); + llog_cat_set_first_idx(cathandle, index); + rc = llog_cancel_rec(cathandle, index); + if (rc == 0) + CDEBUG(D_HA, "cancel plain log at index" + " %u of catalog "LPX64"\n", + index,cathandle->lgh_id.lgl_oid); + } + } + llog_cat_put(ctxt->loc_handle); + } + OBD_FREE(ctxt, sizeof(*ctxt)); + return 0; +} +EXPORT_SYMBOL(llog_catalog_cleanup); + +int llog_cat_half_bottom(struct llog_cookie *cookie, struct llog_handle *handle) +{ + struct llog_handle *loghandle; + struct llog_logid *lgl = &cookie->lgc_lgl; + int rc; + + down_read(&handle->lgh_lock); + rc = llog_cat_id2handle(handle, &loghandle, lgl); + if (rc) + GOTO(out, rc); + if (2 * loghandle->lgh_hdr->llh_cat_idx <= + handle->lgh_last_idx + handle->lgh_hdr->llh_cat_idx + 1) + rc = 1; + else + rc = 0; +out: + up_read(&handle->lgh_lock); + RETURN(rc); +} +EXPORT_SYMBOL(llog_cat_half_bottom);