Whamcloud - gitweb
LU-8066 obd: migrate to ksets
[fs/lustre-release.git] / lustre / obdclass / llog_osd.c
index 0523075..cba9a68 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2015, Intel Corporation.
+ * Copyright (c) 2012, 2016, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -128,6 +128,12 @@ static int llog_osd_exist(struct llog_handle *handle)
                !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header);
 }
 
+static void *rec_tail(struct llog_rec_hdr *rec)
+{
+       return (void *)((char *)rec + rec->lrh_len -
+                       sizeof(struct llog_rec_tail));
+}
+
 /**
  * Write a padding record to the llog
  *
@@ -280,7 +286,6 @@ static int llog_osd_read_header(const struct lu_env *env,
 
        handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK);
        handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index;
-       handle->lgh_write_offset = lgi->lgi_attr.la_size;
 
        RETURN(0);
 }
@@ -384,7 +389,6 @@ static int llog_osd_write_rec(const struct lu_env *env,
        __u32                   chunk_size;
        size_t                   left;
        __u32                   orig_last_idx;
-       __u64                   orig_write_offset;
        ENTRY;
 
        llh = loghandle->lgh_hdr;
@@ -499,9 +503,9 @@ static int llog_osd_write_rec(const struct lu_env *env,
                        }
 
                        lgi->lgi_off = loghandle->lgh_cur_offset;
-                       CDEBUG(D_OTHER, "modify record "DOSTID": idx:%d, "
+                       CDEBUG(D_OTHER, "modify record "DFID": idx:%u, "
                               "len:%u offset %llu\n",
-                              POSTID(&loghandle->lgh_id.lgl_oi), idx,
+                              PFID(&loghandle->lgh_id.lgl_oi.oi_fid), idx,
                               rec->lrh_len, (long long)lgi->lgi_off);
                } else if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
                        lgi->lgi_off = llh->llh_hdr.lrh_len +
@@ -550,8 +554,19 @@ static int llog_osd_write_rec(const struct lu_env *env,
 
        LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
        orig_last_idx = loghandle->lgh_last_idx;
-       orig_write_offset = loghandle->lgh_write_offset;
        lgi->lgi_off = lgi->lgi_attr.la_size;
+
+       if (loghandle->lgh_max_size > 0 &&
+           lgi->lgi_off >= loghandle->lgh_max_size) {
+               CDEBUG(D_OTHER, "llog is getting too large (%u > %u) at %u "
+                      DFID"\n", (unsigned)lgi->lgi_off,
+                      loghandle->lgh_max_size, (int)loghandle->lgh_last_idx,
+                      PFID(&loghandle->lgh_id.lgl_oi.oi_fid));
+               /* this is to signal that this llog is full */
+               loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1;
+               RETURN(-ENOSPC);
+       }
+
        left = chunk_size - (lgi->lgi_off & (chunk_size - 1));
        /* NOTE: padding is a record, but no bit is set */
        if (left != 0 && left != reclen &&
@@ -561,9 +576,6 @@ static int llog_osd_write_rec(const struct lu_env *env,
                if (rc)
                        RETURN(rc);
 
-               if (dt_object_remote(o))
-                       loghandle->lgh_write_offset = lgi->lgi_off;
-
                loghandle->lgh_last_idx++; /* for pad rec */
        }
        /* if it's the last idx in log file, then return -ENOSPC
@@ -665,9 +677,6 @@ out_unlock:
         * records. This also allows to handle Catalog wrap around case */
        if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
                lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen;
-       } else if (dt_object_remote(o)) {
-               lgi->lgi_off = max_t(__u64, loghandle->lgh_write_offset,
-                                    lgi->lgi_off);
        } else {
                rc = dt_attr_get(env, o, &lgi->lgi_attr);
                if (rc)
@@ -684,10 +693,7 @@ out_unlock:
        if (rc < 0)
                GOTO(out, rc);
 
-       if (dt_object_remote(o))
-               loghandle->lgh_write_offset = lgi->lgi_off;
-
-       CDEBUG(D_HA, "added record "DFID": idx: %u, %u off%llu\n",
+       CDEBUG(D_HA, "added record "DFID".%u, %u off%llu\n",
               PFID(lu_object_fid(&o->do_lu)), index, rec->lrh_len,
               lgi->lgi_off);
        if (reccookie != NULL) {
@@ -713,7 +719,6 @@ out:
        /* restore llog last_idx */
        if (dt_object_remote(o)) {
                loghandle->lgh_last_idx = orig_last_idx;
-               loghandle->lgh_write_offset = orig_write_offset;
        } else if (--loghandle->lgh_last_idx == 0 &&
            (llh->llh_flags & LLOG_F_IS_CAT) && llh->llh_cat_idx != 0) {
                /* catalog had just wrap-around case */
@@ -894,10 +899,10 @@ static int llog_osd_next_block(const struct lu_env *env,
                        if (!force_mini_rec)
                                goto retry;
 
-                       CERROR("%s: invalid llog block at log id "DOSTID"/%u "
+                       CERROR("%s: invalid llog block at log id "DFID":%x "
                               "offset %llu\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
-                              POSTID(&loghandle->lgh_id.lgl_oi),
+                              PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
                               loghandle->lgh_id.lgl_ogen, *cur_offset);
                        GOTO(out, rc = -EINVAL);
                }
@@ -916,12 +921,14 @@ static int llog_osd_next_block(const struct lu_env *env,
                        lustre_swab_llog_rec(last_rec);
 
                if (last_rec->lrh_index != tail->lrt_index) {
-                       CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
-                              "offset %llu last_rec idx %u tail idx %u\n",
+                       CERROR("%s: invalid llog tail at log id "DFID":%x "
+                              "offset %llu last_rec idx %u tail idx %u"
+                              "lrt len %u read_size %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
-                              POSTID(&loghandle->lgh_id.lgl_oi),
+                              PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
                               loghandle->lgh_id.lgl_ogen, *cur_offset,
-                              last_rec->lrh_index, tail->lrt_index);
+                              last_rec->lrh_index, tail->lrt_index,
+                              tail->lrt_len, rc);
                        GOTO(out, rc = -EINVAL);
                }
 
@@ -929,10 +936,10 @@ static int llog_osd_next_block(const struct lu_env *env,
 
                /* this shouldn't happen */
                if (tail->lrt_index == 0) {
-                       CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
+                       CERROR("%s: invalid llog tail at log id "DFID":%x "
                               "offset %llu bytes %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
-                              POSTID(&loghandle->lgh_id.lgl_oi),
+                              PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
                               loghandle->lgh_id.lgl_ogen, *cur_offset, rc);
                        GOTO(out, rc = -EINVAL);
                }
@@ -1048,10 +1055,10 @@ static int llog_osd_prev_block(const struct lu_env *env,
                        GOTO(out, rc);
 
                if (rc < sizeof(*tail)) {
-                       CERROR("%s: invalid llog block at log id "DOSTID"/%u "
+                       CERROR("%s: invalid llog block at log id "DFID":%x "
                               "offset %llu\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
-                              POSTID(&loghandle->lgh_id.lgl_oi),
+                              PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
                               loghandle->lgh_id.lgl_ogen, cur_offset);
                        GOTO(out, rc = -EINVAL);
                }
@@ -1072,10 +1079,10 @@ static int llog_osd_prev_block(const struct lu_env *env,
 
                /* this shouldn't happen */
                if (tail->lrt_index == 0) {
-                       CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
+                       CERROR("%s: invalid llog tail at log id "DFID":%x "
                               "offset %llu\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
-                              POSTID(&loghandle->lgh_id.lgl_oi),
+                              PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
                               loghandle->lgh_id.lgl_ogen, cur_offset);
                        GOTO(out, rc = -EINVAL);
                }
@@ -1129,7 +1136,7 @@ static struct dt_object *llog_osd_dir_get(const struct lu_env *env,
                dir = dt_locate(env, dt, &dti->dti_fid);
 
                if (!IS_ERR(dir) && !dt_try_as_dir(env, dir)) {
-                       lu_object_put(env, &dir->do_lu);
+                       dt_object_put(env, dir);
                        return ERR_PTR(-ENOTDIR);
                }
        } else {
@@ -1228,7 +1235,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
                dt_read_lock(env, llog_dir, 0);
                rc = dt_lookup_dir(env, llog_dir, name, &lgi->lgi_fid);
                dt_read_unlock(env, llog_dir);
-               lu_object_put(env, &llog_dir->do_lu);
+               dt_object_put(env, llog_dir);
                if (rc == -ENOENT && open_param == LLOG_OPEN_NEW) {
                        /* generate fid for new llog */
                        rc = local_object_fid_generate(env, los,
@@ -1263,7 +1270,7 @@ generate:
                       ", skipping\n",
                       o->do_lu.lo_dev->ld_obd->obd_name,
                       PFID(lu_object_fid(&o->do_lu)));
-               lu_object_put(env, &o->do_lu);
+               dt_object_put(env, o);
                /* just skip this llog ID, we shouldn't delete it because we
                 * don't know exactly what is its purpose and state. */
                goto generate;
@@ -1271,9 +1278,12 @@ generate:
 
 after_open:
        /* No new llog is expected but doesn't exist */
-       if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o))
+       if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o)) {
+               CDEBUG(D_INFO, "%s: llog FID: "DFID" obj %p doesn`t exist\n",
+                      o->do_lu.lo_dev->ld_obd->obd_name,
+                      PFID(lu_object_fid(&o->do_lu)), o);
                GOTO(out_put, rc = -ENOENT);
-
+       }
        fid_to_logid(&lgi->lgi_fid, &handle->lgh_id);
        handle->lgh_obj = o;
        handle->private_data = los;
@@ -1282,7 +1292,7 @@ after_open:
        RETURN(rc);
 
 out_put:
-       lu_object_put(env, &o->do_lu);
+       dt_object_put(env, o);
 out_name:
        if (handle->lgh_name != NULL)
                OBD_FREE(handle->lgh_name, strlen(name) + 1);
@@ -1329,7 +1339,7 @@ struct dt_object *llog_osd_get_regular_fid_dir(const struct lu_env *env,
                RETURN(dir);
 
        if (!dt_try_as_dir(env, dir)) {
-               lu_object_put(env, &dir->do_lu);
+               dt_object_put(env, dir);
                RETURN(ERR_PTR(-ENOTDIR));
        }
 
@@ -1384,7 +1394,7 @@ llog_osd_regular_fid_add_name_entry(const struct lu_env *env,
        }
        dt_write_unlock(env, dir);
 
-       lu_object_put(env, &dir->do_lu);
+       dt_object_put(env, dir);
        RETURN(rc);
 }
 
@@ -1461,7 +1471,7 @@ static int llog_osd_declare_create(const struct lu_env *env,
                rc = dt_declare_insert(env, llog_dir,
                                       (struct dt_rec *)rec,
                                       (struct dt_key *)res->lgh_name, th);
-               lu_object_put(env, &llog_dir->do_lu);
+               dt_object_put(env, llog_dir);
                if (rc)
                        CERROR("%s: can't declare named llog %s: rc = %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
@@ -1550,7 +1560,7 @@ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res,
                               (struct dt_key *)res->lgh_name,
                               th, 1);
                dt_read_unlock(env, llog_dir);
-               lu_object_put(env, &llog_dir->do_lu);
+               dt_object_put(env, llog_dir);
                if (rc)
                        CERROR("%s: can't create named llog %s: rc = %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
@@ -1583,11 +1593,11 @@ static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle)
        if (handle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
                /* Remove the object from the cache, otherwise it may
                 * hold LOD being released during cleanup process */
-               lu_object_put_nocache(env, &handle->lgh_obj->do_lu);
+               dt_object_put_nocache(env, handle->lgh_obj);
                LASSERT(handle->private_data == NULL);
                RETURN(rc);
        } else {
-               lu_object_put(env, &handle->lgh_obj->do_lu);
+               dt_object_put(env, handle->lgh_obj);
        }
        los = handle->private_data;
        LASSERT(los);
@@ -1642,7 +1652,7 @@ llog_osd_regular_fid_del_name_entry(const struct lu_env *env,
        }
        dt_write_unlock(env, dir);
 
-       lu_object_put(env, &dir->do_lu);
+       dt_object_put(env, dir);
        RETURN(rc);
 }
 
@@ -1702,7 +1712,7 @@ static int llog_osd_declare_destroy(const struct lu_env *env,
 
 out_put:
        if (!(IS_ERR_OR_NULL(llog_dir)))
-               lu_object_put(env, &llog_dir->do_lu);
+               dt_object_put(env, llog_dir);
 
        RETURN(rc);
 }
@@ -1773,7 +1783,7 @@ static int llog_osd_destroy(const struct lu_env *env,
 out_unlock:
        dt_write_unlock(env, o);
        if (!(IS_ERR_OR_NULL(llog_dir)))
-               lu_object_put(env, &llog_dir->do_lu);
+               dt_object_put(env, llog_dir);
        RETURN(rc);
 }
 
@@ -2019,7 +2029,7 @@ out_trans:
 
        EXIT;
 out:
-       lu_object_put(env, &o->do_lu);
+       dt_object_put(env, o);
        RETURN(rc);
 }
 EXPORT_SYMBOL(llog_osd_get_cat_list);
@@ -2109,7 +2119,7 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d,
 out_trans:
        dt_trans_stop(env, d, th);
 out:
-       lu_object_put(env, &o->do_lu);
+       dt_object_put(env, o);
        RETURN(rc);
 }
 EXPORT_SYMBOL(llog_osd_put_cat_list);