Whamcloud - gitweb
LU-10198 llog: keep llog handle alive until last reference
[fs/lustre-release.git] / lustre / obdclass / llog_osd.c
index 9d458f4..54f3dd4 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2016, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -44,6 +44,8 @@
 
 #define DEBUG_SUBSYSTEM S_LOG
 
+#include <linux/delay.h>
+
 #include <dt_object.h>
 #include <llog_swab.h>
 #include <lustre_fid.h>
@@ -124,8 +126,7 @@ static int llog_osd_create_new_object(const struct lu_env *env,
 static int llog_osd_exist(struct llog_handle *handle)
 {
        LASSERT(handle->lgh_obj);
-       return dt_object_exists(handle->lgh_obj) &&
-               !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header);
+       return dt_object_exists(handle->lgh_obj) && !handle->lgh_destroyed;
 }
 
 static void *rec_tail(struct llog_rec_hdr *rec)
@@ -362,7 +363,7 @@ static int llog_osd_declare_write_rec(const struct lu_env *env,
  *                             the full llog record to write. This is
  *                             the beginning of buffer to write, the length
  *                             of buffer is stored in \a rec::lrh_len
- * \param[out] reccookie       pointer to the cookie to return back if needed.
+ * \param[in,out] reccookie    pointer to the cookie to return back if needed.
  *                             It is used for further cancel of this llog
  *                             record.
  * \param[in]  idx             index of the llog record. If \a idx == -1 then
@@ -490,26 +491,26 @@ static int llog_osd_write_rec(const struct lu_env *env,
                                             &lgi->lgi_off, th);
 
                        RETURN(rc);
-               } else if (loghandle->lgh_cur_idx > 0) {
+               } else if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
+                       lgi->lgi_off = llh->llh_hdr.lrh_len +
+                                      (idx - 1) * reclen;
+               } else if (reccookie != NULL && reccookie->lgc_index > 0) {
                        /**
-                        * The lgh_cur_offset can be used only if index is
+                        * The lgc_offset can be used only if index is
                         * the same.
                         */
-                       if (idx != loghandle->lgh_cur_idx) {
+                       if (idx != reccookie->lgc_index) {
                                CERROR("%s: modify index mismatch %d %d\n",
                                       o->do_lu.lo_dev->ld_obd->obd_name, idx,
-                                      loghandle->lgh_cur_idx);
+                                      reccookie->lgc_index);
                                RETURN(-EFAULT);
                        }
 
-                       lgi->lgi_off = loghandle->lgh_cur_offset;
+                       lgi->lgi_off = reccookie->lgc_offset;
                        CDEBUG(D_OTHER, "modify record "DFID": idx:%u, "
                               "len:%u offset %llu\n",
                               PFID(&loghandle->lgh_id.lgl_oi.oi_fid), idx,
                               rec->lrh_len, (long long)lgi->lgi_off);
-               } else if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
-                       lgi->lgi_off = llh->llh_hdr.lrh_len +
-                                      (idx - 1) * reclen;
                } else {
                        /* This can be result of lgh_cur_idx is not set during
                         * llog processing or llh_size is not set to proper
@@ -590,6 +591,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
                        RETURN(-ENOSPC);
        }
 
+       down_write(&loghandle->lgh_last_sem);
        /* increment the last_idx along with llh_tail index, they should
         * be equal for a llog lifetime */
        loghandle->lgh_last_idx++;
@@ -673,6 +675,12 @@ out_unlock:
        if (rc)
                GOTO(out, rc);
 
+       if (OBD_FAIL_PRECHECK(OBD_FAIL_LLOG_PROCESS_TIMEOUT) &&
+          cfs_fail_val == (unsigned int)(loghandle->lgh_id.lgl_oi.oi.oi_id &
+                                         0xFFFFFFFF)) {
+               OBD_RACE(OBD_FAIL_LLOG_PROCESS_TIMEOUT);
+               msleep(1 * MSEC_PER_SEC);
+       }
        /* computed index can be used to determine offset for fixed-size
         * records. This also allows to handle Catalog wrap around case */
        if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
@@ -693,6 +701,8 @@ out_unlock:
        if (rc < 0)
                GOTO(out, rc);
 
+       up_write(&loghandle->lgh_last_sem);
+
        CDEBUG(D_HA, "added record "DFID".%u, %u off%llu\n",
               PFID(lu_object_fid(&o->do_lu)), index, rec->lrh_len,
               lgi->lgi_off);
@@ -726,6 +736,7 @@ out:
        }
 
        LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx;
+       up_write(&loghandle->lgh_last_sem);
 
        RETURN(rc);
 }
@@ -781,22 +792,67 @@ static inline void llog_skip_over(struct llog_handle *lgh, __u64 *off,
  * big enough to handle the remapped records. It is also assumed that records
  * of a block have the same format (i.e.: the same features enabled).
  *
- * \param[in,out]    hdr       Header of the block of records to remap.
- * \param[in,out]    last_hdr   Last header, don't read past this point.
- * \param[in]        flags     Flags describing the fields to keep.
+ * \param[in,out]    hdr          Header of the block of records to remap.
+ * \param[in,out]    last_hdr      Last header, don't read past this point.
+ * \param[in]        flags        Flags describing the fields to keep.
+ * \param[in]        extra_flags   Flags describing the extra fields to keep.
  */
 static void changelog_block_trim_ext(struct llog_rec_hdr *hdr,
                                     struct llog_rec_hdr *last_hdr,
-                                    enum changelog_rec_flags flags)
+                                    struct llog_handle *loghandle)
 {
+       enum changelog_rec_flags flags = CLF_SUPPORTED;
+       enum changelog_rec_extra_flags extra_flags = CLFE_SUPPORTED;
+
+       if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_XATTR))
+               extra_flags &= ~CLFE_XATTR;
+       if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_OMODE))
+               extra_flags &= ~CLFE_OPEN;
+       if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_NID))
+               extra_flags &= ~CLFE_NID;
+       if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_UIDGID))
+               extra_flags &= ~CLFE_UIDGID;
+       if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_EXTRA_FLAGS))
+               flags &= ~CLF_EXTRA_FLAGS;
+       if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID))
+               flags &= ~CLF_JOBID;
+
+       if (flags == CLF_SUPPORTED && extra_flags == CLFE_SUPPORTED)
+               return;
+
        if (hdr->lrh_type != CHANGELOG_REC)
                return;
 
        do {
                struct changelog_rec *rec = (struct changelog_rec *)(hdr + 1);
+               enum changelog_rec_extra_flags xflag = CLFE_INVALID;
 
-               changelog_remap_rec(rec, rec->cr_flags & flags);
+               if (flags & CLF_EXTRA_FLAGS &&
+                   rec->cr_flags & CLF_EXTRA_FLAGS) {
+                       xflag = changelog_rec_extra_flags(rec)->cr_extra_flags &
+                               extra_flags;
+               }
+
+               if (unlikely(hdr->lrh_len == 0)) {
+                       /* It is corruption case, we cannot know the next rec,
+                        * jump to the last one directly to avoid dead loop. */
+                       LCONSOLE(D_WARNING, "Hit invalid llog record: "
+                                "idx %u, type %u, id %u\n",
+                                hdr->lrh_index, hdr->lrh_type, hdr->lrh_id);
+                       hdr = llog_rec_hdr_next(last_hdr);
+                       if (unlikely(hdr == last_hdr))
+                               LCONSOLE(D_WARNING, "The last record crashed: "
+                                        "idx %u, type %u, id %u\n",
+                                        hdr->lrh_index, hdr->lrh_type,
+                                        hdr->lrh_id);
+                       break;
+               }
+
+               changelog_remap_rec(rec, rec->cr_flags & flags, xflag);
                hdr = llog_rec_hdr_next(hdr);
+               /* Yield CPU to avoid soft-lockup if there are too many records
+                * to be handled. */
+               cond_resched();
        } while ((char *)hdr <= (char *)last_hdr);
 }
 
@@ -846,7 +902,7 @@ static int llog_osd_next_block(const struct lu_env *env,
 
        o = loghandle->lgh_obj;
        LASSERT(o);
-       LASSERT(dt_object_exists(o));
+       LASSERT(llog_osd_exist(loghandle));
        dt = lu2dt_dev(o->do_lu.lo_dev);
        LASSERT(dt);
 
@@ -962,9 +1018,7 @@ static int llog_osd_next_block(const struct lu_env *env,
                }
 
                /* Trim unsupported extensions for compat w/ older clients */
-               if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID))
-                       changelog_block_trim_ext(rec, last_rec,
-                                                CLF_VERSION | CLF_RENAME);
+               changelog_block_trim_ext(rec, last_rec, loghandle);
 
                GOTO(out, rc = 0);
 
@@ -1022,7 +1076,7 @@ static int llog_osd_prev_block(const struct lu_env *env,
 
        o = loghandle->lgh_obj;
        LASSERT(o);
-       LASSERT(dt_object_exists(o));
+       LASSERT(llog_osd_exist(loghandle));
        dt = lu2dt_dev(o->do_lu.lo_dev);
        LASSERT(dt);
 
@@ -1099,9 +1153,7 @@ static int llog_osd_prev_block(const struct lu_env *env,
                }
 
                /* Trim unsupported extensions for compat w/ older clients */
-               if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID))
-                       changelog_block_trim_ext(rec, last_rec,
-                                                CLF_VERSION | CLF_RENAME);
+               changelog_block_trim_ext(rec, last_rec, loghandle);
 
                GOTO(out, rc = 0);
        }
@@ -1278,9 +1330,12 @@ generate:
 
 after_open:
        /* No new llog is expected but doesn't exist */
-       if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o))
+       if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o)) {
+               CDEBUG(D_INFO, "%s: llog FID: "DFID" obj %p doesn`t exist\n",
+                      o->do_lu.lo_dev->ld_obd->obd_name,
+                      PFID(lu_object_fid(&o->do_lu)), o);
                GOTO(out_put, rc = -ENOENT);
-
+       }
        fid_to_logid(&lgi->lgi_fid, &handle->lgh_id);
        handle->lgh_obj = o;
        handle->private_data = los;
@@ -1387,7 +1442,7 @@ llog_osd_regular_fid_add_name_entry(const struct lu_env *env,
                               (struct dt_key *)name, th);
        } else {
                rc = dt_insert(env, dir, (struct dt_rec *)rec,
-                              (struct dt_key *)name, th, 1);
+                              (struct dt_key *)name, th);
        }
        dt_write_unlock(env, dir);
 
@@ -1554,8 +1609,7 @@ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res,
                rec->rec_type = S_IFREG;
                dt_read_lock(env, llog_dir, 0);
                rc = dt_insert(env, llog_dir, (struct dt_rec *)rec,
-                              (struct dt_key *)res->lgh_name,
-                              th, 1);
+                              (struct dt_key *)res->lgh_name, th);
                dt_read_unlock(env, llog_dir);
                dt_object_put(env, llog_dir);
                if (rc)
@@ -1745,7 +1799,7 @@ static int llog_osd_destroy(const struct lu_env *env,
        LASSERT(o != NULL);
 
        dt_write_lock(env, o, 0);
-       if (!dt_object_exists(o))
+       if (!llog_osd_exist(loghandle))
                GOTO(out_unlock, rc = 0);
 
        if (loghandle->lgh_name) {
@@ -1771,6 +1825,7 @@ static int llog_osd_destroy(const struct lu_env *env,
        if (rc < 0)
                GOTO(out_unlock, rc);
 
+       loghandle->lgh_destroyed = true;
        if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
                rc = llog_osd_regular_fid_del_name_entry(env, o, th, false);
                if (rc < 0)