Whamcloud - gitweb
LU-6838 llog: limit file size of plain logs
[fs/lustre-release.git] / lustre / obdclass / llog_osd.c
index fb8fba4..6c3c288 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2014 Intel Corporation.
+ * Copyright (c) 2012, 2015, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 
 #define DEBUG_SUBSYSTEM S_LOG
 
+#include <dt_object.h>
+#include <llog_swab.h>
+#include <lustre_fid.h>
 #include <obd.h>
 #include <obd_class.h>
-#include <lustre_fid.h>
-#include <dt_object.h>
 
 #include "llog_internal.h"
 #include "local_storage.h"
@@ -111,6 +112,29 @@ static int llog_osd_create_new_object(const struct lu_env *env,
 }
 
 /**
+ * Implementation of the llog_operations::lop_exist
+ *
+ * This function checks that llog exists on storage.
+ *
+ * \param[in] handle   llog handle of the current llog
+ *
+ * \retval             true if llog object exists and is not just destroyed
+ * \retval             false if llog doesn't exist or just destroyed
+ */
+static int llog_osd_exist(struct llog_handle *handle)
+{
+       LASSERT(handle->lgh_obj);
+       return dt_object_exists(handle->lgh_obj) &&
+               !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header);
+}
+
+static void *rec_tail(struct llog_rec_hdr *rec)
+{
+       /* the tail occupies the final bytes of the lrh_len-byte record */
+       return (char *)rec + rec->lrh_len - sizeof(struct llog_rec_tail);
+}
+
+/**
  * Write a padding record to the llog
  *
  * This function writes a padding record to the end of llog. That may
@@ -262,6 +286,7 @@ static int llog_osd_read_header(const struct lu_env *env,
 
        handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK);
        handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index;
+       handle->lgh_write_offset = lgi->lgi_attr.la_size;
 
        RETURN(0);
 }
@@ -364,20 +389,20 @@ static int llog_osd_write_rec(const struct lu_env *env,
        struct dt_object        *o;
        __u32                   chunk_size;
        size_t                   left;
-
+       __u32                   orig_last_idx;
+       __u64                   orig_write_offset;
        ENTRY;
 
-       LASSERT(env);
        llh = loghandle->lgh_hdr;
-       LASSERT(llh);
        o = loghandle->lgh_obj;
-       LASSERT(o);
-       LASSERT(th);
 
        chunk_size = llh->llh_hdr.lrh_len;
        CDEBUG(D_OTHER, "new record %x to "DFID"\n",
               rec->lrh_type, PFID(lu_object_fid(&o->do_lu)));
 
+       if (!llog_osd_exist(loghandle))
+               RETURN(-ENOENT);
+
        /* record length should not bigger than  */
        if (reclen > loghandle->lgh_hdr->llh_hdr.lrh_len)
                RETURN(-E2BIG);
@@ -521,8 +546,31 @@ static int llog_osd_write_rec(const struct lu_env *env,
         * process them page-at-a-time if needed.  If it will cross a chunk
         * boundary, write in a fake (but referenced) entry to pad the chunk.
         */
+
+
+       /* simulate ENOSPC when a new plain llog is being added to the
+        * catalog */
+       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED2) &&
+           llh->llh_flags & LLOG_F_IS_CAT)
+               RETURN(-ENOSPC);
+
        LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
+       orig_last_idx = loghandle->lgh_last_idx;
+       orig_write_offset = loghandle->lgh_write_offset;
        lgi->lgi_off = lgi->lgi_attr.la_size;
+
+       if (loghandle->lgh_max_size > 0 &&
+           lgi->lgi_off >= loghandle->lgh_max_size) {
+               CDEBUG(D_OTHER, "llog is getting too large (%u > %u) at %u "
+                      DOSTID"\n", (unsigned)lgi->lgi_off,
+                      loghandle->lgh_max_size,
+                      (int)loghandle->lgh_last_idx,
+                      POSTID(&loghandle->lgh_id.lgl_oi));
+               /* this is to signal that this llog is full */
+               loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1;
+               RETURN(-ENOSPC);
+       }
+
        left = chunk_size - (lgi->lgi_off & (chunk_size - 1));
        /* NOTE: padding is a record, but no bit is set */
        if (left != 0 && left != reclen &&
@@ -531,11 +579,15 @@ static int llog_osd_write_rec(const struct lu_env *env,
                rc = llog_osd_pad(env, o, &lgi->lgi_off, left, index, th);
                if (rc)
                        RETURN(rc);
+
+               if (dt_object_remote(o))
+                       loghandle->lgh_write_offset = lgi->lgi_off;
+
                loghandle->lgh_last_idx++; /* for pad rec */
        }
        /* if it's the last idx in log file, then return -ENOSPC
         * or wrap around if a catalog */
-       if ((loghandle->lgh_last_idx >= LLOG_HDR_BITMAP_SIZE(llh) - 1) ||
+       if (llog_is_full(loghandle) ||
            unlikely(llh->llh_flags & LLOG_F_IS_CAT &&
                     OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) &&
                     loghandle->lgh_last_idx >= cfs_fail_val)) {
@@ -632,6 +684,9 @@ out_unlock:
         * records. This also allows to handle Catalog wrap around case */
        if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
                lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen;
+       } else if (dt_object_remote(o)) {
+               lgi->lgi_off = max_t(__u64, loghandle->lgh_write_offset,
+                                    lgi->lgi_off);
        } else {
                rc = dt_attr_get(env, o, &lgi->lgi_attr);
                if (rc)
@@ -648,8 +703,11 @@ out_unlock:
        if (rc < 0)
                GOTO(out, rc);
 
-       CDEBUG(D_OTHER, "added record "DOSTID": idx: %u, %u off"LPU64"\n",
-              POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len,
+       if (dt_object_remote(o))
+               loghandle->lgh_write_offset = lgi->lgi_off;
+
+       CDEBUG(D_HA, "added record "DFID": idx: %u, %u off%llu\n",
+              PFID(lu_object_fid(&o->do_lu)), index, rec->lrh_len,
               lgi->lgi_off);
        if (reccookie != NULL) {
                reccookie->lgc_lgl = loghandle->lgh_id;
@@ -672,11 +730,15 @@ out:
        mutex_unlock(&loghandle->lgh_hdr_mutex);
 
        /* restore llog last_idx */
-       if (--loghandle->lgh_last_idx == 0 &&
+       if (dt_object_remote(o)) {
+               loghandle->lgh_last_idx = orig_last_idx;
+               loghandle->lgh_write_offset = orig_write_offset;
+       } else if (--loghandle->lgh_last_idx == 0 &&
            (llh->llh_flags & LLOG_F_IS_CAT) && llh->llh_cat_idx != 0) {
                /* catalog had just wrap-around case */
                loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1;
        }
+
        LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx;
 
        RETURN(rc);
@@ -793,9 +855,6 @@ static int llog_osd_next_block(const struct lu_env *env,
        if (len == 0 || len & (chunk_size - 1))
                RETURN(-EINVAL);
 
-       CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
-              next_idx, *cur_idx, *cur_offset);
-
        LASSERT(loghandle);
        LASSERT(loghandle->lgh_ctxt);
 
@@ -809,6 +868,10 @@ static int llog_osd_next_block(const struct lu_env *env,
        if (rc)
                GOTO(out, rc);
 
+       CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off"
+              "%llu), size %llu\n", next_idx, *cur_idx,
+              *cur_offset, lgi->lgi_attr.la_size);
+
        while (*cur_offset < lgi->lgi_attr.la_size) {
                struct llog_rec_hdr     *rec, *last_rec;
                struct llog_rec_tail    *tail;
@@ -827,7 +890,7 @@ static int llog_osd_next_block(const struct lu_env *env,
                                goto retry;
 
                        CERROR("%s: can't read llog block from log "DFID
-                              " offset "LPU64": rc = %d\n",
+                              " offset %llu: rc = %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
                               PFID(lu_object_fid(&o->do_lu)), *cur_offset,
                               rc);
@@ -851,7 +914,7 @@ static int llog_osd_next_block(const struct lu_env *env,
                                goto retry;
 
                        CERROR("%s: invalid llog block at log id "DOSTID"/%u "
-                              "offset "LPU64"\n",
+                              "offset %llu\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
                               POSTID(&loghandle->lgh_id.lgl_oi),
                               loghandle->lgh_id.lgl_ogen, *cur_offset);
@@ -870,14 +933,23 @@ static int llog_osd_next_block(const struct lu_env *env,
 
                if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
                        lustre_swab_llog_rec(last_rec);
-               LASSERT(last_rec->lrh_index == tail->lrt_index);
+
+               if (last_rec->lrh_index != tail->lrt_index) {
+                       CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
+                              "offset %llu last_rec idx %u tail idx %u\n",
+                              o->do_lu.lo_dev->ld_obd->obd_name,
+                              POSTID(&loghandle->lgh_id.lgl_oi),
+                              loghandle->lgh_id.lgl_ogen, *cur_offset,
+                              last_rec->lrh_index, tail->lrt_index);
+                       GOTO(out, rc = -EINVAL);
+               }
 
                *cur_idx = tail->lrt_index;
 
                /* this shouldn't happen */
                if (tail->lrt_index == 0) {
                        CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
-                              "offset "LPU64" bytes %d\n",
+                              "offset %llu bytes %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
                               POSTID(&loghandle->lgh_id.lgl_oi),
                               loghandle->lgh_id.lgl_ogen, *cur_offset, rc);
@@ -985,7 +1057,7 @@ static int llog_osd_prev_block(const struct lu_env *env,
                rc = dt_read(env, o, &lgi->lgi_buf, &cur_offset);
                if (rc < 0) {
                        CERROR("%s: can't read llog block from log "DFID
-                              " offset "LPU64": rc = %d\n",
+                              " offset %llu: rc = %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
                               PFID(lu_object_fid(&o->do_lu)), cur_offset, rc);
                        GOTO(out, rc);
@@ -996,7 +1068,7 @@ static int llog_osd_prev_block(const struct lu_env *env,
 
                if (rc < sizeof(*tail)) {
                        CERROR("%s: invalid llog block at log id "DOSTID"/%u "
-                              "offset "LPU64"\n",
+                              "offset %llu\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
                               POSTID(&loghandle->lgh_id.lgl_oi),
                               loghandle->lgh_id.lgl_ogen, cur_offset);
@@ -1020,7 +1092,7 @@ static int llog_osd_prev_block(const struct lu_env *env,
                /* this shouldn't happen */
                if (tail->lrt_index == 0) {
                        CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
-                              "offset "LPU64"\n",
+                              "offset %llu\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
                               POSTID(&loghandle->lgh_id.lgl_oi),
                               loghandle->lgh_id.lgl_ogen, cur_offset);
@@ -1119,6 +1191,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
        struct ls_device                *ls;
        struct local_oid_storage        *los = NULL;
        int                              rc = 0;
+       bool new_id = false;
 
        ENTRY;
 
@@ -1179,6 +1252,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
                        /* generate fid for new llog */
                        rc = local_object_fid_generate(env, los,
                                                       &lgi->lgi_fid);
+                       new_id = true;
                }
                if (rc < 0)
                        GOTO(out, rc);
@@ -1190,15 +1264,30 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
        } else {
                LASSERTF(open_param & LLOG_OPEN_NEW, "%#x\n", open_param);
                /* generate fid for new llog */
+generate:
                rc = local_object_fid_generate(env, los, &lgi->lgi_fid);
                if (rc < 0)
                        GOTO(out, rc);
+               new_id = true;
        }
 
        o = ls_locate(env, ls, &lgi->lgi_fid, NULL);
        if (IS_ERR(o))
                GOTO(out_name, rc = PTR_ERR(o));
 
+       if (dt_object_exists(o) && new_id) {
+               /* an llog with the just-generated ID already exists, e.g. an
+                * old llog still in use or orphaned; log and skip it. */
+               CDEBUG(D_INFO, "%s: llog exists with the same FID: "DFID
+                      ", skipping\n",
+                      o->do_lu.lo_dev->ld_obd->obd_name,
+                      PFID(lu_object_fid(&o->do_lu)));
+               lu_object_put(env, &o->do_lu);
+               /* just skip this llog ID; it must not be deleted because its
+                * purpose and state are unknown. */
+               goto generate;
+       }
+
 after_open:
        /* No new llog is expected but doesn't exist */
        if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o))
@@ -1223,23 +1312,6 @@ out:
 }
 
 /**
- * Implementation of the llog_operations::lop_exist
- *
- * This function checks that llog exists on storage.
- *
- * \param[in] handle   llog handle of the current llog
- *
- * \retval             true if llog object exists and is not just destroyed
- * \retval             false if llog doesn't exist or just destroyed
- */
-static int llog_osd_exist(struct llog_handle *handle)
-{
-       LASSERT(handle->lgh_obj);
-       return (dt_object_exists(handle->lgh_obj) &&
-               !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header));
-}
-
-/**
  * Get dir for regular fid log object
  *
  * Get directory for regular fid log object, and these regular fid log