Whamcloud - gitweb
LU-10198 llog: keep llog handle alive until last reference
[fs/lustre-release.git] / lustre / obdclass / llog.c
index e802334..8a24d05 100644 (file)
@@ -66,7 +66,7 @@ static struct llog_handle *llog_alloc_handle(void)
        mutex_init(&loghandle->lgh_hdr_mutex);
        init_rwsem(&loghandle->lgh_last_sem);
        INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
-       atomic_set(&loghandle->lgh_refcount, 1);
+       refcount_set(&loghandle->lgh_refcount, 1);
 
        return loghandle;
 }
@@ -91,16 +91,30 @@ out:
        OBD_FREE_PTR(loghandle);
 }
 
-void llog_handle_get(struct llog_handle *loghandle)
+struct llog_handle *llog_handle_get(struct llog_handle *loghandle)
 {
-       atomic_inc(&loghandle->lgh_refcount);
+       if (refcount_inc_not_zero(&loghandle->lgh_refcount))
+               return loghandle;
+       return NULL;
 }
 
-void llog_handle_put(struct llog_handle *loghandle)
+int llog_handle_put(const struct lu_env *env, struct llog_handle *loghandle)
 {
-       LASSERT(atomic_read(&loghandle->lgh_refcount) > 0);
-       if (atomic_dec_and_test(&loghandle->lgh_refcount))
+       int rc = 0;
+
+       if (refcount_dec_and_test(&loghandle->lgh_refcount)) {
+               struct llog_operations *lop;
+
+               rc = llog_handle2ops(loghandle, &lop);
+               if (!rc) {
+                       if (lop->lop_close)
+                               rc = lop->lop_close(env, loghandle);
+                       else
+                               rc = -EOPNOTSUPP;
+               }
                llog_free_handle(loghandle);
+       }
+       return rc;
 }
 
 static int llog_declare_destroy(const struct lu_env *env,
@@ -137,7 +151,7 @@ int llog_trans_destroy(const struct lu_env *env, struct llog_handle *handle,
                RETURN(-EOPNOTSUPP);
 
        LASSERT(handle->lgh_obj != NULL);
-       if (!dt_object_exists(handle->lgh_obj))
+       if (!llog_exist(handle))
                RETURN(0);
 
        rc = lop->lop_destroy(env, handle, th);
@@ -166,7 +180,7 @@ int llog_destroy(const struct lu_env *env, struct llog_handle *handle)
                RETURN(rc);
        }
 
-       if (!dt_object_exists(handle->lgh_obj))
+       if (!llog_exist(handle))
                RETURN(0);
 
        dt = lu2dt_dev(handle->lgh_obj->do_lu.lo_dev);
@@ -554,6 +568,13 @@ repeat:
                            synced_idx == LLOG_HDR_TAIL(llh)->lrt_index)
                                GOTO(out, rc = 0);
 
+                       if (OBD_FAIL_PRECHECK(OBD_FAIL_LLOG_PROCESS_TIMEOUT) &&
+                               cfs_fail_val == (unsigned int)
+                                       (loghandle->lgh_id.lgl_oi.oi.oi_id &
+                                        0xFFFFFFFF)) {
+                               OBD_RACE(OBD_FAIL_LLOG_PROCESS_TIMEOUT);
+                       }
+
                        /* the bitmap could be changed during processing
                         * records from the chunk. For wrapped catalog
                         * it means we can read deleted record and try to
@@ -625,13 +646,6 @@ repeat:
                        loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
                                                    chunk_offset;
 
-                       if (OBD_FAIL_PRECHECK(OBD_FAIL_LLOG_PROCESS_TIMEOUT) &&
-                               index == lh_last_idx &&
-                               cfs_fail_val == (unsigned int)
-                                       (loghandle->lgh_id.lgl_oi.oi.oi_id &
-                                        0xFFFFFFFF)) {
-                               OBD_RACE(OBD_FAIL_LLOG_PROCESS_TIMEOUT);
-                       }
                        /* if set, process the callback on this record */
                        if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
                                struct llog_cookie *lgc;
@@ -748,7 +762,8 @@ static int llog_process_thread_daemonize(void *arg)
                 * used outside of the kernel itself, because it calls
                 * free_nsproxy() which is not exported by the kernel
                 * (defined in kernel/nsproxy.c) */
-               atomic_dec(&curr_ns->count);
+               if (curr_ns)
+                       atomic_dec(&curr_ns->count);
        }
        task_unlock(lpi->lpi_reftask);
 
@@ -772,10 +787,12 @@ int llog_process_or_fork(const struct lu_env *env,
                         struct llog_handle *loghandle,
                         llog_cb_t cb, void *data, void *catdata, bool fork)
 {
-        struct llog_process_info *lpi;
-        int                      rc;
+       struct llog_process_info *lpi;
+       struct llog_process_data *d = data;
+       struct llog_process_cat_data *cd = catdata;
+       int                      rc;
 
-        ENTRY;
+       ENTRY;
 
        OBD_ALLOC_PTR(lpi);
        if (lpi == NULL) {
@@ -787,6 +804,11 @@ int llog_process_or_fork(const struct lu_env *env,
        lpi->lpi_cbdata    = data;
        lpi->lpi_catdata   = catdata;
 
+       CDEBUG(D_OTHER, "Processing "DFID" flags 0x%03x startcat %d startidx %d first_idx %d last_idx %d\n",
+              PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
+              loghandle->lgh_hdr->llh_flags, d ? d->lpd_startcat : -1,
+              d ? d->lpd_startidx : -1, cd ? cd->lpcd_first_idx : -1,
+              cd ? cd->lpcd_last_idx : -1);
        if (fork) {
                struct task_struct *task;
 
@@ -828,6 +850,27 @@ int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
 }
 EXPORT_SYMBOL(llog_process);
 
+static inline const struct cred *llog_raise_resource(void)
+{
+       struct cred *cred = NULL;
+
+       if (cap_raised(current_cap(), CAP_SYS_RESOURCE))
+               return cred;
+
+       cred = prepare_creds();
+       if (!cred)
+               return cred;
+
+       cap_raise(cred->cap_effective, CAP_SYS_RESOURCE);
+       return override_creds(cred);
+}
+
+static inline void llog_restore_resource(const struct cred *old_cred)
+{
+       if (old_cred)
+               revert_creds(old_cred);
+}
+
 int llog_reverse_process(const struct lu_env *env,
                         struct llog_handle *loghandle, llog_cb_t cb,
                         void *data, void *catdata)
@@ -952,8 +995,9 @@ EXPORT_SYMBOL(llog_exist);
 int llog_declare_create(const struct lu_env *env,
                        struct llog_handle *loghandle, struct thandle *th)
 {
+       const struct cred *old_cred;
        struct llog_operations  *lop;
-       int                      raised, rc;
+       int rc;
 
        ENTRY;
 
@@ -963,20 +1007,18 @@ int llog_declare_create(const struct lu_env *env,
        if (lop->lop_declare_create == NULL)
                RETURN(-EOPNOTSUPP);
 
-       raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
-       if (!raised)
-               cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+       old_cred = llog_raise_resource();
        rc = lop->lop_declare_create(env, loghandle, th);
-       if (!raised)
-               cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+       llog_restore_resource(old_cred);
        RETURN(rc);
 }
 
 int llog_create(const struct lu_env *env, struct llog_handle *handle,
                struct thandle *th)
 {
+       const struct cred *old_cred;
        struct llog_operations  *lop;
-       int                      raised, rc;
+       int rc;
 
        ENTRY;
 
@@ -986,12 +1028,9 @@ int llog_create(const struct lu_env *env, struct llog_handle *handle,
        if (lop->lop_create == NULL)
                RETURN(-EOPNOTSUPP);
 
-       raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
-       if (!raised)
-               cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+       old_cred = llog_raise_resource();
        rc = lop->lop_create(env, handle, th);
-       if (!raised)
-               cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+       llog_restore_resource(old_cred);
        RETURN(rc);
 }
 
@@ -1000,8 +1039,9 @@ int llog_declare_write_rec(const struct lu_env *env,
                           struct llog_rec_hdr *rec, int idx,
                           struct thandle *th)
 {
+       const struct cred *old_cred;
        struct llog_operations  *lop;
-       int                      raised, rc;
+       int rc;
 
        ENTRY;
 
@@ -1012,12 +1052,9 @@ int llog_declare_write_rec(const struct lu_env *env,
        if (lop->lop_declare_write_rec == NULL)
                RETURN(-EOPNOTSUPP);
 
-       raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
-       if (!raised)
-               cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+       old_cred = llog_raise_resource();
        rc = lop->lop_declare_write_rec(env, handle, rec, idx, th);
-       if (!raised)
-               cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+       llog_restore_resource(old_cred);
        RETURN(rc);
 }
 
@@ -1025,8 +1062,9 @@ int llog_write_rec(const struct lu_env *env, struct llog_handle *handle,
                   struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
                   int idx, struct thandle *th)
 {
+       const struct cred *old_cred;
        struct llog_operations  *lop;
-       int                      raised, rc, buflen;
+       int rc, buflen;
 
        ENTRY;
 
@@ -1059,12 +1097,9 @@ int llog_write_rec(const struct lu_env *env, struct llog_handle *handle,
        buflen = rec->lrh_len;
        LASSERT(cfs_size_round(buflen) == buflen);
 
-       raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
-       if (!raised)
-               cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+       old_cred = llog_raise_resource();
        rc = lop->lop_write_rec(env, handle, rec, logcookies, idx, th);
-       if (!raised)
-               cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+       llog_restore_resource(old_cred);
        RETURN(rc);
 }
 
@@ -1072,19 +1107,17 @@ int llog_add(const struct lu_env *env, struct llog_handle *lgh,
             struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
             struct thandle *th)
 {
-       int raised, rc;
+       const struct cred *old_cred;
+       int rc;
 
        ENTRY;
 
        if (lgh->lgh_logops->lop_add == NULL)
                RETURN(-EOPNOTSUPP);
 
-       raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
-       if (!raised)
-               cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+       old_cred = llog_raise_resource();
        rc = lgh->lgh_logops->lop_add(env, lgh, rec, logcookies, th);
-       if (!raised)
-               cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+       llog_restore_resource(old_cred);
        RETURN(rc);
 }
 EXPORT_SYMBOL(llog_add);
@@ -1092,19 +1125,17 @@ EXPORT_SYMBOL(llog_add);
 int llog_declare_add(const struct lu_env *env, struct llog_handle *lgh,
                     struct llog_rec_hdr *rec, struct thandle *th)
 {
-       int raised, rc;
+       const struct cred *old_cred;
+       int rc;
 
        ENTRY;
 
        if (lgh->lgh_logops->lop_declare_add == NULL)
                RETURN(-EOPNOTSUPP);
 
-       raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
-       if (!raised)
-               cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+       old_cred = llog_raise_resource();
        rc = lgh->lgh_logops->lop_declare_add(env, lgh, rec, th);
-       if (!raised)
-               cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+       llog_restore_resource(old_cred);
        RETURN(rc);
 }
 EXPORT_SYMBOL(llog_declare_add);
@@ -1257,7 +1288,7 @@ int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
              struct llog_handle **lgh, struct llog_logid *logid,
              char *name, enum llog_open_param open_param)
 {
-       int      raised;
+       const struct cred *old_cred;
        int      rc;
 
        ENTRY;
@@ -1276,12 +1307,9 @@ int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
        (*lgh)->lgh_ctxt = ctxt;
        (*lgh)->lgh_logops = ctxt->loc_logops;
 
-       raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
-       if (!raised)
-               cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+       old_cred = llog_raise_resource();
        rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param);
-       if (!raised)
-               cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+       llog_restore_resource(old_cred);
        if (rc) {
                llog_free_handle(*lgh);
                *lgh = NULL;
@@ -1292,20 +1320,7 @@ EXPORT_SYMBOL(llog_open);
 
 int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
 {
-       struct llog_operations  *lop;
-       int                      rc;
-
-       ENTRY;
-
-       rc = llog_handle2ops(loghandle, &lop);
-       if (rc)
-               GOTO(out, rc);
-       if (lop->lop_close == NULL)
-               GOTO(out, rc = -EOPNOTSUPP);
-       rc = lop->lop_close(env, loghandle);
-out:
-       llog_handle_put(loghandle);
-       RETURN(rc);
+       return llog_handle_put(env, loghandle);
 }
 EXPORT_SYMBOL(llog_close);