struct llog_handle *loghandle;
struct llog_log_hdr *llh;
struct llog_logid_rec rec;
- int rc, index, bitmap_size, i;
+ int rc, index, bitmap_size;
ENTRY;
+ llh = cathandle->lgh_hdr;
+ bitmap_size = sizeof(llh->llh_bitmap) * 8;
+
+ index = (cathandle->lgh_last_idx + 1) % bitmap_size;
+
+ /* maximum number of available slots in catlog is bitmap_size - 2 */
+ if (llh->llh_cat_idx == cpu_to_le32(index)) {
+ CERROR("no free catalog slots for log...\n");
+ RETURN(ERR_PTR(-ENOSPC));
+ } else {
+ if (index == 0)
+ index = 1;
+ if (ext2_set_bit(index, llh->llh_bitmap)) {
+ CERROR("argh, index %u already set in log bitmap?\n",
+ index);
+ LBUG(); /* should never happen */
+ }
+ cathandle->lgh_last_idx = index;
+ llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
+ llh->llh_tail.lrt_index = cpu_to_le32(index);
+ }
+
rc = llog_create(cathandle->lgh_ctxt, &loghandle, NULL, NULL);
if (rc)
RETURN(ERR_PTR(rc));
- rc = llog_init_handle(loghandle,
- LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
+ rc = llog_init_handle(loghandle,
+ LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
&cathandle->lgh_hdr->llh_tgtuuid);
if (rc)
GOTO(out_destroy, rc);
- /* Find first free entry */
- llh = cathandle->lgh_hdr;
- bitmap_size = sizeof(llh->llh_bitmap) * 8;
- for (i = 0, index = le32_to_cpu(llh->llh_count); i < bitmap_size;
- i++, index++) {
- index %= bitmap_size;
- if (ext2_set_bit(index, llh->llh_bitmap)) {
- /* XXX This should trigger log clean up or similar */
- CERROR("catalog index %d is still in use\n", index);
- } else {
- cathandle->lgh_last_idx = index;
- llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
- break;
- }
- }
- if (i == bitmap_size) {
- CERROR("no free catalog slots for log...\n");
- GOTO(out_destroy, rc = -ENOSPC);
- }
CWARN("new recovery log "LPX64":%x for index %u of catalog "LPX64"\n",
loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen, index,
cathandle->lgh_id.lgl_oid);
rec.lid_tail.lrt_index = cpu_to_le32(index);
/* update the catalog: header and record */
- rc = llog_write_rec(cathandle, &rec.lid_hdr,
+ rc = llog_write_rec(cathandle, &rec.lid_hdr,
&loghandle->u.phd.phd_cookie, 1, NULL, index);
if (rc < 0) {
GOTO(out_destroy, rc);
}
EXPORT_SYMBOL(llog_cat_new_log);
-/* Assumes caller has already pushed us into the kernel context and is locking.
+/* Open an existent log handle and add it to the open list.
+ * This log handle will be closed when all of the records in it are removed.
+ *
+ * Assumes caller has already pushed us into the kernel context and is locking.
* We return a lock on the handle to ensure nobody yanks it from us.
*/
int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
if (cathandle == NULL)
RETURN(-EBADF);
- list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
+ list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
u.phd.phd_entry) {
struct llog_logid *cgl = &loghandle->lgh_id;
if (cgl->lgl_oid == logid->lgl_oid) {
continue;
}
loghandle->u.phd.phd_cat_handle = cathandle;
- cathandle->u.chd.chd_current_log = loghandle;
GOTO(out, rc = 0);
}
}
} else {
rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN, NULL);
if (!rc) {
- list_add(&loghandle->u.phd.phd_entry,
+ list_add(&loghandle->u.phd.phd_entry,
&cathandle->u.chd.chd_head);
- cathandle->u.chd.chd_current_log = loghandle;
}
}
if (!rc) {
loghandle->u.phd.phd_cat_handle = cathandle;
loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
- loghandle->u.phd.phd_cookie.lgc_index =
+ loghandle->u.phd.phd_cookie.lgc_index =
le32_to_cpu(loghandle->lgh_hdr->llh_cat_idx);
}
int rc;
ENTRY;
- list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
+ list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
u.phd.phd_entry) {
int err = llog_close(loghandle);
if (err)
*
* NOTE: loghandle is write-locked upon successful return
*/
-static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
+static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
int create)
{
struct llog_handle *loghandle = NULL;
loghandle = cathandle->u.chd.chd_current_log;
if (loghandle) {
struct llog_log_hdr *llh = loghandle->lgh_hdr;
- if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap) * 8) - 1) {
+ if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) {
down_write(&loghandle->lgh_lock);
up_read(&cathandle->lgh_lock);
RETURN(loghandle);
loghandle = cathandle->u.chd.chd_current_log;
if (loghandle) {
struct llog_log_hdr *llh = loghandle->lgh_hdr;
- if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap) * 8) - 1) {
+ if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) {
down_write(&loghandle->lgh_lock);
up_write(&cathandle->lgh_lock);
RETURN(loghandle);
* Assumes caller has already pushed us into the kernel context.
*/
int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
- struct llog_cookie *reccookie, void *buf)
+ struct llog_cookie *reccookie, void *buf)
{
struct llog_handle *loghandle;
int rc;
/* loghandle is already locked by llog_cat_current_log() for us */
rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
up_write(&loghandle->lgh_lock);
+
RETURN(rc);
}
EXPORT_SYMBOL(llog_cat_add_rec);
down_write(&loghandle->lgh_lock);
rc = llog_cancel_rec(loghandle, cookies->lgc_index);
up_write(&loghandle->lgh_lock);
-
+
if (rc == 1) { /* log has been destroyed */
index = loghandle->u.phd.phd_cookie.lgc_index;
if (cathandle->u.chd.chd_current_log == loghandle)
cathandle->u.chd.chd_current_log = NULL;
llog_free_handle(loghandle);
-
+
LASSERT(index);
+ llog_cat_set_first_idx(cathandle, index);
rc = llog_cancel_rec(cathandle, index);
+ if (rc == 0)
+ CDEBUG(D_HA, "cancel plain log at index %u "
+ "of catalog "LPX64"\n",
+ index, cathandle->lgh_id.lgl_oid);
}
}
up_write(&cathandle->lgh_lock);
}
EXPORT_SYMBOL(llog_cat_cancel_records);
-int llog_cat_process_cb(struct llog_handle *cat_llh, struct llog_rec_hdr *rec, void *data)
+int llog_cat_process_cb(struct llog_handle *cat_llh, struct llog_rec_hdr *rec,
+ void *data)
{
struct llog_process_data *d = data;
struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
CERROR("invalid record in catalog\n");
RETURN(-EINVAL);
}
- CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
+ CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
if (rc) {
- CERROR("Cannot find handle for log "LPX64"\n", lir->lid_id.lgl_oid);
+ CERROR("Cannot find handle for log "LPX64"\n",
+ lir->lid_id.lgl_oid);
RETURN(rc);
- }
+ }
- rc = llog_process(llh, d->lpd_cb, d->lpd_data);
+ rc = llog_process(llh, d->lpd_cb, d->lpd_data, NULL);
RETURN(rc);
}
int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data)
{
struct llog_process_data d;
+ struct llog_process_cat_data cd;
+ struct llog_log_hdr *llh = cat_llh->lgh_hdr;
int rc;
ENTRY;
+
+ LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT));
d.lpd_data = data;
d.lpd_cb = cb;
- rc = llog_process(cat_llh, llog_cat_process_cb, &d);
+ if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
+ CWARN("catlog "LPX64" crosses index zero\n",
+ cat_llh->lgh_id.lgl_oid);
+
+ cd.first_idx = le32_to_cpu(llh->llh_cat_idx);
+ cd.last_idx = 0;
+ rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
+ if (rc != 0)
+ RETURN(rc);
+
+ cd.first_idx = 0;
+ cd.last_idx = cat_llh->lgh_last_idx;
+ rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
+ } else {
+ rc = llog_process(cat_llh, llog_cat_process_cb, &d, NULL);
+ }
+
RETURN(rc);
}
EXPORT_SYMBOL(llog_cat_process);
+int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
+{
+ struct llog_log_hdr *llh = cathandle->lgh_hdr;
+ int i, bitmap_size, idx;
+ ENTRY;
+
+ bitmap_size = sizeof(llh->llh_bitmap) * 8;
+ if (llh->llh_cat_idx == cpu_to_le32(index - 1)) {
+ idx = le32_to_cpu(llh->llh_cat_idx) + 1;
+ llh->llh_cat_idx = cpu_to_le32(idx);
+ if (idx == cathandle->lgh_last_idx)
+ goto out;
+ for (i = (index + 1) % bitmap_size;
+ i != cathandle->lgh_last_idx;
+ i = (i + 1) % bitmap_size) {
+ if (!ext2_test_bit(i, llh->llh_bitmap)) {
+ idx = le32_to_cpu(llh->llh_cat_idx) + 1;
+ llh->llh_cat_idx = cpu_to_le32(idx);
+ } else if (i == 0) {
+ llh->llh_cat_idx = 0;
+ } else {
+ break;
+ }
+ }
+out:
+ CDEBUG(D_HA, "set catlog "LPX64" first idx %u\n",
+ cathandle->lgh_id.lgl_oid,le32_to_cpu(llh->llh_cat_idx));
+ }
+
+ RETURN(0);
+}
#if 0
/* Assumes caller has already pushed us into the kernel context. */
if (cathandle->lgh_file->f_dentry->d_inode->i_size == 0) {
llog_write_rec(cathandle, &llh->llh_hdr, NULL, 0, NULL, 0);
-write_hdr:
+write_hdr:
rc = lustre_fwrite(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
&offset);
if (rc != LLOG_CHUNK_SIZE) {