*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*
* lustre/obdclass/llog_cat.c
*
if (cathandle->lgh_name == NULL) {
CWARN("%s: there are no more free slots in catalog "
DFID":%x\n",
- loghandle->lgh_ctxt->loc_obd->obd_name,
+ loghandle2name(loghandle),
PFID(&cathandle->lgh_id.lgl_oi.oi_fid),
cathandle->lgh_id.lgl_ogen);
} else {
CWARN("%s: there are no more free slots in "
- "catalog %s\n",
- loghandle->lgh_ctxt->loc_obd->obd_name,
+ "catalog %s\n", loghandle2name(loghandle),
cathandle->lgh_name);
}
RETURN(-ENOSPC);
GOTO(out, rc = 0);
} else if (rc != 0) {
CERROR("%s: can't create new plain llog in catalog: rc = %d\n",
- loghandle->lgh_ctxt->loc_obd->obd_name, rc);
+ loghandle2name(loghandle), rc);
GOTO(out, rc);
}
if (freespace > (128 << 20))
loghandle->lgh_max_size = 128 << 20;
}
+ if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_PLAIN_RECORDS) ||
+ OBD_FAIL_PRECHECK(OBD_FAIL_CATALOG_FULL_CHECK))) {
+ // limit the numer of plain records for test
+ loghandle->lgh_max_size = loghandle->lgh_hdr_size +
+ cfs_fail_val * 64;
+ }
+
rc = 0;
out:
if (!llog_exist(loghandle))
continue;
+ down_write(&loghandle->lgh_lock);
rc = llog_read_header(env, loghandle, NULL);
+ up_write(&loghandle->lgh_lock);
if (rc)
goto unlock;
}
rc = llog_read_header(env, cathandle, NULL);
unlock:
- up_write(&loghandle->lgh_lock);
+ up_write(&cathandle->lgh_lock);
return rc;
}
/*
* prepare current/next log for catalog.
*
- * if log is NULL, open it, and declare create, NB, if log is remote, create it
- * synchronously here, see comments below.
+ * if \a *ploghandle is NULL, open it, and declare create, NB, if \a
+ * *ploghandle is remote, create it synchronously here, see comments
+ * below.
+ *
+ * \a cathandle->lgh_lock is down_read-ed, it gets down_write-ed if \a
+ * *ploghandle has to be opened.
*/
static int llog_cat_prep_log(const struct lu_env *env,
struct llog_handle *cathandle,
struct llog_handle **ploghandle,
struct thandle *th)
{
- int rc = 0;
+ int rc;
+ int sem_upgraded;
+start:
+ rc = 0;
+ sem_upgraded = 0;
if (IS_ERR_OR_NULL(*ploghandle)) {
+ up_read(&cathandle->lgh_lock);
down_write(&cathandle->lgh_lock);
+ sem_upgraded = 1;
if (IS_ERR_OR_NULL(*ploghandle)) {
struct llog_handle *loghandle;
&cathandle->u.chd.chd_head);
}
}
- up_write(&cathandle->lgh_lock);
-
if (rc)
- return rc;
+ GOTO(out, rc);
}
- if (llog_exist(*ploghandle))
- return 0;
+ rc = llog_exist(*ploghandle);
+ if (rc < 0)
+ GOTO(out, rc);
+ if (rc)
+ GOTO(out, rc = 0);
if (dt_object_remote(cathandle->lgh_obj)) {
-create_remote_log:
- down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
down_write_nested(&(*ploghandle)->lgh_lock, LLOGH_LOG);
if (!llog_exist(*ploghandle)) {
/* For remote operation, if we put the llog object
NULL);
if (rc == -ESTALE) {
up_write(&(*ploghandle)->lgh_lock);
- up_read(&cathandle->lgh_lock);
+ if (sem_upgraded)
+ up_write(&cathandle->lgh_lock);
+ else
+ up_read(&cathandle->lgh_lock);
rc = llog_cat_refresh(env, cathandle);
- if (!rc)
- goto create_remote_log;
-
- return rc;
+ down_read_nested(&cathandle->lgh_lock,
+ LLOGH_CAT);
+ if (rc)
+ return rc;
+ /* *ploghandle might become NULL, restart */
+ goto start;
}
}
up_write(&(*ploghandle)->lgh_lock);
- up_read(&cathandle->lgh_lock);
} else {
struct llog_thread_info *lgi = llog_info(env);
struct llog_logid_rec *lirec = &lgi->lgi_logid;
rc = llog_declare_create(env, *ploghandle, th);
if (rc)
- return rc;
+ GOTO(out, rc);
lirec->lid_hdr.lrh_len = sizeof(*lirec);
rc = llog_declare_write_rec(env, cathandle, &lirec->lid_hdr, -1,
th);
}
+out:
+ if (sem_upgraded) {
+ up_write(&cathandle->lgh_lock);
+ down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
+ if (rc == 0)
+ goto start;
+ }
return rc;
}
ostid_seq(&cgl->lgl_oi) == ostid_seq(&logid->lgl_oi)) {
if (cgl->lgl_ogen != logid->lgl_ogen) {
CWARN("%s: log "DFID" generation %x != %x\n",
- loghandle->lgh_ctxt->loc_obd->obd_name,
+ loghandle2name(loghandle),
PFID(&logid->lgl_oi.oi_fid),
cgl->lgl_ogen, logid->lgl_ogen);
continue;
}
+ *res = llog_handle_get(loghandle);
+ if (!*res) {
+ CERROR("%s: log "DFID" refcount is zero!\n",
+ loghandle2name(loghandle),
+ PFID(&logid->lgl_oi.oi_fid));
+ continue;
+ }
loghandle->u.phd.phd_cat_handle = cathandle;
up_write(&cathandle->lgh_lock);
- GOTO(out, rc = 0);
+ RETURN(rc);
}
}
up_write(&cathandle->lgh_lock);
LLOG_OPEN_EXISTS);
if (rc < 0) {
CERROR("%s: error opening log id "DFID":%x: rc = %d\n",
- cathandle->lgh_ctxt->loc_obd->obd_name,
- PFID(&logid->lgl_oi.oi_fid), logid->lgl_ogen, rc);
+ loghandle2name(cathandle), PFID(&logid->lgl_oi.oi_fid),
+ logid->lgl_ogen, rc);
RETURN(rc);
}
- rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN | fmt, NULL);
+ rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN |
+ LLOG_F_ZAP_WHEN_EMPTY | fmt, NULL);
if (rc < 0) {
llog_close(env, loghandle);
- loghandle = NULL;
+ *res = NULL;
RETURN(rc);
}
+ *res = llog_handle_get(loghandle);
+ LASSERT(*res);
down_write(&cathandle->lgh_lock);
list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
up_write(&cathandle->lgh_lock);
loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
loghandle->u.phd.phd_cookie.lgc_index =
loghandle->lgh_hdr->llh_cat_idx;
- EXIT;
-out:
- llog_handle_get(loghandle);
- *res = loghandle;
- return 0;
+ RETURN(0);
}
int llog_cat_close(const struct lu_env *env, struct llog_handle *cathandle)
if (rc)
CERROR("%s: failure destroying log during "
"cleanup: rc = %d\n",
- loghandle->lgh_ctxt->loc_obd->obd_name,
- rc);
+ loghandle2name(loghandle), rc);
index = loghandle->u.phd.phd_cookie.lgc_index;
llog_cat_cleanup(env, cathandle, NULL, index);
* meet this situation. */
if (IS_ERR_OR_NULL(cathandle->u.chd.chd_next_log)) {
CERROR("%s: next log does not exist!\n",
- cathandle->lgh_ctxt->loc_obd->obd_name);
+ loghandle2name(cathandle));
loghandle = ERR_PTR(-EIO);
if (cathandle->u.chd.chd_next_log == NULL) {
/* Store the error in chd_next_log, so
up_write(&loghandle->lgh_lock);
/* nobody should be trying to use this llog */
down_write(&cathandle->lgh_lock);
- /* only reset current log if still room in catalog, to
- * avoid unnecessarily and racy creation of new and
- * partially initialized llog_handle
- */
- if ((cathandle->u.chd.chd_current_log == loghandle) &&
- rc != -ENOSPC)
+ if (cathandle->u.chd.chd_current_log == loghandle)
cathandle->u.chd.chd_current_log = NULL;
up_write(&cathandle->lgh_lock);
RETURN(rc);
if (retried++ == 0)
GOTO(retry, rc);
CERROR("%s: error on 2nd llog: rc = %d\n",
- cathandle->lgh_ctxt->loc_obd->obd_name, rc);
+ loghandle2name(cathandle), rc);
}
RETURN(rc);
ENTRY;
+start:
+ down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
rc = llog_cat_prep_log(env, cathandle,
&cathandle->u.chd.chd_current_log, th);
if (rc)
- RETURN(rc);
+ GOTO(unlock, rc);
rc = llog_cat_prep_log(env, cathandle, &cathandle->u.chd.chd_next_log,
th);
if (rc)
- RETURN(rc);
+ GOTO(unlock, rc);
-declare_write:
rc = llog_declare_write_rec(env, cathandle->u.chd.chd_current_log,
rec, -1, th);
if (rc == -ESTALE && dt_object_remote(cathandle->lgh_obj)) {
+ up_read(&cathandle->lgh_lock);
rc = llog_cat_refresh(env, cathandle);
- if (!rc)
- goto declare_write;
+ if (rc)
+ RETURN(rc);
+ goto start;
}
#if 0
rc = llog_declare_write_rec(env, cathandle->u.chd.chd_next_log, rec, -1,
th);
#endif
-
+unlock:
+ up_read(&cathandle->lgh_lock);
RETURN(rc);
}
EXPORT_SYMBOL(llog_cat_declare_add_rec);
rc = llog_cat_id2handle(env, cathandle, &loghandle, lgl);
if (rc) {
CDEBUG(D_HA, "%s: cannot find llog for handle "DFID":%x"
- ": rc = %d\n",
- cathandle->lgh_ctxt->loc_obd->obd_name,
+ ": rc = %d\n", loghandle2name(cathandle),
PFID(&lgl->lgl_oi.oi_fid), lgl->lgl_ogen, rc);
RETURN(rc);
}
*/
rc = -ENOENT;
CDEBUG(D_HA, "%s: llog "DFID":%x does not exist"
- ": rc = %d\n",
- cathandle->lgh_ctxt->loc_obd->obd_name,
+ ": rc = %d\n", loghandle2name(cathandle),
PFID(&lgl->lgl_oi.oi_fid), lgl->lgl_ogen, rc);
- llog_handle_put(loghandle);
+ llog_handle_put(env, loghandle);
RETURN(rc);
}
rc = llog_cat_cleanup(env, cathandle, loghandle, cat_index);
if (rc)
CERROR("%s: fail to cancel catalog record: rc = %d\n",
- cathandle->lgh_ctxt->loc_obd->obd_name, rc);
+ loghandle2name(cathandle), rc);
rc = 0;
}
- llog_handle_put(loghandle);
+ llog_handle_put(env, loghandle);
if (rc)
CERROR("%s: fail to cancel %d llog-records: rc = %d\n",
- cathandle->lgh_ctxt->loc_obd->obd_name, count,
- rc);
+ loghandle2name(cathandle), count, rc);
RETURN(rc);
}
}
if (failed)
CERROR("%s: fail to cancel %d of %d llog-records: rc = %d\n",
- cathandle->lgh_ctxt->loc_obd->obd_name, failed, count,
- rc);
+ loghandle2name(cathandle), failed, count, rc);
RETURN(rc);
}
EXPORT_SYMBOL(llog_cat_cancel_records);
if (rec->lrh_type != le32_to_cpu(LLOG_LOGID_MAGIC)) {
rc = -EINVAL;
CWARN("%s: invalid record in catalog "DFID":%x: rc = %d\n",
- cat_llh->lgh_ctxt->loc_obd->obd_name,
+ loghandle2name(cat_llh),
PFID(&cat_llh->lgh_id.lgl_oi.oi_fid),
cat_llh->lgh_id.lgl_ogen, rc);
RETURN(rc);
rc = LLOG_DEL_RECORD;
else if (rc)
CWARN("%s: can't find llog handle "DFID":%x: rc = %d\n",
- cat_llh->lgh_ctxt->loc_obd->obd_name,
+ loghandle2name(cat_llh),
PFID(&lir->lid_id.lgl_oi.oi_fid),
lir->lid_id.lgl_ogen, rc);
hdr = (*llhp)->lgh_hdr;
if ((hdr->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
hdr->llh_count == 1 && cat_llh->lgh_obj != NULL &&
- *llhp != cat_llh->u.chd.chd_current_log) {
+ *llhp != cat_llh->u.chd.chd_current_log &&
+ *llhp != cat_llh->u.chd.chd_next_log) {
rc = llog_destroy(env, *llhp);
if (rc)
CWARN("%s: can't destroy empty log "DFID": rc = %d\n",
- (*llhp)->lgh_ctxt->loc_obd->obd_name,
+ loghandle2name((*llhp)),
PFID(&lir->lid_id.lgl_oi.oi_fid), rc);
rc = LLOG_DEL_PLAIN;
}
} else if (d->lpd_startidx > 0) {
struct llog_process_cat_data cd;
+ cd.lpcd_read_mode = LLOG_READ_MODE_NORMAL;
cd.lpcd_first_idx = d->lpd_startidx;
cd.lpcd_last_idx = 0;
rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
NULL, false);
}
+ if (rc == -ENOENT && (cat_llh->lgh_hdr->llh_flags & LLOG_F_RM_ON_ERR)) {
+ /*
+ * plain llog is reported corrupted, so better to just remove
+ * it if the caller is fine with that.
+ */
+ CERROR("%s: remove corrupted/missing llog "DFID"\n",
+ loghandle2name(cat_llh),
+ PFID(&llh->lgh_id.lgl_oi.oi_fid));
+ rc = LLOG_DEL_PLAIN;
+ }
out:
/* The empty plain log was destroyed while processing */
- if (rc == LLOG_DEL_PLAIN) {
- rc = llog_cat_cleanup(env, cat_llh, llh,
- llh->u.phd.phd_cookie.lgc_index);
- } else if (rc == LLOG_DEL_RECORD) {
+ if (rc == LLOG_DEL_PLAIN || rc == LLOG_DEL_RECORD)
/* clear wrong catalog entry */
- rc = llog_cat_cleanup(env, cat_llh, NULL, rec->lrh_index);
- }
+ rc = llog_cat_cleanup(env, cat_llh, llh, rec->lrh_index);
+ else if (rc == LLOG_SKIP_PLAIN)
+ /* processing callback ask to skip the llog -> continue */
+ rc = 0;
if (llh)
- llog_handle_put(llh);
+ llog_handle_put(env, llh);
RETURN(rc);
}
if (llh->llh_cat_idx >= cat_llh->lgh_last_idx &&
llh->llh_count > 1) {
- struct llog_process_cat_data cd;
+ struct llog_process_cat_data cd = {
+ .lpcd_read_mode = LLOG_READ_MODE_NORMAL
+ };
CWARN("%s: catlog "DFID" crosses index zero\n",
- cat_llh->lgh_ctxt->loc_obd->obd_name,
+ loghandle2name(cat_llh),
PFID(&cat_llh->lgh_id.lgl_oi.oi_fid));
/*startcat = 0 is default value for general processing */
if ((startcat != LLOG_CAT_FIRST &&
* catalog bottom.
*/
startcat = 0;
+ d.lpd_startcat = 0;
if (rc != 0)
RETURN(rc);
}
}
if (llh != NULL)
- llog_handle_put(llh);
+ llog_handle_put(env, llh);
RETURN(0);
}
} else if (rc == LLOG_DEL_RECORD) {
/* clear wrong catalog entry */
rc = llog_cat_cleanup(env, cat_llh, NULL, rec->lrh_index);
+ } else if (rc == LLOG_SKIP_PLAIN) {
+ /* processing callback ask to skip the llog -> continue */
+ rc = 0;
}
if (rc)
RETURN(rc);
rc = llog_cat_cleanup(env, cat_llh, llh,
llh->u.phd.phd_cookie.lgc_index);
- llog_handle_put(llh);
+ llog_handle_put(env, llh);
RETURN(rc);
}
ENTRY;
LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
+ cd.lpcd_read_mode = LLOG_READ_MODE_NORMAL;
d.lpd_data = data;
d.lpd_cb = cb;
if (llh->llh_cat_idx >= cat_llh->lgh_last_idx &&
llh->llh_count > 1) {
CWARN("%s: catalog "DFID" crosses index zero\n",
- cat_llh->lgh_ctxt->loc_obd->obd_name,
+ loghandle2name(cat_llh),
PFID(&cat_llh->lgh_id.lgl_oi.oi_fid));
cd.lpcd_first_idx = 0;
while (idx != cathandle->lgh_last_idx) {
idx = (idx + 1) % bitmap_size;
- if (!ext2_test_bit(idx, LLOG_HDR_BITMAP(llh))) {
+ if (!test_bit_le(idx, LLOG_HDR_BITMAP(llh))) {
/* update llh_cat_idx for each unset bit,
* expecting the next one is set */
llh->llh_cat_idx = idx;
}
}
- CDEBUG(D_RPCTRACE, "catlog "DFID" first idx %u, last_idx %u\n",
+ CDEBUG(D_HA, "catlog "DFID" first idx %u, last_idx %u\n",
PFID(&cathandle->lgh_id.lgl_oi.oi_fid),
llh->llh_cat_idx, cathandle->lgh_last_idx);
}
struct llog_handle *loghandle, int index)
{
int rc;
+ struct lu_fid fid = {.f_seq = 0, .f_oid = 0, .f_ver = 0};
LASSERT(index);
if (loghandle != NULL) {
/* remove destroyed llog from catalog list and
* chd_current_log variable */
+ fid = loghandle->lgh_id.lgl_oi.oi_fid;
down_write(&cathandle->lgh_lock);
if (cathandle->u.chd.chd_current_log == loghandle)
cathandle->u.chd.chd_current_log = NULL;
list_del_init(&loghandle->u.phd.phd_entry);
up_write(&cathandle->lgh_lock);
- LASSERT(index == loghandle->u.phd.phd_cookie.lgc_index);
+ LASSERT(index == loghandle->u.phd.phd_cookie.lgc_index ||
+ loghandle->u.phd.phd_cookie.lgc_index == 0);
/* llog was opened and keep in a list, close it now */
llog_close(env, loghandle);
}
llog_cat_set_first_idx(cathandle, index);
rc = llog_cancel_rec(env, cathandle, index);
if (rc == 0)
- CDEBUG(D_HA, "cancel plain log at index %u of catalog "DFID"\n",
- index, PFID(&cathandle->lgh_id.lgl_oi.oi_fid));
+ CDEBUG(D_HA,
+ "cancel plain log "DFID" at index %u of catalog "DFID"\n",
+ PFID(&fid), index,
+ PFID(&cathandle->lgh_id.lgl_oi.oi_fid));
return rc;
}