Whamcloud - gitweb
LU-9355 obd: remove obsolete OBD_FL_LOCAL_MASK
[fs/lustre-release.git] / lustre / obdclass / llog_osd.c
index 0503e66..9d458f4 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2015, Intel Corporation.
+ * Copyright (c) 2012, 2016, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -128,6 +128,12 @@ static int llog_osd_exist(struct llog_handle *handle)
                !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header);
 }
 
+static void *rec_tail(struct llog_rec_hdr *rec)
+{
+       return (void *)((char *)rec + rec->lrh_len -
+                       sizeof(struct llog_rec_tail));
+}
+
 /**
  * Write a padding record to the llog
  *
@@ -382,15 +388,11 @@ static int llog_osd_write_rec(const struct lu_env *env,
        struct dt_object        *o;
        __u32                   chunk_size;
        size_t                   left;
-
+       __u32                   orig_last_idx;
        ENTRY;
 
-       LASSERT(env);
        llh = loghandle->lgh_hdr;
-       LASSERT(llh);
        o = loghandle->lgh_obj;
-       LASSERT(o);
-       LASSERT(th);
 
        chunk_size = llh->llh_hdr.lrh_len;
        CDEBUG(D_OTHER, "new record %x to "DFID"\n",
@@ -501,9 +503,9 @@ static int llog_osd_write_rec(const struct lu_env *env,
                        }
 
                        lgi->lgi_off = loghandle->lgh_cur_offset;
-                       CDEBUG(D_OTHER, "modify record "DOSTID": idx:%d, "
+                       CDEBUG(D_OTHER, "modify record "DFID": idx:%u, "
                               "len:%u offset %llu\n",
-                              POSTID(&loghandle->lgh_id.lgl_oi), idx,
+                              PFID(&loghandle->lgh_id.lgl_oi.oi_fid), idx,
                               rec->lrh_len, (long long)lgi->lgi_off);
                } else if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
                        lgi->lgi_off = llh->llh_hdr.lrh_len +
@@ -551,7 +553,20 @@ static int llog_osd_write_rec(const struct lu_env *env,
                RETURN(-ENOSPC);
 
        LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
+       orig_last_idx = loghandle->lgh_last_idx;
        lgi->lgi_off = lgi->lgi_attr.la_size;
+
+       if (loghandle->lgh_max_size > 0 &&
+           lgi->lgi_off >= loghandle->lgh_max_size) {
+               CDEBUG(D_OTHER, "llog is getting too large (%u > %u) at %u "
+                      DFID"\n", (unsigned)lgi->lgi_off,
+                      loghandle->lgh_max_size, (int)loghandle->lgh_last_idx,
+                      PFID(&loghandle->lgh_id.lgl_oi.oi_fid));
+               /* this is to signal that this llog is full */
+               loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1;
+               RETURN(-ENOSPC);
+       }
+
        left = chunk_size - (lgi->lgi_off & (chunk_size - 1));
        /* NOTE: padding is a record, but no bit is set */
        if (left != 0 && left != reclen &&
@@ -560,6 +575,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
                rc = llog_osd_pad(env, o, &lgi->lgi_off, left, index, th);
                if (rc)
                        RETURN(rc);
+
                loghandle->lgh_last_idx++; /* for pad rec */
        }
        /* if it's the last idx in log file, then return -ENOSPC
@@ -677,8 +693,8 @@ out_unlock:
        if (rc < 0)
                GOTO(out, rc);
 
-       CDEBUG(D_OTHER, "added record "DOSTID": idx: %u, %u off"LPU64"\n",
-              POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len,
+       CDEBUG(D_HA, "added record "DFID".%u, %u off%llu\n",
+              PFID(lu_object_fid(&o->do_lu)), index, rec->lrh_len,
               lgi->lgi_off);
        if (reccookie != NULL) {
                reccookie->lgc_lgl = loghandle->lgh_id;
@@ -701,11 +717,14 @@ out:
        mutex_unlock(&loghandle->lgh_hdr_mutex);
 
        /* restore llog last_idx */
-       if (--loghandle->lgh_last_idx == 0 &&
+       if (dt_object_remote(o)) {
+               loghandle->lgh_last_idx = orig_last_idx;
+       } else if (--loghandle->lgh_last_idx == 0 &&
            (llh->llh_flags & LLOG_F_IS_CAT) && llh->llh_cat_idx != 0) {
                /* catalog had just wrap-around case */
                loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1;
        }
+
        LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx;
 
        RETURN(rc);
@@ -822,9 +841,6 @@ static int llog_osd_next_block(const struct lu_env *env,
        if (len == 0 || len & (chunk_size - 1))
                RETURN(-EINVAL);
 
-       CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
-              next_idx, *cur_idx, *cur_offset);
-
        LASSERT(loghandle);
        LASSERT(loghandle->lgh_ctxt);
 
@@ -838,6 +854,10 @@ static int llog_osd_next_block(const struct lu_env *env,
        if (rc)
                GOTO(out, rc);
 
+       CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off"
+              "%llu), size %llu\n", next_idx, *cur_idx,
+              *cur_offset, lgi->lgi_attr.la_size);
+
        while (*cur_offset < lgi->lgi_attr.la_size) {
                struct llog_rec_hdr     *rec, *last_rec;
                struct llog_rec_tail    *tail;
@@ -856,7 +876,7 @@ static int llog_osd_next_block(const struct lu_env *env,
                                goto retry;
 
                        CERROR("%s: can't read llog block from log "DFID
-                              " offset "LPU64": rc = %d\n",
+                              " offset %llu: rc = %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
                               PFID(lu_object_fid(&o->do_lu)), *cur_offset,
                               rc);
@@ -879,10 +899,10 @@ static int llog_osd_next_block(const struct lu_env *env,
                        if (!force_mini_rec)
                                goto retry;
 
-                       CERROR("%s: invalid llog block at log id "DOSTID"/%u "
-                              "offset "LPU64"\n",
+                       CERROR("%s: invalid llog block at log id "DFID":%x "
+                              "offset %llu\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
-                              POSTID(&loghandle->lgh_id.lgl_oi),
+                              PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
                               loghandle->lgh_id.lgl_ogen, *cur_offset);
                        GOTO(out, rc = -EINVAL);
                }
@@ -899,16 +919,27 @@ static int llog_osd_next_block(const struct lu_env *env,
 
                if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
                        lustre_swab_llog_rec(last_rec);
-               LASSERT(last_rec->lrh_index == tail->lrt_index);
+
+               if (last_rec->lrh_index != tail->lrt_index) {
+                       CERROR("%s: invalid llog tail at log id "DFID":%x "
+                              "offset %llu last_rec idx %u tail idx %u"
+                              "lrt len %u read_size %d\n",
+                              o->do_lu.lo_dev->ld_obd->obd_name,
+                              PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
+                              loghandle->lgh_id.lgl_ogen, *cur_offset,
+                              last_rec->lrh_index, tail->lrt_index,
+                              tail->lrt_len, rc);
+                       GOTO(out, rc = -EINVAL);
+               }
 
                *cur_idx = tail->lrt_index;
 
                /* this shouldn't happen */
                if (tail->lrt_index == 0) {
-                       CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
-                              "offset "LPU64" bytes %d\n",
+                       CERROR("%s: invalid llog tail at log id "DFID":%x "
+                              "offset %llu bytes %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
-                              POSTID(&loghandle->lgh_id.lgl_oi),
+                              PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
                               loghandle->lgh_id.lgl_ogen, *cur_offset, rc);
                        GOTO(out, rc = -EINVAL);
                }
@@ -1014,7 +1045,7 @@ static int llog_osd_prev_block(const struct lu_env *env,
                rc = dt_read(env, o, &lgi->lgi_buf, &cur_offset);
                if (rc < 0) {
                        CERROR("%s: can't read llog block from log "DFID
-                              " offset "LPU64": rc = %d\n",
+                              " offset %llu: rc = %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
                               PFID(lu_object_fid(&o->do_lu)), cur_offset, rc);
                        GOTO(out, rc);
@@ -1024,10 +1055,10 @@ static int llog_osd_prev_block(const struct lu_env *env,
                        GOTO(out, rc);
 
                if (rc < sizeof(*tail)) {
-                       CERROR("%s: invalid llog block at log id "DOSTID"/%u "
-                              "offset "LPU64"\n",
+                       CERROR("%s: invalid llog block at log id "DFID":%x "
+                              "offset %llu\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
-                              POSTID(&loghandle->lgh_id.lgl_oi),
+                              PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
                               loghandle->lgh_id.lgl_ogen, cur_offset);
                        GOTO(out, rc = -EINVAL);
                }
@@ -1048,10 +1079,10 @@ static int llog_osd_prev_block(const struct lu_env *env,
 
                /* this shouldn't happen */
                if (tail->lrt_index == 0) {
-                       CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
-                              "offset "LPU64"\n",
+                       CERROR("%s: invalid llog tail at log id "DFID":%x "
+                              "offset %llu\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
-                              POSTID(&loghandle->lgh_id.lgl_oi),
+                              PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
                               loghandle->lgh_id.lgl_ogen, cur_offset);
                        GOTO(out, rc = -EINVAL);
                }
@@ -1105,7 +1136,7 @@ static struct dt_object *llog_osd_dir_get(const struct lu_env *env,
                dir = dt_locate(env, dt, &dti->dti_fid);
 
                if (!IS_ERR(dir) && !dt_try_as_dir(env, dir)) {
-                       lu_object_put(env, &dir->do_lu);
+                       dt_object_put(env, dir);
                        return ERR_PTR(-ENOTDIR);
                }
        } else {
@@ -1148,6 +1179,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
        struct ls_device                *ls;
        struct local_oid_storage        *los = NULL;
        int                              rc = 0;
+       bool new_id = false;
 
        ENTRY;
 
@@ -1203,11 +1235,12 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
                dt_read_lock(env, llog_dir, 0);
                rc = dt_lookup_dir(env, llog_dir, name, &lgi->lgi_fid);
                dt_read_unlock(env, llog_dir);
-               lu_object_put(env, &llog_dir->do_lu);
+               dt_object_put(env, llog_dir);
                if (rc == -ENOENT && open_param == LLOG_OPEN_NEW) {
                        /* generate fid for new llog */
                        rc = local_object_fid_generate(env, los,
                                                       &lgi->lgi_fid);
+                       new_id = true;
                }
                if (rc < 0)
                        GOTO(out, rc);
@@ -1219,15 +1252,30 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
        } else {
                LASSERTF(open_param & LLOG_OPEN_NEW, "%#x\n", open_param);
                /* generate fid for new llog */
+generate:
                rc = local_object_fid_generate(env, los, &lgi->lgi_fid);
                if (rc < 0)
                        GOTO(out, rc);
+               new_id = true;
        }
 
        o = ls_locate(env, ls, &lgi->lgi_fid, NULL);
        if (IS_ERR(o))
                GOTO(out_name, rc = PTR_ERR(o));
 
+       if (dt_object_exists(o) && new_id) {
+               /* llog exists with just generated ID, e.g. some old llog file
+                * still is in use or is orphan, drop a warn and skip it. */
+               CDEBUG(D_INFO, "%s: llog exists with the same FID: "DFID
+                      ", skipping\n",
+                      o->do_lu.lo_dev->ld_obd->obd_name,
+                      PFID(lu_object_fid(&o->do_lu)));
+               dt_object_put(env, o);
+               /* just skip this llog ID, we shouldn't delete it because we
+                * don't know exactly what is its purpose and state. */
+               goto generate;
+       }
+
 after_open:
        /* No new llog is expected but doesn't exist */
        if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o))
@@ -1241,7 +1289,7 @@ after_open:
        RETURN(rc);
 
 out_put:
-       lu_object_put(env, &o->do_lu);
+       dt_object_put(env, o);
 out_name:
        if (handle->lgh_name != NULL)
                OBD_FREE(handle->lgh_name, strlen(name) + 1);
@@ -1288,7 +1336,7 @@ struct dt_object *llog_osd_get_regular_fid_dir(const struct lu_env *env,
                RETURN(dir);
 
        if (!dt_try_as_dir(env, dir)) {
-               lu_object_put(env, &dir->do_lu);
+               dt_object_put(env, dir);
                RETURN(ERR_PTR(-ENOTDIR));
        }
 
@@ -1343,7 +1391,7 @@ llog_osd_regular_fid_add_name_entry(const struct lu_env *env,
        }
        dt_write_unlock(env, dir);
 
-       lu_object_put(env, &dir->do_lu);
+       dt_object_put(env, dir);
        RETURN(rc);
 }
 
@@ -1420,7 +1468,7 @@ static int llog_osd_declare_create(const struct lu_env *env,
                rc = dt_declare_insert(env, llog_dir,
                                       (struct dt_rec *)rec,
                                       (struct dt_key *)res->lgh_name, th);
-               lu_object_put(env, &llog_dir->do_lu);
+               dt_object_put(env, llog_dir);
                if (rc)
                        CERROR("%s: can't declare named llog %s: rc = %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
@@ -1509,7 +1557,7 @@ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res,
                               (struct dt_key *)res->lgh_name,
                               th, 1);
                dt_read_unlock(env, llog_dir);
-               lu_object_put(env, &llog_dir->do_lu);
+               dt_object_put(env, llog_dir);
                if (rc)
                        CERROR("%s: can't create named llog %s: rc = %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
@@ -1542,11 +1590,11 @@ static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle)
        if (handle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
                /* Remove the object from the cache, otherwise it may
                 * hold LOD being released during cleanup process */
-               lu_object_put_nocache(env, &handle->lgh_obj->do_lu);
+               dt_object_put_nocache(env, handle->lgh_obj);
                LASSERT(handle->private_data == NULL);
                RETURN(rc);
        } else {
-               lu_object_put(env, &handle->lgh_obj->do_lu);
+               dt_object_put(env, handle->lgh_obj);
        }
        los = handle->private_data;
        LASSERT(los);
@@ -1601,7 +1649,7 @@ llog_osd_regular_fid_del_name_entry(const struct lu_env *env,
        }
        dt_write_unlock(env, dir);
 
-       lu_object_put(env, &dir->do_lu);
+       dt_object_put(env, dir);
        RETURN(rc);
 }
 
@@ -1661,7 +1709,7 @@ static int llog_osd_declare_destroy(const struct lu_env *env,
 
 out_put:
        if (!(IS_ERR_OR_NULL(llog_dir)))
-               lu_object_put(env, &llog_dir->do_lu);
+               dt_object_put(env, llog_dir);
 
        RETURN(rc);
 }
@@ -1732,7 +1780,7 @@ static int llog_osd_destroy(const struct lu_env *env,
 out_unlock:
        dt_write_unlock(env, o);
        if (!(IS_ERR_OR_NULL(llog_dir)))
-               lu_object_put(env, &llog_dir->do_lu);
+               dt_object_put(env, llog_dir);
        RETURN(rc);
 }
 
@@ -1978,7 +2026,7 @@ out_trans:
 
        EXIT;
 out:
-       lu_object_put(env, &o->do_lu);
+       dt_object_put(env, o);
        RETURN(rc);
 }
 EXPORT_SYMBOL(llog_osd_get_cat_list);
@@ -2068,7 +2116,7 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d,
 out_trans:
        dt_trans_stop(env, d, th);
 out:
-       lu_object_put(env, &o->do_lu);
+       dt_object_put(env, o);
        RETURN(rc);
 }
 EXPORT_SYMBOL(llog_osd_put_cat_list);