Whamcloud - gitweb
LU-6838 llog: limit file size of plain logs
[fs/lustre-release.git] / lustre / obdclass / llog_osd.c
index e7ae395..6c3c288 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2014 Intel Corporation.
+ * Copyright (c) 2012, 2015, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 
 #define DEBUG_SUBSYSTEM S_LOG
 
+#include <dt_object.h>
+#include <llog_swab.h>
+#include <lustre_fid.h>
 #include <obd.h>
 #include <obd_class.h>
-#include <lustre_fid.h>
-#include <dt_object.h>
 
 #include "llog_internal.h"
 #include "local_storage.h"
@@ -127,6 +128,12 @@ static int llog_osd_exist(struct llog_handle *handle)
                !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header);
 }
 
+static void *rec_tail(struct llog_rec_hdr *rec)
+{
+       return (void *)((char *)rec + rec->lrh_len -
+                       sizeof(struct llog_rec_tail));
+}
+
 /**
  * Write a padding record to the llog
  *
@@ -279,6 +286,7 @@ static int llog_osd_read_header(const struct lu_env *env,
 
        handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK);
        handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index;
+       handle->lgh_write_offset = lgi->lgi_attr.la_size;
 
        RETURN(0);
 }
@@ -381,15 +389,12 @@ static int llog_osd_write_rec(const struct lu_env *env,
        struct dt_object        *o;
        __u32                   chunk_size;
        size_t                   left;
-
+       __u32                   orig_last_idx;
+       __u64                   orig_write_offset;
        ENTRY;
 
-       LASSERT(env);
        llh = loghandle->lgh_hdr;
-       LASSERT(llh);
        o = loghandle->lgh_obj;
-       LASSERT(o);
-       LASSERT(th);
 
        chunk_size = llh->llh_hdr.lrh_len;
        CDEBUG(D_OTHER, "new record %x to "DFID"\n",
@@ -550,7 +555,22 @@ static int llog_osd_write_rec(const struct lu_env *env,
                RETURN(-ENOSPC);
 
        LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
+       orig_last_idx = loghandle->lgh_last_idx;
+       orig_write_offset = loghandle->lgh_write_offset;
        lgi->lgi_off = lgi->lgi_attr.la_size;
+
+       if (loghandle->lgh_max_size > 0 &&
+           lgi->lgi_off >= loghandle->lgh_max_size) {
+               CDEBUG(D_OTHER, "llog is getting too large (%u > %u) at %u "
+                      DOSTID"\n", (unsigned)lgi->lgi_off,
+                      loghandle->lgh_max_size,
+                      (int)loghandle->lgh_last_idx,
+                      POSTID(&loghandle->lgh_id.lgl_oi));
+               /* this is to signal that this llog is full */
+               loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1;
+               RETURN(-ENOSPC);
+       }
+
        left = chunk_size - (lgi->lgi_off & (chunk_size - 1));
        /* NOTE: padding is a record, but no bit is set */
        if (left != 0 && left != reclen &&
@@ -559,6 +579,10 @@ static int llog_osd_write_rec(const struct lu_env *env,
                rc = llog_osd_pad(env, o, &lgi->lgi_off, left, index, th);
                if (rc)
                        RETURN(rc);
+
+               if (dt_object_remote(o))
+                       loghandle->lgh_write_offset = lgi->lgi_off;
+
                loghandle->lgh_last_idx++; /* for pad rec */
        }
        /* if it's the last idx in log file, then return -ENOSPC
@@ -660,6 +684,9 @@ out_unlock:
         * records. This also allows to handle Catalog wrap around case */
        if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
                lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen;
+       } else if (dt_object_remote(o)) {
+               lgi->lgi_off = max_t(__u64, loghandle->lgh_write_offset,
+                                    lgi->lgi_off);
        } else {
                rc = dt_attr_get(env, o, &lgi->lgi_attr);
                if (rc)
@@ -676,8 +703,11 @@ out_unlock:
        if (rc < 0)
                GOTO(out, rc);
 
-       CDEBUG(D_OTHER, "added record "DOSTID": idx: %u, %u off"LPU64"\n",
-              POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len,
+       if (dt_object_remote(o))
+               loghandle->lgh_write_offset = lgi->lgi_off;
+
+       CDEBUG(D_HA, "added record "DFID": idx: %u, %u off%llu\n",
+              PFID(lu_object_fid(&o->do_lu)), index, rec->lrh_len,
               lgi->lgi_off);
        if (reccookie != NULL) {
                reccookie->lgc_lgl = loghandle->lgh_id;
@@ -700,11 +730,15 @@ out:
        mutex_unlock(&loghandle->lgh_hdr_mutex);
 
        /* restore llog last_idx */
-       if (--loghandle->lgh_last_idx == 0 &&
+       if (dt_object_remote(o)) {
+               loghandle->lgh_last_idx = orig_last_idx;
+               loghandle->lgh_write_offset = orig_write_offset;
+       } else if (--loghandle->lgh_last_idx == 0 &&
            (llh->llh_flags & LLOG_F_IS_CAT) && llh->llh_cat_idx != 0) {
                /* catalog had just wrap-around case */
                loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1;
        }
+
        LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx;
 
        RETURN(rc);
@@ -821,9 +855,6 @@ static int llog_osd_next_block(const struct lu_env *env,
        if (len == 0 || len & (chunk_size - 1))
                RETURN(-EINVAL);
 
-       CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
-              next_idx, *cur_idx, *cur_offset);
-
        LASSERT(loghandle);
        LASSERT(loghandle->lgh_ctxt);
 
@@ -837,6 +868,10 @@ static int llog_osd_next_block(const struct lu_env *env,
        if (rc)
                GOTO(out, rc);
 
+       CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off"
+              "%llu), size %llu\n", next_idx, *cur_idx,
+              *cur_offset, lgi->lgi_attr.la_size);
+
        while (*cur_offset < lgi->lgi_attr.la_size) {
                struct llog_rec_hdr     *rec, *last_rec;
                struct llog_rec_tail    *tail;
@@ -855,7 +890,7 @@ static int llog_osd_next_block(const struct lu_env *env,
                                goto retry;
 
                        CERROR("%s: can't read llog block from log "DFID
-                              " offset "LPU64": rc = %d\n",
+                              " offset %llu: rc = %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
                               PFID(lu_object_fid(&o->do_lu)), *cur_offset,
                               rc);
@@ -879,7 +914,7 @@ static int llog_osd_next_block(const struct lu_env *env,
                                goto retry;
 
                        CERROR("%s: invalid llog block at log id "DOSTID"/%u "
-                              "offset "LPU64"\n",
+                              "offset %llu\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
                               POSTID(&loghandle->lgh_id.lgl_oi),
                               loghandle->lgh_id.lgl_ogen, *cur_offset);
@@ -898,14 +933,23 @@ static int llog_osd_next_block(const struct lu_env *env,
 
                if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
                        lustre_swab_llog_rec(last_rec);
-               LASSERT(last_rec->lrh_index == tail->lrt_index);
+
+               if (last_rec->lrh_index != tail->lrt_index) {
+                       CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
+                              "offset %llu last_rec idx %u tail idx %u\n",
+                              o->do_lu.lo_dev->ld_obd->obd_name,
+                              POSTID(&loghandle->lgh_id.lgl_oi),
+                              loghandle->lgh_id.lgl_ogen, *cur_offset,
+                              last_rec->lrh_index, tail->lrt_index);
+                       GOTO(out, rc = -EINVAL);
+               }
 
                *cur_idx = tail->lrt_index;
 
                /* this shouldn't happen */
                if (tail->lrt_index == 0) {
                        CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
-                              "offset "LPU64" bytes %d\n",
+                              "offset %llu bytes %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
                               POSTID(&loghandle->lgh_id.lgl_oi),
                               loghandle->lgh_id.lgl_ogen, *cur_offset, rc);
@@ -1013,7 +1057,7 @@ static int llog_osd_prev_block(const struct lu_env *env,
                rc = dt_read(env, o, &lgi->lgi_buf, &cur_offset);
                if (rc < 0) {
                        CERROR("%s: can't read llog block from log "DFID
-                              " offset "LPU64": rc = %d\n",
+                              " offset %llu: rc = %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
                               PFID(lu_object_fid(&o->do_lu)), cur_offset, rc);
                        GOTO(out, rc);
@@ -1024,7 +1068,7 @@ static int llog_osd_prev_block(const struct lu_env *env,
 
                if (rc < sizeof(*tail)) {
                        CERROR("%s: invalid llog block at log id "DOSTID"/%u "
-                              "offset "LPU64"\n",
+                              "offset %llu\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
                               POSTID(&loghandle->lgh_id.lgl_oi),
                               loghandle->lgh_id.lgl_ogen, cur_offset);
@@ -1048,7 +1092,7 @@ static int llog_osd_prev_block(const struct lu_env *env,
                /* this shouldn't happen */
                if (tail->lrt_index == 0) {
                        CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
-                              "offset "LPU64"\n",
+                              "offset %llu\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
                               POSTID(&loghandle->lgh_id.lgl_oi),
                               loghandle->lgh_id.lgl_ogen, cur_offset);
@@ -1147,6 +1191,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
        struct ls_device                *ls;
        struct local_oid_storage        *los = NULL;
        int                              rc = 0;
+       bool new_id = false;
 
        ENTRY;
 
@@ -1207,6 +1252,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
                        /* generate fid for new llog */
                        rc = local_object_fid_generate(env, los,
                                                       &lgi->lgi_fid);
+                       new_id = true;
                }
                if (rc < 0)
                        GOTO(out, rc);
@@ -1218,15 +1264,30 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
        } else {
                LASSERTF(open_param & LLOG_OPEN_NEW, "%#x\n", open_param);
                /* generate fid for new llog */
+generate:
                rc = local_object_fid_generate(env, los, &lgi->lgi_fid);
                if (rc < 0)
                        GOTO(out, rc);
+               new_id = true;
        }
 
        o = ls_locate(env, ls, &lgi->lgi_fid, NULL);
        if (IS_ERR(o))
                GOTO(out_name, rc = PTR_ERR(o));
 
+       if (dt_object_exists(o) && new_id) {
+               /* llog exists with just generated ID, e.g. some old llog file
+                * still is in use or is orphan, drop a warn and skip it. */
+               CDEBUG(D_INFO, "%s: llog exists with the same FID: "DFID
+                      ", skipping\n",
+                      o->do_lu.lo_dev->ld_obd->obd_name,
+                      PFID(lu_object_fid(&o->do_lu)));
+               lu_object_put(env, &o->do_lu);
+               /* just skip this llog ID, we shouldn't delete it because we
+                * don't know exactly what is its purpose and state. */
+               goto generate;
+       }
+
 after_open:
        /* No new llog is expected but doesn't exist */
        if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o))