/*
* prepare current/next log for catalog.
*
- * if log is NULL, open it, and declare create, NB, if log is remote, create it
- * synchronously here, see comments below.
+ * if \a *ploghandle is NULL, open it, and declare create, NB, if \a
+ * *ploghandle is remote, create it synchronously here, see comments
+ * below.
+ *
+ * \a cathandle->lgh_lock is down_read-ed, it gets down_write-ed if \a
+ * *ploghandle has to be opened.
*/
static int llog_cat_prep_log(const struct lu_env *env,
struct llog_handle *cathandle,
struct llog_handle **ploghandle,
struct thandle *th)
{
- int rc = 0;
+ int rc;
+ int sem_upgraded;
+start:
+ rc = 0;
+ sem_upgraded = 0;
if (IS_ERR_OR_NULL(*ploghandle)) {
+ up_read(&cathandle->lgh_lock);
down_write(&cathandle->lgh_lock);
+ sem_upgraded = 1;
if (IS_ERR_OR_NULL(*ploghandle)) {
struct llog_handle *loghandle;
&cathandle->u.chd.chd_head);
}
}
- up_write(&cathandle->lgh_lock);
-
if (rc)
- return rc;
+ GOTO(out, rc);
}
- if (llog_exist(*ploghandle))
- return 0;
+ rc = llog_exist(*ploghandle);
+ if (rc < 0)
+ GOTO(out, rc);
+ if (rc)
+ GOTO(out, rc = 0);
if (dt_object_remote(cathandle->lgh_obj)) {
-create_remote_log:
- down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
down_write_nested(&(*ploghandle)->lgh_lock, LLOGH_LOG);
if (!llog_exist(*ploghandle)) {
/* For remote operation, if we put the llog object
NULL);
if (rc == -ESTALE) {
up_write(&(*ploghandle)->lgh_lock);
- up_read(&cathandle->lgh_lock);
+ if (sem_upgraded)
+ up_write(&cathandle->lgh_lock);
+ else
+ up_read(&cathandle->lgh_lock);
rc = llog_cat_refresh(env, cathandle);
- if (!rc)
- goto create_remote_log;
-
- return rc;
+ down_read_nested(&cathandle->lgh_lock,
+ LLOGH_CAT);
+ if (rc)
+ return rc;
+ /* *ploghandle might become NULL, restart */
+ goto start;
}
}
up_write(&(*ploghandle)->lgh_lock);
- up_read(&cathandle->lgh_lock);
} else {
struct llog_thread_info *lgi = llog_info(env);
struct llog_logid_rec *lirec = &lgi->lgi_logid;
rc = llog_declare_create(env, *ploghandle, th);
if (rc)
- return rc;
+ GOTO(out, rc);
lirec->lid_hdr.lrh_len = sizeof(*lirec);
rc = llog_declare_write_rec(env, cathandle, &lirec->lid_hdr, -1,
th);
}
+out:
+ if (sem_upgraded) {
+ up_write(&cathandle->lgh_lock);
+ down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
+ if (rc == 0)
+ goto start;
+ }
return rc;
}
ENTRY;
+start:
+ down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
rc = llog_cat_prep_log(env, cathandle,
&cathandle->u.chd.chd_current_log, th);
if (rc)
- RETURN(rc);
+ GOTO(unlock, rc);
rc = llog_cat_prep_log(env, cathandle, &cathandle->u.chd.chd_next_log,
th);
if (rc)
- RETURN(rc);
+ GOTO(unlock, rc);
-declare_write:
rc = llog_declare_write_rec(env, cathandle->u.chd.chd_current_log,
rec, -1, th);
if (rc == -ESTALE && dt_object_remote(cathandle->lgh_obj)) {
+ up_read(&cathandle->lgh_lock);
rc = llog_cat_refresh(env, cathandle);
- if (!rc)
- goto declare_write;
+ if (rc)
+ RETURN(rc);
+ goto start;
}
#if 0
rc = llog_declare_write_rec(env, cathandle->u.chd.chd_next_log, rec, -1,
th);
#endif
-
+unlock:
+ up_read(&cathandle->lgh_lock);
RETURN(rc);
}
EXPORT_SYMBOL(llog_cat_declare_add_rec);