Llog handle keeps related dt_object pinned until llog_close()
call, meanwhile llog handle can still have other users which
took llog handle via llog_cat_id2handle()
Patch changes llog_handle_put() to call lop_close() upon last
reference drop. So llog_osd_close() will put dt_object only
when llog_handle has no more references.
The llog_handle_get() checks and reports if llog_handle has
zero reference.
Also patch modifies checks for destroyed llogs, llog handle
has new lgh_destroyed flag which is set when llog is destroyed,
llog_osd_exist() checks dt_object_exist() and lgh_destroyed
flag, so destroyed llogs are considered as non-existent too.
Previously it uses lu_object_is_dying() check which is not
reliable because means only that object is not to be kept in
cache.
Signed-off-by: Mikhail Pershin <mpershin@whamcloud.com>
Change-Id: If7df41646c243c0d40b20a30a33e86c688d24508
Reviewed-on: https://review.whamcloud.com/37367
Tested-by: jenkins <devops@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Alexandr Boyko <c17825@cray.com>
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
char *lgh_name;
void *private_data;
struct llog_operations *lgh_logops;
char *lgh_name;
void *private_data;
struct llog_operations *lgh_logops;
+ refcount_t lgh_refcount;
mutex_init(&loghandle->lgh_hdr_mutex);
init_rwsem(&loghandle->lgh_last_sem);
INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
mutex_init(&loghandle->lgh_hdr_mutex);
init_rwsem(&loghandle->lgh_last_sem);
INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
- atomic_set(&loghandle->lgh_refcount, 1);
+ refcount_set(&loghandle->lgh_refcount, 1);
OBD_FREE_PTR(loghandle);
}
OBD_FREE_PTR(loghandle);
}
-void llog_handle_get(struct llog_handle *loghandle)
+struct llog_handle *llog_handle_get(struct llog_handle *loghandle)
- atomic_inc(&loghandle->lgh_refcount);
+ if (refcount_inc_not_zero(&loghandle->lgh_refcount))
+ return loghandle;
+ return NULL;
-void llog_handle_put(struct llog_handle *loghandle)
+int llog_handle_put(const struct lu_env *env, struct llog_handle *loghandle)
- LASSERT(atomic_read(&loghandle->lgh_refcount) > 0);
- if (atomic_dec_and_test(&loghandle->lgh_refcount))
+ int rc = 0;
+
+ if (refcount_dec_and_test(&loghandle->lgh_refcount)) {
+ struct llog_operations *lop;
+
+ rc = llog_handle2ops(loghandle, &lop);
+ if (!rc) {
+ if (lop->lop_close)
+ rc = lop->lop_close(env, loghandle);
+ else
+ rc = -EOPNOTSUPP;
+ }
llog_free_handle(loghandle);
llog_free_handle(loghandle);
}
static int llog_declare_destroy(const struct lu_env *env,
}
static int llog_declare_destroy(const struct lu_env *env,
RETURN(-EOPNOTSUPP);
LASSERT(handle->lgh_obj != NULL);
RETURN(-EOPNOTSUPP);
LASSERT(handle->lgh_obj != NULL);
- if (!dt_object_exists(handle->lgh_obj))
+ if (!llog_exist(handle))
RETURN(0);
rc = lop->lop_destroy(env, handle, th);
RETURN(0);
rc = lop->lop_destroy(env, handle, th);
- if (!dt_object_exists(handle->lgh_obj))
+ if (!llog_exist(handle))
RETURN(0);
dt = lu2dt_dev(handle->lgh_obj->do_lu.lo_dev);
RETURN(0);
dt = lu2dt_dev(handle->lgh_obj->do_lu.lo_dev);
int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
{
int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
{
- struct llog_operations *lop;
- int rc;
-
- ENTRY;
-
- rc = llog_handle2ops(loghandle, &lop);
- if (rc)
- GOTO(out, rc);
- if (lop->lop_close == NULL)
- GOTO(out, rc = -EOPNOTSUPP);
- rc = lop->lop_close(env, loghandle);
-out:
- llog_handle_put(loghandle);
- RETURN(rc);
+ return llog_handle_put(env, loghandle);
}
EXPORT_SYMBOL(llog_close);
}
EXPORT_SYMBOL(llog_close);
cgl->lgl_ogen, logid->lgl_ogen);
continue;
}
cgl->lgl_ogen, logid->lgl_ogen);
continue;
}
+ *res = llog_handle_get(loghandle);
+ if (!*res) {
+ CERROR("%s: log "DFID" refcount is zero!\n",
+ loghandle->lgh_ctxt->loc_obd->obd_name,
+ PFID(&logid->lgl_oi.oi_fid));
+ continue;
+ }
loghandle->u.phd.phd_cat_handle = cathandle;
up_write(&cathandle->lgh_lock);
loghandle->u.phd.phd_cat_handle = cathandle;
up_write(&cathandle->lgh_lock);
}
}
up_write(&cathandle->lgh_lock);
}
}
up_write(&cathandle->lgh_lock);
rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN | fmt, NULL);
if (rc < 0) {
llog_close(env, loghandle);
rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN | fmt, NULL);
if (rc < 0) {
llog_close(env, loghandle);
+ *res = llog_handle_get(loghandle);
+ LASSERT(*res);
down_write(&cathandle->lgh_lock);
list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
up_write(&cathandle->lgh_lock);
down_write(&cathandle->lgh_lock);
list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
up_write(&cathandle->lgh_lock);
loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
loghandle->u.phd.phd_cookie.lgc_index =
loghandle->lgh_hdr->llh_cat_idx;
loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
loghandle->u.phd.phd_cookie.lgc_index =
loghandle->lgh_hdr->llh_cat_idx;
- EXIT;
-out:
- llog_handle_get(loghandle);
- *res = loghandle;
- return 0;
}
int llog_cat_close(const struct lu_env *env, struct llog_handle *cathandle)
}
int llog_cat_close(const struct lu_env *env, struct llog_handle *cathandle)
cathandle->lgh_ctxt->loc_obd->obd_name,
PFID(&lgl->lgl_oi.oi_fid), lgl->lgl_ogen, rc);
cathandle->lgh_ctxt->loc_obd->obd_name,
PFID(&lgl->lgl_oi.oi_fid), lgl->lgl_ogen, rc);
- llog_handle_put(loghandle);
+ llog_handle_put(env, loghandle);
- llog_handle_put(loghandle);
+ llog_handle_put(env, loghandle);
if (rc)
CERROR("%s: fail to cancel %d llog-records: rc = %d\n",
if (rc)
CERROR("%s: fail to cancel %d llog-records: rc = %d\n",
+ llog_handle_put(env, llh);
+ llog_handle_put(env, llh);
rc = llog_cat_cleanup(env, cat_llh, llh,
llh->u.phd.phd_cookie.lgc_index);
rc = llog_cat_cleanup(env, cat_llh, llh,
llh->u.phd.phd_cookie.lgc_index);
+ llog_handle_put(env, llh);
int llog_info_init(void);
void llog_info_fini(void);
int llog_info_init(void);
void llog_info_fini(void);
-void llog_handle_get(struct llog_handle *loghandle);
-void llog_handle_put(struct llog_handle *loghandle);
+struct llog_handle *llog_handle_get(struct llog_handle *loghandle);
+int llog_handle_put(const struct lu_env *env, struct llog_handle *loghandle);
int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
struct llog_handle **res, struct llog_logid *logid);
int class_config_dump_handler(const struct lu_env *env,
int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
struct llog_handle **res, struct llog_logid *logid);
int class_config_dump_handler(const struct lu_env *env,
RETURN(rc);
}
rc = llog_process(env, loghandle, llog_check_cb, NULL, NULL);
RETURN(rc);
}
rc = llog_process(env, loghandle, llog_check_cb, NULL, NULL);
- llog_handle_put(loghandle);
+ llog_handle_put(env, loghandle);
}
llog_cat_cleanup(env, cat, log, log->u.phd.phd_cookie.lgc_index);
out:
}
llog_cat_cleanup(env, cat, log, log->u.phd.phd_cookie.lgc_index);
out:
+ llog_handle_put(env, log);
static int llog_osd_exist(struct llog_handle *handle)
{
LASSERT(handle->lgh_obj);
static int llog_osd_exist(struct llog_handle *handle)
{
LASSERT(handle->lgh_obj);
- return dt_object_exists(handle->lgh_obj) &&
- !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header);
+ return dt_object_exists(handle->lgh_obj) && !handle->lgh_destroyed;
}
static void *rec_tail(struct llog_rec_hdr *rec)
}
static void *rec_tail(struct llog_rec_hdr *rec)
o = loghandle->lgh_obj;
LASSERT(o);
o = loghandle->lgh_obj;
LASSERT(o);
- LASSERT(dt_object_exists(o));
+ LASSERT(llog_osd_exist(loghandle));
dt = lu2dt_dev(o->do_lu.lo_dev);
LASSERT(dt);
dt = lu2dt_dev(o->do_lu.lo_dev);
LASSERT(dt);
o = loghandle->lgh_obj;
LASSERT(o);
o = loghandle->lgh_obj;
LASSERT(o);
- LASSERT(dt_object_exists(o));
+ LASSERT(llog_osd_exist(loghandle));
dt = lu2dt_dev(o->do_lu.lo_dev);
LASSERT(dt);
dt = lu2dt_dev(o->do_lu.lo_dev);
LASSERT(dt);
LASSERT(o != NULL);
dt_write_lock(env, o, 0);
LASSERT(o != NULL);
dt_write_lock(env, o, 0);
- if (!dt_object_exists(o))
+ if (!llog_osd_exist(loghandle))
GOTO(out_unlock, rc = 0);
if (loghandle->lgh_name) {
GOTO(out_unlock, rc = 0);
if (loghandle->lgh_name) {
if (rc < 0)
GOTO(out_unlock, rc);
if (rc < 0)
GOTO(out_unlock, rc);
+ loghandle->lgh_destroyed = true;
if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
rc = llog_osd_regular_fid_del_name_entry(env, o, th, false);
if (rc < 0)
if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
rc = llog_osd_regular_fid_del_name_entry(env, o, th, false);
if (rc < 0)