Remove lgh_write_offset for remote llog object,
since it is not easy to be refreshed when remote
transaction fails, which might cause llog file
corruption. Instead let's rely on the osp object
cache to invalidate and update the llog object
size.
Signed-off-by: Di Wang <di.wang@intel.com>
Change-Id: I38aa9ce2c5ac6485bf0cf6be023b8edde986b5d3
Reviewed-on: https://review.whamcloud.com/24008
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Fan Yong <fan.yong@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
int lgh_last_idx;
int lgh_cur_idx; /* used during llog_process */
__u64 lgh_cur_offset; /* used during llog_process */
int lgh_last_idx;
int lgh_cur_idx; /* used during llog_process */
__u64 lgh_cur_offset; /* used during llog_process */
- /* used during llog_osd_write_rec */
- __u64 lgh_write_offset;
- int lgh_max_size;
struct llog_ctxt *lgh_ctxt;
union {
struct plain_handle_data phd;
struct llog_ctxt *lgh_ctxt;
union {
struct plain_handle_data phd;
struct llog_operations *lgh_logops;
atomic_t lgh_refcount;
struct llog_operations *lgh_logops;
atomic_t lgh_refcount;
/* lrh_len should be initialized in llog_init_handle */
handle->lgh_last_idx = 0; /* header is record with index 0 */
/* lrh_len should be initialized in llog_init_handle */
handle->lgh_last_idx = 0; /* header is record with index 0 */
- handle->lgh_write_offset = 0;
llh->llh_count = 1; /* for the header record */
llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
LASSERT(handle->lgh_ctxt->loc_chunk_size >=
llh->llh_count = 1; /* for the header record */
llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
LASSERT(handle->lgh_ctxt->loc_chunk_size >=
handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK);
handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index;
handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK);
handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index;
- handle->lgh_write_offset = lgi->lgi_attr.la_size;
__u32 chunk_size;
size_t left;
__u32 orig_last_idx;
__u32 chunk_size;
size_t left;
__u32 orig_last_idx;
- __u64 orig_write_offset;
ENTRY;
llh = loghandle->lgh_hdr;
ENTRY;
llh = loghandle->lgh_hdr;
LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
orig_last_idx = loghandle->lgh_last_idx;
LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
orig_last_idx = loghandle->lgh_last_idx;
- orig_write_offset = loghandle->lgh_write_offset;
lgi->lgi_off = lgi->lgi_attr.la_size;
if (loghandle->lgh_max_size > 0 &&
lgi->lgi_off = lgi->lgi_attr.la_size;
if (loghandle->lgh_max_size > 0 &&
- if (dt_object_remote(o))
- loghandle->lgh_write_offset = lgi->lgi_off;
-
loghandle->lgh_last_idx++; /* for pad rec */
}
/* if it's the last idx in log file, then return -ENOSPC
loghandle->lgh_last_idx++; /* for pad rec */
}
/* if it's the last idx in log file, then return -ENOSPC
* records. This also allows to handle Catalog wrap around case */
if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen;
* records. This also allows to handle Catalog wrap around case */
if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen;
- } else if (dt_object_remote(o)) {
- lgi->lgi_off = max_t(__u64, loghandle->lgh_write_offset,
- lgi->lgi_off);
} else {
rc = dt_attr_get(env, o, &lgi->lgi_attr);
if (rc)
} else {
rc = dt_attr_get(env, o, &lgi->lgi_attr);
if (rc)
if (rc < 0)
GOTO(out, rc);
if (rc < 0)
GOTO(out, rc);
- if (dt_object_remote(o))
- loghandle->lgh_write_offset = lgi->lgi_off;
-
CDEBUG(D_HA, "added record "DFID": idx: %u, %u off%llu\n",
PFID(lu_object_fid(&o->do_lu)), index, rec->lrh_len,
lgi->lgi_off);
CDEBUG(D_HA, "added record "DFID": idx: %u, %u off%llu\n",
PFID(lu_object_fid(&o->do_lu)), index, rec->lrh_len,
lgi->lgi_off);
/* restore llog last_idx */
if (dt_object_remote(o)) {
loghandle->lgh_last_idx = orig_last_idx;
/* restore llog last_idx */
if (dt_object_remote(o)) {
loghandle->lgh_last_idx = orig_last_idx;
- loghandle->lgh_write_offset = orig_write_offset;
} else if (--loghandle->lgh_last_idx == 0 &&
(llh->llh_flags & LLOG_F_IS_CAT) && llh->llh_cat_idx != 0) {
/* catalog had just wrap-around case */
} else if (--loghandle->lgh_last_idx == 0 &&
(llh->llh_flags & LLOG_F_IS_CAT) && llh->llh_cat_idx != 0) {
/* catalog had just wrap-around case */
if (last_rec->lrh_index != tail->lrt_index) {
CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
if (last_rec->lrh_index != tail->lrt_index) {
CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
- "offset %llu last_rec idx %u tail idx %u\n",
+ "offset %llu last_rec idx %u tail idx %u"
+ "lrt len %u read_size %d\n",
o->do_lu.lo_dev->ld_obd->obd_name,
POSTID(&loghandle->lgh_id.lgl_oi),
loghandle->lgh_id.lgl_ogen, *cur_offset,
o->do_lu.lo_dev->ld_obd->obd_name,
POSTID(&loghandle->lgh_id.lgl_oi),
loghandle->lgh_id.lgl_ogen, *cur_offset,
- last_rec->lrh_index, tail->lrt_index);
+ last_rec->lrh_index, tail->lrt_index,
+ tail->lrt_len, rc);
GOTO(out, rc = -EINVAL);
}
GOTO(out, rc = -EINVAL);
}
struct osp_object *obj = dt2osp_obj(dt);
ENTRY;
struct osp_object *obj = dt2osp_obj(dt);
ENTRY;
+ CDEBUG(D_HA, "Invalidate osp_object "DFID"\n",
+ PFID(lu_object_fid(&dt->do_lu)));
osp_obj_invalidate_cache(obj);
spin_lock(&obj->opo_lock);
osp_obj_invalidate_cache(obj);
spin_lock(&obj->opo_lock);