Whamcloud - gitweb
LU-14688 mdt: changelog purge deletes plain llog
[fs/lustre-release.git] / lustre / mdd / mdd_device.c
index ac5ee30..e7923f1 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * lustre/mdd/mdd_device.c
  *
@@ -145,7 +144,7 @@ static int mdd_init0(const struct lu_env *env, struct mdd_device *mdd,
        /* sync permission changes */
        mdd->mdd_sync_permission = 1;
        /* enable changelog garbage collection */
-       mdd->mdd_changelog_gc = 0;
+       mdd->mdd_changelog_gc = 1;
        /* with a significant amount of idle time */
        mdd->mdd_changelog_max_idle_time = CHLOG_MAX_IDLE_TIME;
        /* or a significant amount of late indexes */
@@ -154,6 +153,9 @@ static int mdd_init0(const struct lu_env *env, struct mdd_device *mdd,
        mdd->mdd_changelog_min_gc_interval = CHLOG_MIN_GC_INTERVAL;
        /* with a very few number of free catalog entries */
        mdd->mdd_changelog_min_free_cat_entries = CHLOG_MIN_FREE_CAT_ENTRIES;
+       /* special default striping for files created with O_APPEND */
+       mdd->mdd_append_stripe_count = 1;
+       mdd->mdd_append_pool[0] = '\0';
 
        dt_conf_get(env, mdd->mdd_child, &mdd->mdd_dt_conf);
 
@@ -304,10 +306,8 @@ static int llog_changelog_cancel_cb(const struct lu_env *env,
                                    struct llog_rec_hdr *hdr, void *data)
 {
        struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr;
-       struct llog_cookie       cookie;
        struct changelog_cancel_cookie *cl_cookie =
                (struct changelog_cancel_cookie *)data;
-       int                      rc;
 
        ENTRY;
 
@@ -337,15 +337,40 @@ static int llog_changelog_cancel_cb(const struct lu_env *env,
                /* records are in order, so we're done */
                RETURN(LLOG_PROC_BREAK);
 
-       cookie.lgc_lgl = llh->lgh_id;
-       cookie.lgc_index = hdr->lrh_index;
+       if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_MDS_CHANGELOG_RACE))) {
+               if (cfs_fail_val == 0)
+                       cfs_fail_val = hdr->lrh_index;
+               if (cfs_fail_val == hdr->lrh_index)
+                       OBD_RACE(OBD_FAIL_MDS_CHANGELOG_RACE);
+       }
+
+       /* Records folow one by one, cr_index++. We could calculate the
+        * last cr_index at this plain llog. And if it less then cookie endrec
+        * cancel the whole file.
+        */
+       if ((LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - hdr->lrh_index +
+            rec->cr.cr_index) < cl_cookie->endrec) {
+               int rc;
+
+               if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_MDS_CHANGELOG_DEL))) {
+                       if (cfs_fail_val == 0) {
+                               cfs_fail_val = (unsigned long)llh & 0xFFFFFFFF;
+                               OBD_RACE(OBD_FAIL_MDS_CHANGELOG_DEL);
+                       }
+               }
+               rc = llog_destroy(env, llh);
+               if (!rc) {
+                       CDEBUG(D_HA, "Changelog destroyed plain "DFID"\n",
+                              PFID(&llh->lgh_id.lgl_oi.oi_fid));
+                       RETURN(LLOG_DEL_PLAIN);
+               }
+       }
 
        /* cancel them one at a time.  I suppose we could store up the cookies
         * and cancel them all at once; probably more efficient, but this is
         * done as a user call, so who cares... */
-       rc = llog_cat_cancel_records(env, llh->u.phd.phd_cat_handle, 1,
-                                    &cookie);
-       RETURN(rc < 0 ? rc : 0);
+
+       RETURN(LLOG_DEL_RECORD);
 }
 
 static int llog_changelog_cancel(const struct lu_env *env,
@@ -373,6 +398,8 @@ static int llog_changelog_cancel(const struct lu_env *env,
        RETURN(rc);
 }
 
+static struct llog_operations changelog_orig_logops;
+
 static int
 mdd_changelog_write_header(const struct lu_env *env, struct mdd_device *mdd,
                           int markerflags);
@@ -437,7 +464,7 @@ static int mdd_changelog_llog_init(const struct lu_env *env,
        OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
        obd->obd_lvfs_ctxt.dt = mdd->mdd_bottom;
        rc = llog_setup(env, obd, &obd->obd_olg, LLOG_CHANGELOG_ORIG_CTXT,
-                       obd, &llog_common_cat_ops);
+                       obd, &changelog_orig_logops);
        if (rc) {
                CERROR("%s: changelog llog setup failed: rc = %d\n",
                       obd->obd_name, rc);
@@ -470,7 +497,7 @@ static int mdd_changelog_llog_init(const struct lu_env *env,
 
        /* setup user changelog */
        rc = llog_setup(env, obd, &obd->obd_olg, LLOG_CHANGELOG_USER_ORIG_CTXT,
-                       obd, &llog_common_cat_ops);
+                       obd, &changelog_orig_logops);
        if (rc) {
                CERROR("%s: changelog users llog setup failed: rc = %d\n",
                       obd->obd_name, rc);
@@ -593,7 +620,9 @@ static void mdd_changelog_fini(const struct lu_env *env,
        struct obd_device       *obd = mdd2obd_dev(mdd);
        struct llog_ctxt        *ctxt;
 
-       mdd->mdd_cl.mc_flags = 0;
+       if (mdd->mdd_cl.mc_flags & CLM_CLEANUP_DONE)
+               return;
+       mdd->mdd_cl.mc_flags = CLM_CLEANUP_DONE;
 
 again:
        /* stop GC-thread if running */
@@ -611,7 +640,8 @@ again:
                         * and to have set mc_gc_task to itself
                         */
                        spin_unlock(&mdd->mdd_cl.mc_lock);
-                       schedule_timeout(usecs_to_jiffies(10));
+                       /* Add a tiny sleep */
+                       schedule_timeout_uninterruptible(1);
                        /* go back to fully check if GC-thread has started or
                         * even already exited or if a new one is starting...
                         */
@@ -711,13 +741,13 @@ int mdd_changelog_write_header(const struct lu_env *env,
 
        ENTRY;
 
-       if (mdd->mdd_cl.mc_mask & (1 << CL_MARK)) {
+       if (mdd->mdd_cl.mc_mask & BIT(CL_MARK)) {
                mdd->mdd_cl.mc_starttime = ktime_get();
                RETURN(0);
        }
 
        reclen = llog_data_len(sizeof(*rec) + len);
-       buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
+       buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_chlg_buf, reclen);
        if (buf->lb_buf == NULL)
                RETURN(-ENOMEM);
        rec = buf->lb_buf;
@@ -732,9 +762,6 @@ int mdd_changelog_write_header(const struct lu_env *env,
                                            rec->cr.cr_namelen);
        rec->cr_hdr.lrh_type = CHANGELOG_REC;
        rec->cr.cr_time = cl_time();
-       spin_lock(&mdd->mdd_cl.mc_lock);
-       rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
-       spin_unlock(&mdd->mdd_cl.mc_lock);
 
        ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
        LASSERT(ctxt);
@@ -766,23 +793,12 @@ static int obf_lookup(const struct lu_env *env, struct md_object *p,
                 name++;
 
         sscanf(name, SFID, RFID(f));
-        if (!fid_is_sane(f)) {
-               CWARN("%s: Trying to lookup invalid FID [%s] in %s/%s, FID "
-                     "format should be "DFID"\n", mdd2obd_dev(mdd)->obd_name,
-                     lname->ln_name, dot_lustre_name, mdd_obf_dir_name,
-                     (__u64)FID_SEQ_NORMAL, 1, 0);
-                GOTO(out, rc = -EINVAL);
-        }
+       if (!fid_is_sane(f))
+               GOTO(out, rc = -ENOENT);
 
        if (!fid_is_norm(f) && !fid_is_igif(f) && !fid_is_root(f) &&
-           !fid_seq_is_dot(f->f_seq)) {
-               CWARN("%s: Trying to lookup invalid FID "DFID" in %s/%s, "
-                     "sequence should be >= %#llx or within [%#llx,"
-                     "%#llx].\n", mdd2obd_dev(mdd)->obd_name, PFID(f),
-                     dot_lustre_name, mdd_obf_dir_name, (__u64)FID_SEQ_NORMAL,
-                     (__u64)FID_SEQ_IGIF, (__u64)FID_SEQ_IGIF_MAX);
-               GOTO(out, rc = -EINVAL);
-       }
+           !fid_seq_is_dot(f->f_seq))
+               GOTO(out, rc = -ENOENT);
 
         /* Check if object with this fid exists */
         child = mdd_object_find(env, mdd, f);
@@ -839,15 +855,27 @@ static int mdd_dummy_unlink(const struct lu_env *env,
        return -EPERM;
 }
 
-static struct md_dir_operations mdd_obf_dir_ops = {
+int mdd_create(const struct lu_env *env, struct md_object *pobj,
+                     const struct lu_name *lname, struct md_object *child,
+                     struct md_op_spec *spec, struct md_attr *ma);
+static int mdd_obf_create(const struct lu_env *env, struct md_object *pobj,
+                     const struct lu_name *lname, struct md_object *child,
+                     struct md_op_spec *spec, struct md_attr *ma)
+{
+       if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
+               return mdd_create(env, pobj, lname, child, spec, ma);
+       RETURN(-EPERM);
+}
+
+static const struct md_dir_operations mdd_obf_dir_ops = {
        .mdo_lookup = obf_lookup,
-       .mdo_create = mdd_dummy_create,
+       .mdo_create = mdd_obf_create,
        .mdo_rename = mdd_dummy_rename,
        .mdo_link   = mdd_dummy_link,
        .mdo_unlink = mdd_dummy_unlink
 };
 
-static struct md_dir_operations mdd_lpf_dir_ops = {
+static const struct md_dir_operations mdd_lpf_dir_ops = {
        .mdo_lookup = mdd_lookup,
        .mdo_create = mdd_dummy_create,
        .mdo_rename = mdd_dummy_rename,
@@ -868,7 +896,7 @@ static struct md_object *mdo_locate(const struct lu_env *env,
                LASSERT(obj != NULL);
                mdo = lu2md(obj);
        } else {
-               mdo = ERR_PTR(PTR_ERR(obj));
+               mdo = ERR_CAST(obj);
        }
        return mdo;
 }
@@ -1095,10 +1123,11 @@ static int mdd_process_config(const struct lu_env *env,
 
         switch (cfg->lcfg_command) {
        case LCFG_PARAM: {
-               struct obd_device *obd = mdd2obd_dev(m);
+               ssize_t count;
 
-               rc = class_process_proc_param(PARAM_MDD, obd->obd_vars, cfg, m);
-               if (rc > 0 || rc == -ENOSYS)
+               count = class_modify_config(cfg, PARAM_MDD, &m->mdd_kobj);
+               rc = count > 0 ? 0 : count;
+               if (rc)
                        /* we don't understand; pass it on */
                        rc = next->ld_ops->ldo_process_config(env, next, cfg);
                break;
@@ -1302,11 +1331,30 @@ out_los:
        return rc;
 }
 
+/**
+ * Implementation of lu_device_operations::ldo_fid_alloc() for MDD.
+ *
+ * Find corresponding device by passed parent and name, and allocate FID from
+ * there.
+ *
+ * see include/lu_object.h for the details.
+ */
+static int mdd_fid_alloc(const struct lu_env *env, struct lu_device *d,
+                        struct lu_fid *fid, struct lu_object *parent,
+                        const struct lu_name *name)
+{
+       struct mdd_device *mdd = lu2mdd_dev(d);
+       struct lu_object *o = lu_object_next(parent);
+
+       return dt_fid_alloc(env, mdd->mdd_child, fid, o, name);
+}
+
 const struct lu_device_operations mdd_lu_ops = {
         .ldo_object_alloc      = mdd_object_alloc,
         .ldo_process_config    = mdd_process_config,
         .ldo_recovery_complete = mdd_recovery_complete,
         .ldo_prepare           = mdd_prepare,
+       .ldo_fid_alloc         = mdd_fid_alloc,
 };
 
 static int mdd_root_get(const struct lu_env *env,
@@ -1330,7 +1378,7 @@ static int mdd_statfs(const struct lu_env *env, struct md_device *m,
 
        ENTRY;
 
-       rc = mdd_child_ops(mdd)->dt_statfs(env, mdd->mdd_child, sfs);
+       rc = mdd_child_ops(mdd)->dt_statfs(env, mdd->mdd_child, sfs, NULL);
 
        sfs->os_namelen = min_t(__u32, sfs->os_namelen, NAME_MAX);
 
@@ -1482,7 +1530,7 @@ static int mdd_obd_set_info_async(const struct lu_env *env,
        RETURN(rc);
 }
 
-static struct obd_ops mdd_obd_device_ops = {
+static const struct obd_ops mdd_obd_device_ops = {
        .o_owner        = THIS_MODULE,
        .o_connect      = mdd_obd_connect,
        .o_disconnect   = mdd_obd_disconnect,
@@ -1639,6 +1687,8 @@ int mdd_changelog_user_purge(const struct lu_env *env,
                              mdd_changelog_user_purge_cb, &mcup,
                              0, 0);
 
+       OBD_FAIL_TIMEOUT(OBD_FAIL_LLOG_PURGE_DELAY, cfs_fail_val);
+
        if ((rc == 0) && (mcup.mcup_usercount == 0)) {
                spin_lock(&mdd->mdd_cl.mc_user_lock);
                if (mdd->mdd_cl.mc_users == 0) {
@@ -1789,7 +1839,11 @@ static int mdd_changelog_clear(const struct lu_env *env,
                              mdd_changelog_clear_cb, (void *)&mcuc,
                              0, 0);
 
-       if (rc < 0) {
+       if (rc == -EINVAL) {
+               CDEBUG(D_IOCTL, "%s: No changelog recnum <= %llu to clear\n",
+                      mdd2obd_dev(mdd)->obd_name, (unsigned long long) endrec);
+               RETURN(-EINVAL);
+       } else if (rc < 0) {
                CWARN("%s: Failure to clear the changelog for user %d: %d\n",
                      mdd2obd_dev(mdd)->obd_name, id, rc);
        } else if (mcuc.mcuc_flush) {
@@ -1804,7 +1858,7 @@ static int mdd_changelog_clear(const struct lu_env *env,
                                                      mcuc.mcuc_minrec);
                }
        } else {
-               CWARN("%s: No entry for user %d\n",
+               CDEBUG(D_IOCTL, "%s: No entry for user %d\n",
                      mdd2obd_dev(mdd)->obd_name, id);
                rc = -ENOENT;
        }
@@ -1886,7 +1940,15 @@ static int mdd_iocontrol(const struct lu_env *env, struct md_device *m,
                if (unlikely(!barrier_entry(mdd->mdd_bottom)))
                        RETURN(-EINPROGRESS);
 
-               rc = mdd_changelog_user_purge(env, mdd, data->ioc_u32_1);
+               /* explicitly clear changelog first, to protect from crash in
+                * the middle of purge that would lead to unregistered consumer
+                * but pending changelog entries
+                */
+               rc = mdd_changelog_clear(env, mdd, data->ioc_u32_1, 0);
+               if (!rc)
+                       rc = mdd_changelog_user_purge(env,
+                                                     mdd, data->ioc_u32_1);
+
                barrier_exit(mdd->mdd_bottom);
                break;
        default:
@@ -1907,17 +1969,17 @@ static const struct md_device_operations mdd_ops = {
        .mdo_dtconf_get     = mdd_dtconf_get,
 };
 
-static struct lu_device_type_operations mdd_device_type_ops = {
-        .ldto_init = mdd_type_init,
-        .ldto_fini = mdd_type_fini,
+static const struct lu_device_type_operations mdd_device_type_ops = {
+       .ldto_init              = mdd_type_init,
+       .ldto_fini              = mdd_type_fini,
 
-        .ldto_start = mdd_type_start,
-        .ldto_stop  = mdd_type_stop,
+       .ldto_start             = mdd_type_start,
+       .ldto_stop              = mdd_type_stop,
 
-        .ldto_device_alloc = mdd_device_alloc,
-        .ldto_device_free  = mdd_device_free,
+       .ldto_device_alloc      = mdd_device_alloc,
+       .ldto_device_free       = mdd_device_free,
 
-        .ldto_device_fini    = mdd_device_fini
+       .ldto_device_fini       = mdd_device_fini
 };
 
 static struct lu_device_type mdd_device_type = {
@@ -1938,6 +2000,7 @@ static void mdd_key_fini(const struct lu_context *ctx,
        lu_buf_free(&info->mti_big_buf);
        lu_buf_free(&info->mti_link_buf);
        lu_buf_free(&info->mti_xattr_buf);
+       lu_buf_free(&info->mti_chlg_buf);
 
        OBD_FREE_PTR(info);
 }
@@ -1982,7 +2045,10 @@ static int __init mdd_init(void)
        if (rc)
                return rc;
 
-       rc = class_register_type(&mdd_obd_device_ops, NULL, true, NULL,
+       changelog_orig_logops = llog_common_cat_ops;
+       changelog_orig_logops.lop_write_rec = mdd_changelog_write_rec;
+
+       rc = class_register_type(&mdd_obd_device_ops, NULL, false,
                                 LUSTRE_MDD_NAME, &mdd_device_type);
        if (rc)
                lu_kmem_fini(mdd_caches);