GOTO(out, rc);
}
- rc = llog_init_handle(env, loghandle,
+ rc = llog_init_handle(env, loghandle, (cathandle->lgh_hdr->llh_flags &
+ LLOG_F_EXT_MASK) |
LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
&cathandle->lgh_hdr->llh_tgtuuid);
if (rc < 0)
struct llog_handle *loghandle;
int rc;
- down_write(&cathandle->lgh_lock);
+ LASSERT(rwsem_is_locked(&cathandle->lgh_lock));
+
list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
u.phd.phd_entry) {
if (!llog_exist(loghandle))
rc = llog_read_header(env, loghandle, NULL);
up_write(&loghandle->lgh_lock);
if (rc)
- goto unlock;
+ goto out;
}
rc = llog_read_header(env, cathandle, NULL);
-unlock:
- up_write(&cathandle->lgh_lock);
-
+out:
return rc;
}
+static inline int llog_cat_declare_create(const struct lu_env *env,
+ struct llog_handle *cathandle,
+ struct llog_handle *loghandle,
+ struct thandle *th)
+{
+
+ struct llog_thread_info *lgi = llog_info(env);
+ struct llog_logid_rec *lirec = &lgi->lgi_logid;
+ int rc;
+
+ rc = llog_declare_create(env, loghandle, th);
+ if (rc)
+ return rc;
+
+ lirec->lid_hdr.lrh_len = sizeof(*lirec);
+ rc = llog_declare_write_rec(env, cathandle, &lirec->lid_hdr, -1,
+ th);
+ if (!rc)
+ dt_declare_attr_set(env, cathandle->lgh_obj, NULL, th);
+
+ return rc;
+}
/*
* prepare current/next log for catalog.
*
struct llog_handle **ploghandle,
struct thandle *th)
{
+ struct llog_handle *loghandle;
int rc;
- int sem_upgraded;
start:
rc = 0;
- sem_upgraded = 0;
- if (IS_ERR_OR_NULL(*ploghandle)) {
- up_read(&cathandle->lgh_lock);
- down_write(&cathandle->lgh_lock);
- sem_upgraded = 1;
- if (IS_ERR_OR_NULL(*ploghandle)) {
- struct llog_handle *loghandle;
-
- rc = llog_open(env, cathandle->lgh_ctxt, &loghandle,
- NULL, NULL, LLOG_OPEN_NEW);
- if (!rc) {
- *ploghandle = loghandle;
- list_add_tail(&loghandle->u.phd.phd_entry,
- &cathandle->u.chd.chd_head);
- }
- }
- if (rc)
- GOTO(out, rc);
+ if (!IS_ERR_OR_NULL(*ploghandle)) {
+ if (llog_exist(*ploghandle) == 0)
+ return llog_cat_declare_create(env, cathandle,
+ *ploghandle, th);
+ return rc;
}
- rc = llog_exist(*ploghandle);
- if (rc < 0)
- GOTO(out, rc);
- if (rc)
- GOTO(out, rc = 0);
+ down_write(&cathandle->lgh_lock);
+ if (!IS_ERR_OR_NULL(*ploghandle)) {
+ up_write(&cathandle->lgh_lock);
+ if (llog_exist(*ploghandle) == 0)
+ return llog_cat_declare_create(env, cathandle,
+ *ploghandle, th);
+ return rc;
+ }
+
+ /* Slow path with open/create declare, only one thread do all stuff
+ * and share loghandle at the end
+ */
+ rc = llog_open(env, cathandle->lgh_ctxt, &loghandle, NULL, NULL,
+ LLOG_OPEN_NEW);
+ if (rc) {
+ up_write(&cathandle->lgh_lock);
+ CDEBUG(D_OTHER, "%s: failed to open log, catalog "DFID" %d\n",
+ loghandle2name(cathandle), PLOGID(&cathandle->lgh_id),
+ rc);
+ return rc;
+ }
if (dt_object_remote(cathandle->lgh_obj)) {
- down_write_nested(&(*ploghandle)->lgh_lock, LLOGH_LOG);
- if (!llog_exist(*ploghandle)) {
- /* For remote operation, if we put the llog object
- * creation in the current transaction, then the
- * llog object will not be created on the remote
- * target until the transaction stop, if other
- * operations start before the transaction stop,
- * and use the same llog object, will be dependent
- * on the success of this transaction. So let's
- * create the llog object synchronously here to
- * remove the dependency. */
- rc = llog_cat_new_log(env, cathandle, *ploghandle,
- NULL);
- if (rc == -ESTALE) {
- up_write(&(*ploghandle)->lgh_lock);
- if (sem_upgraded)
- up_write(&cathandle->lgh_lock);
- else
- up_read(&cathandle->lgh_lock);
-
- rc = llog_cat_refresh(env, cathandle);
- down_read_nested(&cathandle->lgh_lock,
- LLOGH_CAT);
- if (rc)
- return rc;
- /* *ploghandle might become NULL, restart */
- goto start;
- }
- }
- up_write(&(*ploghandle)->lgh_lock);
+ /* For remote operation, if we put the llog object
+ * creation in the current transaction, then the
+ * llog object will not be created on the remote
+ * target until the transaction stop, if other
+ * operations start before the transaction stop,
+ * and use the same llog object, will be dependent
+ * on the success of this transaction. So let's
+ * create the llog object synchronously here to
+ * remove the dependency.
+ */
+ rc = llog_cat_new_log(env, cathandle, loghandle, NULL);
+ if (rc == -ESTALE) {
+ rc = llog_cat_refresh(env, cathandle);
+ if (rc)
+ GOTO(out, rc);
+ up_write(&cathandle->lgh_lock);
+ llog_close(env, loghandle);
+ goto start;
+ } else if (rc)
+ GOTO(out, rc);
} else {
- struct llog_thread_info *lgi = llog_info(env);
- struct llog_logid_rec *lirec = &lgi->lgi_logid;
-
- rc = llog_declare_create(env, *ploghandle, th);
+ rc = llog_cat_declare_create(env, cathandle, loghandle, th);
if (rc)
GOTO(out, rc);
-
- lirec->lid_hdr.lrh_len = sizeof(*lirec);
- rc = llog_declare_write_rec(env, cathandle, &lirec->lid_hdr, -1,
- th);
- dt_declare_attr_set(env, cathandle->lgh_obj, NULL, th);
}
+ list_add(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
+ *ploghandle = loghandle;
+
out:
- if (sem_upgraded) {
- up_write(&cathandle->lgh_lock);
- down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
- if (rc == 0)
- goto start;
- }
+ up_write(&cathandle->lgh_lock);
+ CDEBUG(D_OTHER, "%s: open log "DFID" for catalog "DFID" rc=%d\n",
+ loghandle2name(cathandle), PLOGID(&loghandle->lgh_id),
+ PLOGID(&cathandle->lgh_id), rc);
+
+ if (rc)
+ llog_close(env, loghandle);
+
return rc;
}
RETURN(-EBADF);
fmt = cathandle->lgh_hdr->llh_flags & LLOG_F_EXT_MASK;
- down_write(&cathandle->lgh_lock);
+ down_read(&cathandle->lgh_lock);
list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
u.phd.phd_entry) {
struct llog_logid *cgl = &loghandle->lgh_id;
continue;
}
loghandle->u.phd.phd_cat_handle = cathandle;
- up_write(&cathandle->lgh_lock);
+ up_read(&cathandle->lgh_lock);
RETURN(rc);
}
}
- up_write(&cathandle->lgh_lock);
+ up_read(&cathandle->lgh_lock);
rc = llog_open(env, cathandle->lgh_ctxt, &loghandle, logid, NULL,
LLOG_OPEN_EXISTS);
*res = llog_handle_get(loghandle);
LASSERT(*res);
down_write(&cathandle->lgh_lock);
- list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
+ list_add(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
up_write(&cathandle->lgh_lock);
loghandle->u.phd.phd_cat_handle = cathandle;
static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
struct thandle *th)
{
- struct llog_handle *loghandle = NULL;
- ENTRY;
+ struct llog_handle *loghandle = NULL;
+ ENTRY;
if (CFS_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED2)) {
- down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
+ loghandle = cathandle->u.chd.chd_current_log;
GOTO(next, loghandle);
}
- down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
- loghandle = cathandle->u.chd.chd_current_log;
- if (loghandle) {
- struct llog_log_hdr *llh;
-
- down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
- llh = loghandle->lgh_hdr;
- if (llh == NULL || !llog_is_full(loghandle)) {
- up_read(&cathandle->lgh_lock);
- RETURN(loghandle);
- } else {
- up_write(&loghandle->lgh_lock);
- }
- }
- up_read(&cathandle->lgh_lock);
-
- /* time to use next log */
-
- /* first, we have to make sure the state hasn't changed */
- down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
+retry:
loghandle = cathandle->u.chd.chd_current_log;
- if (loghandle) {
+ if (likely(loghandle)) {
struct llog_log_hdr *llh;
down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
llh = loghandle->lgh_hdr;
if (llh == NULL || !llog_is_full(loghandle))
- GOTO(out_unlock, loghandle);
+ RETURN(loghandle);
else
up_write(&loghandle->lgh_lock);
}
+ /* time to use next log */
next:
- /* Sigh, the chd_next_log and chd_current_log is initialized
- * in declare phase, and we do not serialize the catlog
- * accessing, so it might be possible the llog creation
- * thread (see llog_cat_declare_add_rec()) did not create
- * llog successfully, then the following thread might
- * meet this situation. */
- if (IS_ERR_OR_NULL(cathandle->u.chd.chd_next_log)) {
- CERROR("%s: next log does not exist!\n",
- loghandle2name(cathandle));
- loghandle = ERR_PTR(-EIO);
- if (cathandle->u.chd.chd_next_log == NULL) {
- /* Store the error in chd_next_log, so
- * the following process can get correct
- * failure value */
- cathandle->u.chd.chd_next_log = loghandle;
+ /* first, we have to make sure the state hasn't changed */
+ down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
+ if (unlikely(loghandle == cathandle->u.chd.chd_current_log)) {
+ struct llog_logid lid = {.lgl_oi.oi.oi_id = 0,
+ .lgl_oi.oi.oi_seq = 0,
+ .lgl_ogen = 0};
+ /* Sigh, the chd_next_log and chd_current_log is initialized
+ * in declare phase, and we do not serialize the catlog
+ * accessing, so it might be possible the llog creation
+ * thread (see llog_cat_declare_add_rec()) did not create
+ * llog successfully, then the following thread might
+ * meet this situation.
+ */
+ if (IS_ERR_OR_NULL(cathandle->u.chd.chd_next_log)) {
+ CERROR("%s: next log does not exist, catalog "DFID" rc=%d\n",
+ loghandle2name(cathandle),
+ PLOGID(&cathandle->lgh_id), -EIO);
+ loghandle = ERR_PTR(-EIO);
+ if (cathandle->u.chd.chd_next_log == NULL) {
+ /* Store the error in chd_next_log, so
+ * the following process can get correct
+ * failure value
+ */
+ cathandle->u.chd.chd_next_log = loghandle;
+ }
+ GOTO(out_unlock, loghandle);
}
- GOTO(out_unlock, loghandle);
- }
-
- CDEBUG(D_INODE, "use next log\n");
+ if (!IS_ERR_OR_NULL(loghandle))
+ lid = loghandle->lgh_id;
- loghandle = cathandle->u.chd.chd_next_log;
- cathandle->u.chd.chd_current_log = loghandle;
- cathandle->u.chd.chd_next_log = NULL;
- down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
+ CDEBUG(D_OTHER, "%s: use next log "DFID"->"DFID" catalog "DFID"\n",
+ loghandle2name(cathandle), PLOGID(&lid),
+ PLOGID(&cathandle->u.chd.chd_next_log->lgh_id),
+ PLOGID(&cathandle->lgh_id));
+ loghandle = cathandle->u.chd.chd_next_log;
+ cathandle->u.chd.chd_current_log = loghandle;
+ cathandle->u.chd.chd_next_log = NULL;
+ }
+ up_write(&cathandle->lgh_lock);
+ GOTO(retry, loghandle);
out_unlock:
up_write(&cathandle->lgh_lock);
rc = llog_cat_new_log(env, cathandle, loghandle, th);
if (rc < 0) {
up_write(&loghandle->lgh_lock);
- /* nobody should be trying to use this llog */
- down_write(&cathandle->lgh_lock);
- if (cathandle->u.chd.chd_current_log == loghandle)
- cathandle->u.chd.chd_current_log = NULL;
- up_write(&cathandle->lgh_lock);
+ /* When ENOSPC happened no need to drop loghandle
+ * a new one would be allocated anyway for next llog_add
+ * so better to stay with the old.
+ */
+ if (rc != -ENOSPC) {
+ /* nobody should be trying to use this llog */
+ down_write(&cathandle->lgh_lock);
+ if (cathandle->u.chd.chd_current_log ==
+ loghandle)
+ cathandle->u.chd.chd_current_log = NULL;
+ list_del_init(&loghandle->u.phd.phd_entry);
+ up_write(&cathandle->lgh_lock);
+ llog_close(env, loghandle);
+ }
RETURN(rc);
}
}
+
/* now let's try to add the record */
rc = llog_write_rec(env, loghandle, rec, reccookie, LLOG_NEXT_IDX, th);
if (rc < 0) {
ENTRY;
start:
- down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
+ CDEBUG(D_INFO, "Declare adding to "DOSTID" flags %x count %d\n",
+ POSTID(&cathandle->lgh_id.lgl_oi),
+ cathandle->lgh_hdr->llh_flags, cathandle->lgh_hdr->llh_count);
+
+
rc = llog_cat_prep_log(env, cathandle,
&cathandle->u.chd.chd_current_log, th);
if (rc)
- GOTO(unlock, rc);
+ RETURN(rc);
+ /* For local llog this would always reserves credits for creation */
rc = llog_cat_prep_log(env, cathandle, &cathandle->u.chd.chd_next_log,
th);
if (rc)
- GOTO(unlock, rc);
+ RETURN(rc);
rc = llog_declare_write_rec(env, cathandle->u.chd.chd_current_log,
rec, -1, th);
if (rc == -ESTALE && dt_object_remote(cathandle->lgh_obj)) {
- up_read(&cathandle->lgh_lock);
+ down_write(&cathandle->lgh_lock);
rc = llog_cat_refresh(env, cathandle);
+ up_write(&cathandle->lgh_lock);
if (rc)
RETURN(rc);
goto start;
rc = llog_declare_write_rec(env, cathandle->u.chd.chd_next_log, rec, -1,
th);
#endif
-unlock:
- up_read(&cathandle->lgh_lock);
RETURN(rc);
}
EXPORT_SYMBOL(llog_cat_declare_add_rec);
if (loghandle->lgh_max_size > 0 &&
lgi->lgi_off >= loghandle->lgh_max_size) {
- CDEBUG(D_OTHER, "llog is getting too large (%u > %u) at %u "
+ CDEBUG(D_OTHER, "llog is getting too large (%u >= %u) at %u "
DFID"\n", (unsigned)lgi->lgi_off,
loghandle->lgh_max_size, (int)loghandle->lgh_last_idx,
PLOGID(&loghandle->lgh_id));
if (rc < 0)
GOTO(out, rc);
+ if (loghandle->lgh_max_size > 0 &&
+ lgi->lgi_off >= loghandle->lgh_max_size) {
+ CDEBUG(D_OTHER, "llog is getting too large (%u >= %u) at %u "
+ DFID"\n", (unsigned int)lgi->lgi_off,
+ loghandle->lgh_max_size, (int)loghandle->lgh_last_idx,
+ PLOGID(&loghandle->lgh_id));
+ /* this is to signal that this llog is full */
+ loghandle->lgh_last_idx = llog_max_idx(llh);
+ }
+
up_write(&loghandle->lgh_last_sem);
CDEBUG(D_HA, "added record "DFID".%u, %u off%llu\n",
int next_idx, __u64 *cur_offset, void *buf,
int len)
{
- struct llog_thread_info *lgi = llog_info(env);
- struct dt_object *o;
- struct dt_device *dt;
- int rc;
- __u32 chunk_size;
+ struct llog_thread_info *lgi = llog_info(env);
+ struct dt_object *o;
+ struct dt_device *dt;
+ int rc;
+ __u32 chunk_size;
+ __u32 tail_len;
int last_idx = *cur_idx;
__u64 last_offset = *cur_offset;
bool force_mini_rec = !next_idx;
lustre_swab_llog_rec(rec);
tail = (struct llog_rec_tail *)((char *)buf + rc -
sizeof(struct llog_rec_tail));
+ tail_len = tail->lrt_len;
+ /* base on tail_len do swab */
+ if (tail_len > chunk_size) {
+ __swab32s(&tail_len);
+ if (tail_len > chunk_size) {
+ CERROR("%s: invalid llog tail at log id "DFID":%x offset %llu tail idx %u lrt len %u read_size %d\n",
+ o->do_lu.lo_dev->ld_obd->obd_name,
+ PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
+ loghandle->lgh_id.lgl_ogen, *cur_offset,
+ tail->lrt_index, tail->lrt_len, rc);
+ /* tail is broken */
+ GOTO(out, rc = -EINVAL);
+ }
+ }
+ /* get the last record in block */
+ last_rec = (struct llog_rec_hdr *)((char *)tail - tail_len +
+ sizeof(struct llog_rec_tail));
/* caller handles bad records if any */
if (llog_verify_record(loghandle, rec))
GOTO(out, rc = 0);
- /* get the last record in block */
- last_rec = (struct llog_rec_hdr *)((char *)buf + rc -
- tail->lrt_len);
-
if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
lustre_swab_llog_rec(last_rec);
struct llog_handle *loghandle,
int prev_idx, void *buf, int len)
{
- struct llog_thread_info *lgi = llog_info(env);
- struct dt_object *o;
- struct dt_device *dt;
- loff_t cur_offset;
- __u32 chunk_size;
- int rc;
-
- ENTRY;
-
- chunk_size = loghandle->lgh_hdr->llh_hdr.lrh_len;
- if (len == 0 || len & (chunk_size - 1))
- RETURN(-EINVAL);
-
- CDEBUG(D_OTHER, "looking for log index %u\n", prev_idx);
-
- LASSERT(loghandle);
- LASSERT(loghandle->lgh_ctxt);
-
- o = loghandle->lgh_obj;
- LASSERT(o);
- dt_read_lock(env, o, 0);
- if (!llog_osd_exist(loghandle))
- GOTO(out, rc = -ESTALE);
-
- dt = lu2dt_dev(o->do_lu.lo_dev);
- LASSERT(dt);
-
- /* Let's only use mini record size for previous block read
- * for now XXX */
- cur_offset = chunk_size;
- llog_skip_over(loghandle, &cur_offset, 0, prev_idx,
- chunk_size, true);
-
- rc = dt_attr_get(env, o, &lgi->lgi_attr);
- if (rc)
- GOTO(out, rc);
-
- while (cur_offset < lgi->lgi_attr.la_size) {
- struct llog_rec_hdr *rec, *last_rec;
- struct llog_rec_tail *tail;
-
- lgi->lgi_buf.lb_len = len;
- lgi->lgi_buf.lb_buf = buf;
- rc = dt_read(env, o, &lgi->lgi_buf, &cur_offset);
- if (rc < 0) {
- CERROR("%s: can't read llog block from log "DFID
- " offset %llu: rc = %d\n",
- o->do_lu.lo_dev->ld_obd->obd_name,
- PFID(lu_object_fid(&o->do_lu)), cur_offset, rc);
- GOTO(out, rc);
- }
-
- if (rc == 0) /* end of file, nothing to do */
- GOTO(out, rc);
-
- if (rc < sizeof(*tail)) {
- CERROR("%s: invalid llog block at log id "DFID" offset %llu\n",
- o->do_lu.lo_dev->ld_obd->obd_name,
- PLOGID(&loghandle->lgh_id), cur_offset);
- GOTO(out, rc = -EINVAL);
- }
-
- rec = buf;
- if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
- lustre_swab_llog_rec(rec);
-
- tail = (struct llog_rec_tail *)((char *)buf + rc -
- sizeof(struct llog_rec_tail));
- /* get the last record in block */
- last_rec = (struct llog_rec_hdr *)((char *)buf + rc -
- le32_to_cpu(tail->lrt_len));
-
- if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
- lustre_swab_llog_rec(last_rec);
- LASSERT(last_rec->lrh_index == tail->lrt_index);
-
- /* this shouldn't happen */
- if (tail->lrt_index == 0) {
- CERROR("%s: invalid llog tail at log id "DFID" offset %llu\n",
- o->do_lu.lo_dev->ld_obd->obd_name,
- PLOGID(&loghandle->lgh_id), cur_offset);
- GOTO(out, rc = -EINVAL);
- }
- if (tail->lrt_index < prev_idx)
- continue;
-
- /* sanity check that the start of the new buffer is no farther
- * than the record that we wanted. This shouldn't happen. */
- if (rec->lrh_index > prev_idx) {
- CERROR("%s: missed desired record? %u > %u\n",
- o->do_lu.lo_dev->ld_obd->obd_name,
- rec->lrh_index, prev_idx);
- GOTO(out, rc = -ENOENT);
- }
+ loff_t cur_offset;
+ int cur_idx;
- /* Trim unsupported extensions for compat w/ older clients */
- changelog_block_trim_ext(rec, last_rec, loghandle);
+ cur_offset = loghandle->lgh_hdr->llh_hdr.lrh_len;
+ cur_idx = 1;
- GOTO(out, rc = 0);
- }
- GOTO(out, rc = -EIO);
-out:
- dt_read_unlock(env, o);
- return rc;
+ return llog_osd_next_block(env, loghandle, &cur_idx, prev_idx,
+ &cur_offset, buf, len);
}
/**
RETURN(rc);
out_put:
- dt_object_put(env, o);
+ if (ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID)
+ /* according to llog_osd_close() */
+ dt_object_put_nocache(env, o);
+ else
+ dt_object_put(env, o);
out_name:
OBD_FREE(handle->lgh_name, strlen(name) + 1);
out: