struct llog_rec_hdr *llh_hdr;
struct dt_object *o;
struct llog_thread_info *lgi;
+ enum llog_flag flags;
int rc;
ENTRY;
RETURN(LLOG_EEMPTY);
}
+ flags = handle->lgh_hdr->llh_flags;
+
lgi->lgi_off = 0;
lgi->lgi_buf.lb_buf = handle->lgh_hdr;
lgi->lgi_buf.lb_len = LLOG_CHUNK_SIZE;
RETURN(-EIO);
}
+ handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK);
handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index;
RETURN(0);
}
/**
+ * Remove optional fields that the client doesn't expect.
+ * This is typically in order to ensure compatibility with older clients.
+ * It is assumed that since we exclusively remove fields, the block will be
+ * big enough to handle the remapped records. It is also assumed that records
+ * of a block have the same format (i.e.: the same features enabled).
+ *
+ * \param[in,out] hdr Header of the block of records to remap.
+ * \param[in,out] last_hdr Last header, don't read past this point.
+ * \param[in] flags Flags describing the fields to keep.
+ */
+static void changelog_block_trim_ext(struct llog_rec_hdr *hdr,
+ struct llog_rec_hdr *last_hdr,
+ enum changelog_rec_flags flags)
+{
+ if (hdr->lrh_type != CHANGELOG_REC)
+ return;
+
+ do {
+ struct changelog_rec *rec = (struct changelog_rec *)(hdr + 1);
+
+ changelog_remap_rec(rec, rec->cr_flags & flags);
+ hdr = llog_rec_hdr_next(hdr);
+ } while ((char *)hdr <= (char *)last_hdr);
+}
+
+/**
* Implementation of the llog_operations::lop_next_block
*
* This function finds the the next llog block to return which contains
sizeof(struct llog_rec_tail));
/* get the last record in block */
last_rec = (struct llog_rec_hdr *)((char *)buf + rc -
- le32_to_cpu(tail->lrt_len));
+ tail->lrt_len);
if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
lustre_swab_llog_rec(last_rec);
rec->lrh_index, next_idx);
GOTO(out, rc = -ENOENT);
}
+
+ /* Trim unsupported extensions for compat w/ older clients */
+ if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID))
+ changelog_block_trim_ext(rec, last_rec,
+ CLF_VERSION | CLF_RENAME);
+
GOTO(out, rc = 0);
}
GOTO(out, rc = -EIO);
rec->lrh_index, prev_idx);
GOTO(out, rc = -ENOENT);
}
+
+ /* Trim unsupported extensions for compat w/ older clients */
+ if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID))
+ changelog_block_trim_ext(rec, last_rec,
+ CLF_VERSION | CLF_RENAME);
+
GOTO(out, rc = 0);
}
GOTO(out, rc = -EIO);
* \retval dt_object of llog directory
* \retval ERR_PTR of negative value on error
*/
-struct dt_object *llog_osd_dir_get(const struct lu_env *env,
- struct llog_ctxt *ctxt)
+static struct dt_object *llog_osd_dir_get(const struct lu_env *env,
+ struct llog_ctxt *ctxt)
{
struct dt_device *dt;
struct dt_thread_info *dti = dt_info(env);
struct llog_handle *res, struct thandle *th)
{
struct llog_thread_info *lgi = llog_info(env);
+ struct dt_insert_rec *rec = &lgi->lgi_dt_rec;
struct local_oid_storage *los;
struct dt_object *o;
int rc;
if (IS_ERR(llog_dir))
RETURN(PTR_ERR(llog_dir));
logid_to_fid(&res->lgh_id, &lgi->lgi_fid);
+ rec->rec_fid = &lgi->lgi_fid;
+ rec->rec_type = S_IFREG;
rc = dt_declare_insert(env, llog_dir,
- (struct dt_rec *)&lgi->lgi_fid,
+ (struct dt_rec *)rec,
(struct dt_key *)res->lgh_name, th);
lu_object_put(env, &llog_dir->do_lu);
if (rc)
struct thandle *th)
{
struct llog_thread_info *lgi = llog_info(env);
+ struct dt_insert_rec *rec = &lgi->lgi_dt_rec;
struct local_oid_storage *los;
struct dt_object *o;
int rc = 0;
RETURN(PTR_ERR(llog_dir));
logid_to_fid(&res->lgh_id, &lgi->lgi_fid);
+ rec->rec_fid = &lgi->lgi_fid;
+ rec->rec_type = S_IFREG;
dt_read_lock(env, llog_dir, 0);
- rc = dt_insert(env, llog_dir,
- (struct dt_rec *)&lgi->lgi_fid,
+ rc = dt_insert(env, llog_dir, (struct dt_rec *)rec,
(struct dt_key *)res->lgh_name,
th, BYPASS_CAPA, 1);
dt_read_unlock(env, llog_dir);
GOTO(out_trans, rc);
}
- dt_declare_ref_del(env, o, th);
+ rc = dt_declare_ref_del(env, o, th);
+ if (rc < 0)
+ GOTO(out_trans, rc);
rc = dt_declare_destroy(env, o, th);
if (rc)
lgi->lgi_buf.lb_buf = idarray;
lgi->lgi_buf.lb_len = size;
rc = dt_record_read(env, o, &lgi->lgi_buf, &lgi->lgi_off);
- if (rc) {
+ /* -EFAULT means the llog is a sparse file. This is not an error
+ * after arbitrary OST index is supported. */
+ if (rc < 0 && rc != -EFAULT) {
CERROR("%s: error reading CATALOGS: rc = %d\n",
o->do_lu.lo_dev->ld_obd->obd_name, rc);
GOTO(out, rc);