Whamcloud - gitweb
LU-7981 llite: take trunc_sem only at vvp layer
[fs/lustre-release.git] / lustre / obdclass / llog_osd.c
index fb8fba4..8432d39 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2014 Intel Corporation.
+ * Copyright (c) 2012, 2015, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 
 #define DEBUG_SUBSYSTEM S_LOG
 
+#include <dt_object.h>
+#include <llog_swab.h>
+#include <lustre_fid.h>
 #include <obd.h>
 #include <obd_class.h>
-#include <lustre_fid.h>
-#include <dt_object.h>
 
 #include "llog_internal.h"
 #include "local_storage.h"
@@ -111,6 +112,23 @@ static int llog_osd_create_new_object(const struct lu_env *env,
 }
 
 /**
+ * Implementation of the llog_operations::lop_exist
+ *
+ * This function checks that llog exists on storage.
+ *
+ * \param[in] handle   llog handle of the current llog
+ *
+ * \retval             true if llog object exists and is not just destroyed
+ * \retval             false if llog doesn't exist or just destroyed
+ */
+static int llog_osd_exist(struct llog_handle *handle)
+{
+       LASSERT(handle->lgh_obj); /* handle must have an object attached */
+       return dt_object_exists(handle->lgh_obj) &&
+               !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header);
+}
+
+/**
  * Write a padding record to the llog
  *
  * This function writes a padding record to the end of llog. That may
@@ -262,6 +280,7 @@ static int llog_osd_read_header(const struct lu_env *env,
 
        handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK);
        handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index;
+       handle->lgh_write_offset = lgi->lgi_attr.la_size;
 
        RETURN(0);
 }
@@ -364,7 +383,8 @@ static int llog_osd_write_rec(const struct lu_env *env,
        struct dt_object        *o;
        __u32                   chunk_size;
        size_t                   left;
-
+       __u32                   orig_last_idx;
+       __u64                   orig_write_offset;
        ENTRY;
 
        LASSERT(env);
@@ -378,6 +398,9 @@ static int llog_osd_write_rec(const struct lu_env *env,
        CDEBUG(D_OTHER, "new record %x to "DFID"\n",
               rec->lrh_type, PFID(lu_object_fid(&o->do_lu)));
 
+       if (!llog_osd_exist(loghandle))
+               RETURN(-ENOENT);
+
        /* record length should not bigger than  */
        if (reclen > loghandle->lgh_hdr->llh_hdr.lrh_len)
                RETURN(-E2BIG);
@@ -521,7 +544,17 @@ static int llog_osd_write_rec(const struct lu_env *env,
         * process them page-at-a-time if needed.  If it will cross a chunk
         * boundary, write in a fake (but referenced) entry to pad the chunk.
         */
+
+
+       /* simulate ENOSPC when new plain llog is being added to the
+        * catalog */
+       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED2) &&
+           llh->llh_flags & LLOG_F_IS_CAT)
+               RETURN(-ENOSPC);
+
        LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
+       orig_last_idx = loghandle->lgh_last_idx;
+       orig_write_offset = loghandle->lgh_write_offset;
        lgi->lgi_off = lgi->lgi_attr.la_size;
        left = chunk_size - (lgi->lgi_off & (chunk_size - 1));
        /* NOTE: padding is a record, but no bit is set */
@@ -531,11 +564,15 @@ static int llog_osd_write_rec(const struct lu_env *env,
                rc = llog_osd_pad(env, o, &lgi->lgi_off, left, index, th);
                if (rc)
                        RETURN(rc);
+
+               if (dt_object_remote(o))
+                       loghandle->lgh_write_offset = lgi->lgi_off;
+
                loghandle->lgh_last_idx++; /* for pad rec */
        }
        /* if it's the last idx in log file, then return -ENOSPC
         * or wrap around if a catalog */
-       if ((loghandle->lgh_last_idx >= LLOG_HDR_BITMAP_SIZE(llh) - 1) ||
+       if (llog_is_full(loghandle) ||
            unlikely(llh->llh_flags & LLOG_F_IS_CAT &&
                     OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) &&
                     loghandle->lgh_last_idx >= cfs_fail_val)) {
@@ -632,6 +669,9 @@ out_unlock:
         * records. This also allows to handle Catalog wrap around case */
        if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
                lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen;
+       } else if (dt_object_remote(o)) {
+               lgi->lgi_off = max_t(__u64, loghandle->lgh_write_offset,
+                                    lgi->lgi_off);
        } else {
                rc = dt_attr_get(env, o, &lgi->lgi_attr);
                if (rc)
@@ -648,8 +688,11 @@ out_unlock:
        if (rc < 0)
                GOTO(out, rc);
 
-       CDEBUG(D_OTHER, "added record "DOSTID": idx: %u, %u off"LPU64"\n",
-              POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len,
+       if (dt_object_remote(o))
+               loghandle->lgh_write_offset = lgi->lgi_off;
+
+       CDEBUG(D_HA, "added record "DFID": idx: %u, %u off"LPU64"\n",
+              PFID(lu_object_fid(&o->do_lu)), index, rec->lrh_len,
               lgi->lgi_off);
        if (reccookie != NULL) {
                reccookie->lgc_lgl = loghandle->lgh_id;
@@ -672,11 +715,15 @@ out:
        mutex_unlock(&loghandle->lgh_hdr_mutex);
 
        /* restore llog last_idx */
-       if (--loghandle->lgh_last_idx == 0 &&
+       if (dt_object_remote(o)) {
+               loghandle->lgh_last_idx = orig_last_idx;
+               loghandle->lgh_write_offset = orig_write_offset;
+       } else if (--loghandle->lgh_last_idx == 0 &&
            (llh->llh_flags & LLOG_F_IS_CAT) && llh->llh_cat_idx != 0) {
                /* catalog had just wrap-around case */
                loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1;
        }
+
        LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx;
 
        RETURN(rc);
@@ -793,9 +840,6 @@ static int llog_osd_next_block(const struct lu_env *env,
        if (len == 0 || len & (chunk_size - 1))
                RETURN(-EINVAL);
 
-       CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
-              next_idx, *cur_idx, *cur_offset);
-
        LASSERT(loghandle);
        LASSERT(loghandle->lgh_ctxt);
 
@@ -809,6 +853,10 @@ static int llog_osd_next_block(const struct lu_env *env,
        if (rc)
                GOTO(out, rc);
 
+       CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off"
+              LPU64"), size %llu\n", next_idx, *cur_idx,
+              *cur_offset, lgi->lgi_attr.la_size);
+
        while (*cur_offset < lgi->lgi_attr.la_size) {
                struct llog_rec_hdr     *rec, *last_rec;
                struct llog_rec_tail    *tail;
@@ -870,7 +918,16 @@ static int llog_osd_next_block(const struct lu_env *env,
 
                if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
                        lustre_swab_llog_rec(last_rec);
-               LASSERT(last_rec->lrh_index == tail->lrt_index);
+
+               if (last_rec->lrh_index != tail->lrt_index) {
+                       CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
+                              "offset "LPU64" last_rec idx %u tail idx %u\n",
+                              o->do_lu.lo_dev->ld_obd->obd_name,
+                              POSTID(&loghandle->lgh_id.lgl_oi),
+                              loghandle->lgh_id.lgl_ogen, *cur_offset,
+                              last_rec->lrh_index, tail->lrt_index);
+                       GOTO(out, rc = -EINVAL);
+               }
 
                *cur_idx = tail->lrt_index;
 
@@ -1223,23 +1280,6 @@ out:
 }
 
 /**
- * Implementation of the llog_operations::lop_exist
- *
- * This function checks that llog exists on storage.
- *
- * \param[in] handle   llog handle of the current llog
- *
- * \retval             true if llog object exists and is not just destroyed
- * \retval             false if llog doesn't exist or just destroyed
- */
-static int llog_osd_exist(struct llog_handle *handle)
-{
-       LASSERT(handle->lgh_obj);
-       return (dt_object_exists(handle->lgh_obj) &&
-               !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header));
-}
-
-/**
  * Get dir for regular fid log object
  *
  * Get directory for regular fid log object, and these regular fid log