Whamcloud - gitweb
LU-10198 llog: keep llog handle alive until last reference 67/37367/2
authorMikhail Pershin <mpershin@whamcloud.com>
Wed, 29 Jan 2020 21:22:07 +0000 (00:22 +0300)
committerOleg Drokin <green@whamcloud.com>
Sat, 8 Feb 2020 03:59:46 +0000 (03:59 +0000)
Llog handle keeps related dt_object pinned until llog_close()
call, meanwhile llog handle can still have other users which
took llog handle via llog_cat_id2handle()

Patch changes llog_handle_put() to call lop_close() upon last
reference drop. So llog_osd_close() will put dt_object only
when llog_handle has no more references.
The llog_handle_get() checks and reports if llog_handle has
zero reference.
Also patch modifies checks for destroyed llogs, llog handle
has new lgh_destroyed flag which is set when llog is destroyed,
llog_osd_exist() checks dt_object_exist() and lgh_destroyed
flag, so destroyed llogs are considered as non-existent too.
Previously it uses lu_object_is_dying() check which is not
reliable because means only that object is not to be kept in
cache.

Signed-off-by: Mikhail Pershin <mpershin@whamcloud.com>
Change-Id: If7df41646c243c0d40b20a30a33e86c688d24508
Reviewed-on: https://review.whamcloud.com/37367
Tested-by: jenkins <devops@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Alexandr Boyko <c17825@cray.com>
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_log.h
lustre/obdclass/llog.c
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_internal.h
lustre/obdclass/llog_ioctl.c
lustre/obdclass/llog_osd.c

index ae37da4..d58ddd8 100644 (file)
@@ -283,9 +283,10 @@ struct llog_handle {
        char                    *lgh_name;
        void                    *private_data;
        struct llog_operations  *lgh_logops;
        char                    *lgh_name;
        void                    *private_data;
        struct llog_operations  *lgh_logops;
-       atomic_t                 lgh_refcount;
+       refcount_t               lgh_refcount;
 
        int                     lgh_max_size;
 
        int                     lgh_max_size;
+       bool                    lgh_destroyed;
 };
 
 /* llog_osd.c */
 };
 
 /* llog_osd.c */
index 28aa947..8a24d05 100644 (file)
@@ -66,7 +66,7 @@ static struct llog_handle *llog_alloc_handle(void)
        mutex_init(&loghandle->lgh_hdr_mutex);
        init_rwsem(&loghandle->lgh_last_sem);
        INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
        mutex_init(&loghandle->lgh_hdr_mutex);
        init_rwsem(&loghandle->lgh_last_sem);
        INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
-       atomic_set(&loghandle->lgh_refcount, 1);
+       refcount_set(&loghandle->lgh_refcount, 1);
 
        return loghandle;
 }
 
        return loghandle;
 }
@@ -91,16 +91,30 @@ out:
        OBD_FREE_PTR(loghandle);
 }
 
        OBD_FREE_PTR(loghandle);
 }
 
-void llog_handle_get(struct llog_handle *loghandle)
+struct llog_handle *llog_handle_get(struct llog_handle *loghandle)
 {
 {
-       atomic_inc(&loghandle->lgh_refcount);
+       if (refcount_inc_not_zero(&loghandle->lgh_refcount))
+               return loghandle;
+       return NULL;
 }
 
 }
 
-void llog_handle_put(struct llog_handle *loghandle)
+int llog_handle_put(const struct lu_env *env, struct llog_handle *loghandle)
 {
 {
-       LASSERT(atomic_read(&loghandle->lgh_refcount) > 0);
-       if (atomic_dec_and_test(&loghandle->lgh_refcount))
+       int rc = 0;
+
+       if (refcount_dec_and_test(&loghandle->lgh_refcount)) {
+               struct llog_operations *lop;
+
+               rc = llog_handle2ops(loghandle, &lop);
+               if (!rc) {
+                       if (lop->lop_close)
+                               rc = lop->lop_close(env, loghandle);
+                       else
+                               rc = -EOPNOTSUPP;
+               }
                llog_free_handle(loghandle);
                llog_free_handle(loghandle);
+       }
+       return rc;
 }
 
 static int llog_declare_destroy(const struct lu_env *env,
 }
 
 static int llog_declare_destroy(const struct lu_env *env,
@@ -137,7 +151,7 @@ int llog_trans_destroy(const struct lu_env *env, struct llog_handle *handle,
                RETURN(-EOPNOTSUPP);
 
        LASSERT(handle->lgh_obj != NULL);
                RETURN(-EOPNOTSUPP);
 
        LASSERT(handle->lgh_obj != NULL);
-       if (!dt_object_exists(handle->lgh_obj))
+       if (!llog_exist(handle))
                RETURN(0);
 
        rc = lop->lop_destroy(env, handle, th);
                RETURN(0);
 
        rc = lop->lop_destroy(env, handle, th);
@@ -166,7 +180,7 @@ int llog_destroy(const struct lu_env *env, struct llog_handle *handle)
                RETURN(rc);
        }
 
                RETURN(rc);
        }
 
-       if (!dt_object_exists(handle->lgh_obj))
+       if (!llog_exist(handle))
                RETURN(0);
 
        dt = lu2dt_dev(handle->lgh_obj->do_lu.lo_dev);
                RETURN(0);
 
        dt = lu2dt_dev(handle->lgh_obj->do_lu.lo_dev);
@@ -1306,20 +1320,7 @@ EXPORT_SYMBOL(llog_open);
 
 int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
 {
 
 int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
 {
-       struct llog_operations  *lop;
-       int                      rc;
-
-       ENTRY;
-
-       rc = llog_handle2ops(loghandle, &lop);
-       if (rc)
-               GOTO(out, rc);
-       if (lop->lop_close == NULL)
-               GOTO(out, rc = -EOPNOTSUPP);
-       rc = lop->lop_close(env, loghandle);
-out:
-       llog_handle_put(loghandle);
-       RETURN(rc);
+       return llog_handle_put(env, loghandle);
 }
 EXPORT_SYMBOL(llog_close);
 
 }
 EXPORT_SYMBOL(llog_close);
 
index c28ca74..1369f53 100644 (file)
@@ -396,9 +396,16 @@ int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
                                      cgl->lgl_ogen, logid->lgl_ogen);
                                continue;
                        }
                                      cgl->lgl_ogen, logid->lgl_ogen);
                                continue;
                        }
+                       *res = llog_handle_get(loghandle);
+                       if (!*res) {
+                               CERROR("%s: log "DFID" refcount is zero!\n",
+                                      loghandle->lgh_ctxt->loc_obd->obd_name,
+                                      PFID(&logid->lgl_oi.oi_fid));
+                               continue;
+                       }
                        loghandle->u.phd.phd_cat_handle = cathandle;
                        up_write(&cathandle->lgh_lock);
                        loghandle->u.phd.phd_cat_handle = cathandle;
                        up_write(&cathandle->lgh_lock);
-                       GOTO(out, rc = 0);
+                       RETURN(rc);
                }
        }
        up_write(&cathandle->lgh_lock);
                }
        }
        up_write(&cathandle->lgh_lock);
@@ -415,10 +422,12 @@ int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
        rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN | fmt, NULL);
        if (rc < 0) {
                llog_close(env, loghandle);
        rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN | fmt, NULL);
        if (rc < 0) {
                llog_close(env, loghandle);
-               loghandle = NULL;
+               *res = NULL;
                RETURN(rc);
        }
 
                RETURN(rc);
        }
 
+       *res = llog_handle_get(loghandle);
+       LASSERT(*res);
        down_write(&cathandle->lgh_lock);
        list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
        up_write(&cathandle->lgh_lock);
        down_write(&cathandle->lgh_lock);
        list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
        up_write(&cathandle->lgh_lock);
@@ -427,11 +436,7 @@ int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
        loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
        loghandle->u.phd.phd_cookie.lgc_index =
                                loghandle->lgh_hdr->llh_cat_idx;
        loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
        loghandle->u.phd.phd_cookie.lgc_index =
                                loghandle->lgh_hdr->llh_cat_idx;
-       EXIT;
-out:
-       llog_handle_get(loghandle);
-       *res = loghandle;
-       return 0;
+       RETURN(0);
 }
 
 int llog_cat_close(const struct lu_env *env, struct llog_handle *cathandle)
 }
 
 int llog_cat_close(const struct lu_env *env, struct llog_handle *cathandle)
@@ -727,7 +732,7 @@ int llog_cat_cancel_arr_rec(const struct lu_env *env,
                       cathandle->lgh_ctxt->loc_obd->obd_name,
                       PFID(&lgl->lgl_oi.oi_fid), lgl->lgl_ogen, rc);
 
                       cathandle->lgh_ctxt->loc_obd->obd_name,
                       PFID(&lgl->lgl_oi.oi_fid), lgl->lgl_ogen, rc);
 
-               llog_handle_put(loghandle);
+               llog_handle_put(env, loghandle);
                RETURN(rc);
        }
 
                RETURN(rc);
        }
 
@@ -743,7 +748,7 @@ int llog_cat_cancel_arr_rec(const struct lu_env *env,
                rc = 0;
 
        }
                rc = 0;
 
        }
-       llog_handle_put(loghandle);
+       llog_handle_put(env, loghandle);
 
        if (rc)
                CERROR("%s: fail to cancel %d llog-records: rc = %d\n",
 
        if (rc)
                CERROR("%s: fail to cancel %d llog-records: rc = %d\n",
@@ -888,7 +893,7 @@ out:
        }
 
        if (llh)
        }
 
        if (llh)
-               llog_handle_put(llh);
+               llog_handle_put(env, llh);
 
        RETURN(rc);
 }
 
        RETURN(rc);
 }
@@ -993,7 +998,7 @@ static int llog_cat_size_cb(const struct lu_env *env,
        }
 
        if (llh != NULL)
        }
 
        if (llh != NULL)
-               llog_handle_put(llh);
+               llog_handle_put(env, llh);
 
        RETURN(0);
 }
 
        RETURN(0);
 }
@@ -1060,7 +1065,7 @@ static int llog_cat_reverse_process_cb(const struct lu_env *env,
                rc = llog_cat_cleanup(env, cat_llh, llh,
                                      llh->u.phd.phd_cookie.lgc_index);
 
                rc = llog_cat_cleanup(env, cat_llh, llh,
                                      llh->u.phd.phd_cookie.lgc_index);
 
-       llog_handle_put(llh);
+       llog_handle_put(env, llh);
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
index 7f581be..2642529 100644 (file)
@@ -74,8 +74,8 @@ static inline struct llog_thread_info *llog_info(const struct lu_env *env)
 int llog_info_init(void);
 void llog_info_fini(void);
 
 int llog_info_init(void);
 void llog_info_fini(void);
 
-void llog_handle_get(struct llog_handle *loghandle);
-void llog_handle_put(struct llog_handle *loghandle);
+struct llog_handle *llog_handle_get(struct llog_handle *loghandle);
+int llog_handle_put(const struct lu_env *env, struct llog_handle *loghandle);
 int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
                       struct llog_handle **res, struct llog_logid *logid);
 int class_config_dump_handler(const struct lu_env *env,
 int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
                       struct llog_handle **res, struct llog_logid *logid);
 int class_config_dump_handler(const struct lu_env *env,
index 1ee49ad..3db9ef5 100644 (file)
@@ -167,7 +167,7 @@ static int llog_check_cb(const struct lu_env *env, struct llog_handle *handle,
                        RETURN(rc);
                }
                rc = llog_process(env, loghandle, llog_check_cb, NULL, NULL);
                        RETURN(rc);
                }
                rc = llog_process(env, loghandle, llog_check_cb, NULL, NULL);
-               llog_handle_put(loghandle);
+               llog_handle_put(env, loghandle);
        } else {
                bool ok;
 
        } else {
                bool ok;
 
@@ -292,7 +292,7 @@ static int llog_remove_log(const struct lu_env *env, struct llog_handle *cat,
        }
        llog_cat_cleanup(env, cat, log, log->u.phd.phd_cookie.lgc_index);
 out:
        }
        llog_cat_cleanup(env, cat, log, log->u.phd.phd_cookie.lgc_index);
 out:
-       llog_handle_put(log);
+       llog_handle_put(env, log);
        RETURN(rc);
 
 }
        RETURN(rc);
 
 }
index fc0b16f..54f3dd4 100644 (file)
@@ -126,8 +126,7 @@ static int llog_osd_create_new_object(const struct lu_env *env,
 static int llog_osd_exist(struct llog_handle *handle)
 {
        LASSERT(handle->lgh_obj);
 static int llog_osd_exist(struct llog_handle *handle)
 {
        LASSERT(handle->lgh_obj);
-       return dt_object_exists(handle->lgh_obj) &&
-               !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header);
+       return dt_object_exists(handle->lgh_obj) && !handle->lgh_destroyed;
 }
 
 static void *rec_tail(struct llog_rec_hdr *rec)
 }
 
 static void *rec_tail(struct llog_rec_hdr *rec)
@@ -903,7 +902,7 @@ static int llog_osd_next_block(const struct lu_env *env,
 
        o = loghandle->lgh_obj;
        LASSERT(o);
 
        o = loghandle->lgh_obj;
        LASSERT(o);
-       LASSERT(dt_object_exists(o));
+       LASSERT(llog_osd_exist(loghandle));
        dt = lu2dt_dev(o->do_lu.lo_dev);
        LASSERT(dt);
 
        dt = lu2dt_dev(o->do_lu.lo_dev);
        LASSERT(dt);
 
@@ -1077,7 +1076,7 @@ static int llog_osd_prev_block(const struct lu_env *env,
 
        o = loghandle->lgh_obj;
        LASSERT(o);
 
        o = loghandle->lgh_obj;
        LASSERT(o);
-       LASSERT(dt_object_exists(o));
+       LASSERT(llog_osd_exist(loghandle));
        dt = lu2dt_dev(o->do_lu.lo_dev);
        LASSERT(dt);
 
        dt = lu2dt_dev(o->do_lu.lo_dev);
        LASSERT(dt);
 
@@ -1800,7 +1799,7 @@ static int llog_osd_destroy(const struct lu_env *env,
        LASSERT(o != NULL);
 
        dt_write_lock(env, o, 0);
        LASSERT(o != NULL);
 
        dt_write_lock(env, o, 0);
-       if (!dt_object_exists(o))
+       if (!llog_osd_exist(loghandle))
                GOTO(out_unlock, rc = 0);
 
        if (loghandle->lgh_name) {
                GOTO(out_unlock, rc = 0);
 
        if (loghandle->lgh_name) {
@@ -1826,6 +1825,7 @@ static int llog_osd_destroy(const struct lu_env *env,
        if (rc < 0)
                GOTO(out_unlock, rc);
 
        if (rc < 0)
                GOTO(out_unlock, rc);
 
+       loghandle->lgh_destroyed = true;
        if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
                rc = llog_osd_regular_fid_del_name_entry(env, o, th, false);
                if (rc < 0)
        if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
                rc = llog_osd_regular_fid_del_name_entry(env, o, th, false);
                if (rc < 0)