#include "llog_internal.h"
+
+/**
+ * lockdep markers for nested struct llog_handle::lgh_lock locking.
+ */
+enum {
+ LLOGH_CAT,
+ LLOGH_LOG,
+};
+
/* Create a new log handle and add it to the open list.
* This log handle will be closed when all of the records in it are removed.
*
/*
* prepare current/next log for catalog.
*
- * if log is NULL, open it, and declare create, NB, if log is remote, create it
- * synchronously here, see comments below.
+ * if \a *ploghandle is NULL, open it, and declare create, NB, if \a
+ * *ploghandle is remote, create it synchronously here, see comments
+ * below.
+ *
+ * \a cathandle->lgh_lock is down_read-ed, it gets down_write-ed if \a
+ * *ploghandle has to be opened.
*/
static int llog_cat_prep_log(const struct lu_env *env,
struct llog_handle *cathandle,
struct llog_handle **ploghandle,
struct thandle *th)
{
- int rc = 0;
+ int rc;
+ int sem_upgraded;
+start:
+ rc = 0;
+ sem_upgraded = 0;
if (IS_ERR_OR_NULL(*ploghandle)) {
+ up_read(&cathandle->lgh_lock);
down_write(&cathandle->lgh_lock);
+ sem_upgraded = 1;
if (IS_ERR_OR_NULL(*ploghandle)) {
struct llog_handle *loghandle;
&cathandle->u.chd.chd_head);
}
}
- up_write(&cathandle->lgh_lock);
-
if (rc)
- return rc;
+ GOTO(out, rc);
}
- if (llog_exist(*ploghandle))
- return 0;
+ rc = llog_exist(*ploghandle);
+ if (rc < 0)
+ GOTO(out, rc);
+ if (rc)
+ GOTO(out, rc = 0);
if (dt_object_remote(cathandle->lgh_obj)) {
-create_remote_log:
- down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
down_write_nested(&(*ploghandle)->lgh_lock, LLOGH_LOG);
if (!llog_exist(*ploghandle)) {
/* For remote operation, if we put the llog object
NULL);
if (rc == -ESTALE) {
up_write(&(*ploghandle)->lgh_lock);
- up_read(&cathandle->lgh_lock);
+ if (sem_upgraded)
+ up_write(&cathandle->lgh_lock);
+ else
+ up_read(&cathandle->lgh_lock);
rc = llog_cat_refresh(env, cathandle);
- if (!rc)
- goto create_remote_log;
-
- return rc;
+ down_read_nested(&cathandle->lgh_lock,
+ LLOGH_CAT);
+ if (rc)
+ return rc;
+ /* *ploghandle might become NULL, restart */
+ goto start;
}
}
up_write(&(*ploghandle)->lgh_lock);
- up_read(&cathandle->lgh_lock);
} else {
struct llog_thread_info *lgi = llog_info(env);
struct llog_logid_rec *lirec = &lgi->lgi_logid;
rc = llog_declare_create(env, *ploghandle, th);
if (rc)
- return rc;
+ GOTO(out, rc);
lirec->lid_hdr.lrh_len = sizeof(*lirec);
rc = llog_declare_write_rec(env, cathandle, &lirec->lid_hdr, -1,
th);
}
+out:
+ if (sem_upgraded) {
+ up_write(&cathandle->lgh_lock);
+ down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
+ if (rc == 0)
+ goto start;
+ }
return rc;
}
}
EXPORT_SYMBOL(llog_cat_close);
-/**
- * lockdep markers for nested struct llog_handle::lgh_lock locking.
- */
-enum {
- LLOGH_CAT,
- LLOGH_LOG
-};
-
/** Return the currently active log handle. If the current log handle doesn't
* have enough space left for the current record, start a new one.
*
up_write(&loghandle->lgh_lock);
/* nobody should be trying to use this llog */
down_write(&cathandle->lgh_lock);
- /* only reset current log if still room in catalog, to
- * avoid unnecessarily and racy creation of new and
- * partially initialized llog_handle
- */
- if ((cathandle->u.chd.chd_current_log == loghandle) &&
- rc != -ENOSPC)
+ if (cathandle->u.chd.chd_current_log == loghandle)
cathandle->u.chd.chd_current_log = NULL;
up_write(&cathandle->lgh_lock);
RETURN(rc);
ENTRY;
+start:
+ down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
rc = llog_cat_prep_log(env, cathandle,
&cathandle->u.chd.chd_current_log, th);
if (rc)
- RETURN(rc);
+ GOTO(unlock, rc);
rc = llog_cat_prep_log(env, cathandle, &cathandle->u.chd.chd_next_log,
th);
if (rc)
- RETURN(rc);
+ GOTO(unlock, rc);
-declare_write:
rc = llog_declare_write_rec(env, cathandle->u.chd.chd_current_log,
rec, -1, th);
if (rc == -ESTALE && dt_object_remote(cathandle->lgh_obj)) {
+ up_read(&cathandle->lgh_lock);
rc = llog_cat_refresh(env, cathandle);
- if (!rc)
- goto declare_write;
+ if (rc)
+ RETURN(rc);
+ goto start;
}
#if 0
rc = llog_declare_write_rec(env, cathandle->u.chd.chd_next_log, rec, -1,
th);
#endif
-
+unlock:
+ up_read(&cathandle->lgh_lock);
RETURN(rc);
}
EXPORT_SYMBOL(llog_cat_declare_add_rec);
}
EXPORT_SYMBOL(llog_cat_add);
+int llog_cat_cancel_arr_rec(const struct lu_env *env,
+ struct llog_handle *cathandle,
+ struct llog_logid *lgl, int count, int *index)
+{
+ struct llog_handle *loghandle;
+ int rc;
+
+ ENTRY;
+ rc = llog_cat_id2handle(env, cathandle, &loghandle, lgl);
+ if (rc) {
+ CDEBUG(D_HA, "%s: cannot find llog for handle "DFID":%x"
+ ": rc = %d\n",
+ cathandle->lgh_ctxt->loc_obd->obd_name,
+ PFID(&lgl->lgl_oi.oi_fid), lgl->lgl_ogen, rc);
+ RETURN(rc);
+ }
+
+ if ((cathandle->lgh_ctxt->loc_flags &
+ LLOG_CTXT_FLAG_NORMAL_FID) && !llog_exist(loghandle)) {
+ /* For update log, some of loghandles of cathandle
+ * might not exist because remote llog creation might
+ * be failed, so let's skip the record cancellation
+ * for these non-exist llogs.
+ */
+ rc = -ENOENT;
+ CDEBUG(D_HA, "%s: llog "DFID":%x does not exist"
+ ": rc = %d\n",
+ cathandle->lgh_ctxt->loc_obd->obd_name,
+ PFID(&lgl->lgl_oi.oi_fid), lgl->lgl_ogen, rc);
+
+ llog_handle_put(loghandle);
+ RETURN(rc);
+ }
+
+ rc = llog_cancel_arr_rec(env, loghandle, count, index);
+ if (rc == LLOG_DEL_PLAIN) { /* log has been destroyed */
+ int cat_index;
+
+ cat_index = loghandle->u.phd.phd_cookie.lgc_index;
+ rc = llog_cat_cleanup(env, cathandle, loghandle, cat_index);
+ if (rc)
+ CERROR("%s: fail to cancel catalog record: rc = %d\n",
+ cathandle->lgh_ctxt->loc_obd->obd_name, rc);
+ rc = 0;
+
+ }
+ llog_handle_put(loghandle);
+
+ if (rc)
+ CERROR("%s: fail to cancel %d llog-records: rc = %d\n",
+ cathandle->lgh_ctxt->loc_obd->obd_name, count,
+ rc);
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_cat_cancel_arr_rec);
+
/* For each cookie in the cookie array, we clear the log in-use bit and either:
* - the log is empty, so mark it free in the catalog header and delete it
* - the log is not empty, just write out the log header
struct llog_handle *cathandle, int count,
struct llog_cookie *cookies)
{
- int i, index, rc = 0, failed = 0;
+ int i, rc = 0, failed = 0;
ENTRY;
for (i = 0; i < count; i++, cookies++) {
- struct llog_handle *loghandle;
- struct llog_logid *lgl = &cookies->lgc_lgl;
- int lrc;
-
- rc = llog_cat_id2handle(env, cathandle, &loghandle, lgl);
- if (rc) {
- CDEBUG(D_HA, "%s: cannot find llog for handle "DFID":%x"
- ": rc = %d\n",
- cathandle->lgh_ctxt->loc_obd->obd_name,
- PFID(&lgl->lgl_oi.oi_fid), lgl->lgl_ogen, rc);
- failed++;
- continue;
- }
-
- if ((cathandle->lgh_ctxt->loc_flags &
- LLOG_CTXT_FLAG_NORMAL_FID) && !llog_exist(loghandle)) {
- /* For update log, some of loghandles of cathandle
- * might not exist because remote llog creation might
- * be failed, so let's skip the record cancellation
- * for these non-exist llogs.
- */
- lrc = -ENOENT;
- CDEBUG(D_HA, "%s: llog "DFID":%x does not exist"
- ": rc = %d\n",
- cathandle->lgh_ctxt->loc_obd->obd_name,
- PFID(&lgl->lgl_oi.oi_fid), lgl->lgl_ogen, lrc);
- failed++;
- if (rc == 0)
- rc = lrc;
- continue;
- }
+ int lrc;
- lrc = llog_cancel_rec(env, loghandle, cookies->lgc_index);
- if (lrc == LLOG_DEL_PLAIN) { /* log has been destroyed */
- index = loghandle->u.phd.phd_cookie.lgc_index;
- lrc = llog_cat_cleanup(env, cathandle, loghandle,
- index);
- if (rc == 0)
- rc = lrc;
- } else if (lrc == -ENOENT) {
- if (rc == 0) /* ENOENT shouldn't rewrite any error */
- rc = lrc;
- } else if (lrc < 0) {
+ lrc = llog_cat_cancel_arr_rec(env, cathandle, &cookies->lgc_lgl,
+ 1, &cookies->lgc_index);
+ if (lrc) {
failed++;
- if (rc == 0)
+ if (!rc)
rc = lrc;
}
- llog_handle_put(loghandle);
}
- if (rc)
+ if (failed)
CERROR("%s: fail to cancel %d of %d llog-records: rc = %d\n",
cathandle->lgh_ctxt->loc_obd->obd_name, failed, count,
rc);
-
RETURN(rc);
}
EXPORT_SYMBOL(llog_cat_cancel_records);
llog_cb_t cb, void *data, int startcat,
int startidx, bool fork)
{
- struct llog_process_data d;
- struct llog_log_hdr *llh = cat_llh->lgh_hdr;
- int rc;
- ENTRY;
+ struct llog_process_data d;
+ struct llog_log_hdr *llh = cat_llh->lgh_hdr;
+ int rc;
- LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
- d.lpd_data = data;
- d.lpd_cb = cb;
- d.lpd_startcat = startcat;
- d.lpd_startidx = startidx;
+ ENTRY;
+
+ LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
+ d.lpd_data = data;
+ d.lpd_cb = cb;
+ d.lpd_startcat = (startcat == LLOG_CAT_FIRST ? 0 : startcat);
+ d.lpd_startidx = startidx;
if (llh->llh_cat_idx >= cat_llh->lgh_last_idx &&
llh->llh_count > 1) {
CWARN("%s: catlog "DFID" crosses index zero\n",
cat_llh->lgh_ctxt->loc_obd->obd_name,
PFID(&cat_llh->lgh_id.lgl_oi.oi_fid));
-
- cd.lpcd_first_idx = llh->llh_cat_idx;
- cd.lpcd_last_idx = 0;
- rc = llog_process_or_fork(env, cat_llh, cat_cb,
- &d, &cd, fork);
- if (rc != 0)
- RETURN(rc);
-
- cd.lpcd_first_idx = 0;
+ /*startcat = 0 is default value for general processing */
+ if ((startcat != LLOG_CAT_FIRST &&
+ startcat >= llh->llh_cat_idx) || !startcat) {
+ /* processing the catalog part at the end */
+ cd.lpcd_first_idx = (startcat ? startcat :
+ llh->llh_cat_idx);
+ if (OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS))
+ cd.lpcd_last_idx = cfs_fail_val;
+ else
+ cd.lpcd_last_idx = 0;
+ rc = llog_process_or_fork(env, cat_llh, cat_cb,
+ &d, &cd, fork);
+ /* Reset the startcat becasue it has already reached
+ * catalog bottom.
+ */
+ startcat = 0;
+ if (rc != 0)
+ RETURN(rc);
+ }
+ /* processing the catalog part at the begining */
+ cd.lpcd_first_idx = (startcat == LLOG_CAT_FIRST) ? 0 : startcat;
+ /* Note, the processing will stop at the lgh_last_idx value,
+ * and it could be increased during processing. So records
+ * between current lgh_last_idx and lgh_last_idx in future
+ * would left unprocessed.
+ */
cd.lpcd_last_idx = cat_llh->lgh_last_idx;
rc = llog_process_or_fork(env, cat_llh, cat_cb,
&d, &cd, fork);
- } else {
+ } else {
rc = llog_process_or_fork(env, cat_llh, cat_cb,
&d, NULL, fork);
- }
+ }
- RETURN(rc);
+ RETURN(rc);
}
+EXPORT_SYMBOL(llog_cat_process_or_fork);
int llog_cat_process(const struct lu_env *env, struct llog_handle *cat_llh,
llog_cb_t cb, void *data, int startcat, int startidx)