Whamcloud - gitweb
LU-7340 mdd: changelogs garbage collection
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
index 2b4ab39..a353f8b 100644 (file)
@@ -735,9 +735,8 @@ static int mdd_llog_record_calc_size(const struct lu_env *env,
                                     const struct lu_name *sname)
 {
        const struct lu_ucred   *uc = lu_ucred(env);
-       enum changelog_rec_flags crf = 0;
-       size_t                   hdr_size = sizeof(struct llog_changelog_rec) -
-                                           sizeof(struct changelog_rec);
+       enum changelog_rec_flags crf = CLF_EXTRA_FLAGS;
+       enum changelog_rec_extra_flags crfe = CLFE_UIDGID;
 
        if (sname != NULL)
                crf |= CLF_RENAME;
@@ -745,7 +744,8 @@ static int mdd_llog_record_calc_size(const struct lu_env *env,
        if (uc != NULL && uc->uc_jobid[0] != '\0')
                crf |= CLF_JOBID;
 
-       return llog_data_len(hdr_size + changelog_rec_offset(crf) +
+       return llog_data_len(LLOG_CHANGELOG_HDR_SZ +
+                            changelog_rec_offset(crf, crfe) +
                             (tname != NULL ? tname->ln_namelen : 0) +
                             (sname != NULL ? 1 + sname->ln_namelen : 0));
 }
@@ -793,6 +793,136 @@ out_put:
        return rc;
 }
 
+struct mdd_changelog_gc {
+       struct mdd_device *mcgc_mdd;
+       bool mcgc_found;
+       __u32 mcgc_maxtime;
+       __u64 mcgc_maxindexes;
+       __u32 mcgc_id;
+};
+
+/* return first registered ChangeLog user idle since too long
+ * use ChangeLog's user plain LLOG mtime for this */
+static int mdd_changelog_gc_cb(const struct lu_env *env,
+                              struct llog_handle *llh,
+                              struct llog_rec_hdr *hdr, void *data)
+{
+       struct llog_changelog_user_rec  *rec;
+       struct mdd_changelog_gc *mcgc = (struct mdd_changelog_gc *)data;
+       struct mdd_device *mdd = mcgc->mcgc_mdd;
+       ENTRY;
+
+       if ((llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN) == 0)
+               RETURN(-ENXIO);
+
+       rec = container_of(hdr, struct llog_changelog_user_rec,
+                          cur_hdr);
+
+       /* find oldest idle user, based on last record update/cancel time (new
+        * behavior), or for old user records, last record index vs current
+        * ChangeLog index. Late users with old record format will be treated
+        * first as we assume they could be idle since longer
+        */
+       if (rec->cur_time != 0) {
+               __u32 time_now = (__u32)get_seconds();
+               __u32 time_out = rec->cur_time +
+                                mdd->mdd_changelog_max_idle_time;
+               __u32 idle_time = time_now - rec->cur_time;
+
+               /* treat oldest idle user first, and if no old format user
+                * has been already selected
+                */
+               if (time_after32(time_now, time_out) &&
+                   idle_time > mcgc->mcgc_maxtime &&
+                   mcgc->mcgc_maxindexes == 0) {
+                       mcgc->mcgc_maxtime = idle_time;
+                       mcgc->mcgc_id = rec->cur_id;
+                       mcgc->mcgc_found = true;
+               }
+       } else {
+               /* old user record with no idle time stamp, so use empirical
+                * method based on its current index/position
+                */
+               __u64 idle_indexes;
+
+               idle_indexes = mdd->mdd_cl.mc_index - rec->cur_endrec;
+
+               /* treat user with the oldest/smallest current index first */
+               if (idle_indexes >= mdd->mdd_changelog_max_idle_indexes &&
+                   idle_indexes > mcgc->mcgc_maxindexes) {
+                       mcgc->mcgc_maxindexes = idle_indexes;
+                       mcgc->mcgc_id = rec->cur_id;
+                       mcgc->mcgc_found = true;
+               }
+
+       }
+       RETURN(0);
+}
+
+/* recover space from long-term inactive ChangeLog users */
+static int mdd_chlg_garbage_collect(void *data)
+{
+       struct mdd_device *mdd = (struct mdd_device *)data;
+       struct lu_env             *env = NULL;
+       int                        rc;
+       struct llog_ctxt *ctxt;
+       struct mdd_changelog_gc mcgc = {
+               .mcgc_mdd = mdd,
+               .mcgc_found = false,
+               .mcgc_maxtime = 0,
+               .mcgc_maxindexes = 0,
+       };
+       ENTRY;
+
+       CDEBUG(D_HA, "%s: ChangeLog garbage collect thread start\n",
+              mdd2obd_dev(mdd)->obd_name);
+
+       OBD_ALLOC_PTR(env);
+       if (env == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       rc = lu_env_init(env, LCT_MD_THREAD);
+       if (rc)
+               GOTO(out, rc);
+
+       for (;;) {
+               ctxt = llog_get_context(mdd2obd_dev(mdd),
+                                       LLOG_CHANGELOG_USER_ORIG_CTXT);
+               if (ctxt == NULL ||
+                   (ctxt->loc_handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT) == 0)
+                       GOTO(out_env, rc = -ENXIO);
+
+               rc = llog_cat_process(env, ctxt->loc_handle,
+                                     mdd_changelog_gc_cb, &mcgc, 0, 0);
+               if (rc != 0 || mcgc.mcgc_found == false)
+                       break;
+               llog_ctxt_put(ctxt);
+
+               CWARN("%s: Force deregister of ChangeLog user cl%d idle more "
+                     "than %us\n", mdd2obd_dev(mdd)->obd_name, mcgc.mcgc_id,
+                     mcgc.mcgc_maxtime);
+
+               mdd_changelog_user_purge(env, mdd, mcgc.mcgc_id);
+
+               /* try again to search for another candidate */
+               mcgc.mcgc_found = false;
+               mcgc.mcgc_maxtime = 0;
+               mcgc.mcgc_maxindexes = 0;
+       }
+
+out_env:
+       if (ctxt != NULL)
+               llog_ctxt_put(ctxt);
+
+       lu_env_fini(env);
+       GOTO(out, rc);
+out:
+       if (env)
+               OBD_FREE_PTR(env);
+       mdd->mdd_cl.mc_gc_task = NULL;
+       return rc;
+}
+
 /** Add a changelog entry \a rec to the changelog llog
  * \param mdd
  * \param rec
@@ -807,6 +937,7 @@ int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
        struct llog_ctxt        *ctxt;
        struct thandle          *llog_th;
        int                      rc;
+       bool                     run_gc_task;
 
        rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) +
                                            changelog_rec_varsize(&rec->cr));
@@ -833,6 +964,41 @@ int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
        /* nested journal transaction */
        rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, llog_th);
 
+       /* time to recover some space ?? */
+       spin_lock(&mdd->mdd_cl.mc_lock);
+       if (unlikely(mdd->mdd_changelog_gc && (ktime_get_real_seconds() -
+           mdd->mdd_cl.mc_gc_time > mdd->mdd_changelog_min_gc_interval) &&
+           mdd->mdd_cl.mc_gc_task == NULL &&
+           llog_cat_free_space(ctxt->loc_handle) <=
+                               mdd->mdd_changelog_min_free_cat_entries)) {
+               CWARN("%s: low on changelog_catalog free entries, starting "
+                     "ChangeLog garbage collection thread\n", obd->obd_name);
+
+               /* indicate further kthread run will occur outside right after
+                * critical section
+                */
+               mdd->mdd_cl.mc_gc_task = (struct task_struct *)(-1);
+               run_gc_task = true;
+       }
+       spin_unlock(&mdd->mdd_cl.mc_lock);
+       if (run_gc_task) {
+               struct task_struct *gc_task;
+
+               gc_task = kthread_run(mdd_chlg_garbage_collect, mdd,
+                                     "chlg_gc_thread");
+               if (IS_ERR(gc_task)) {
+                       CERROR("%s: cannot start ChangeLog garbage collection "
+                              "thread: rc = %ld\n", obd->obd_name,
+                              PTR_ERR(gc_task));
+                       mdd->mdd_cl.mc_gc_task = NULL;
+               } else {
+                       CDEBUG(D_HA, "%s: ChangeLog garbage collection thread "
+                              "has started with Pid %d\n", obd->obd_name,
+                              gc_task->pid);
+                       mdd->mdd_cl.mc_gc_task = gc_task;
+                       mdd->mdd_cl.mc_gc_time = ktime_get_real_seconds();
+               }
+       }
 out_put:
        llog_ctxt_put(ctxt);
        if (rc > 0)
@@ -845,8 +1011,8 @@ static void mdd_changelog_rec_ext_rename(struct changelog_rec *rec,
                                         const struct lu_fid *spfid,
                                         const struct lu_name *sname)
 {
-       struct changelog_ext_rename     *rnm = changelog_rec_rename(rec);
-       size_t                           extsize = sname->ln_namelen + 1;
+       struct changelog_ext_rename *rnm = changelog_rec_rename(rec);
+       size_t extsize = sname->ln_namelen + 1;
 
        LASSERT(sfid != NULL);
        LASSERT(spfid != NULL);
@@ -862,7 +1028,7 @@ static void mdd_changelog_rec_ext_rename(struct changelog_rec *rec,
 
 void mdd_changelog_rec_ext_jobid(struct changelog_rec *rec, const char *jobid)
 {
-       struct changelog_ext_jobid      *jid = changelog_rec_jobid(rec);
+       struct changelog_ext_jobid *jid = changelog_rec_jobid(rec);
 
        if (jobid == NULL || jobid[0] == '\0')
                return;
@@ -870,6 +1036,22 @@ void mdd_changelog_rec_ext_jobid(struct changelog_rec *rec, const char *jobid)
        strlcpy(jid->cr_jobid, jobid, sizeof(jid->cr_jobid));
 }
 
+void mdd_changelog_rec_ext_extra_flags(struct changelog_rec *rec, __u64 eflags)
+{
+       struct changelog_ext_extra_flags *ef = changelog_rec_extra_flags(rec);
+
+       ef->cr_extra_flags = eflags;
+}
+
+void mdd_changelog_rec_extra_uidgid(struct changelog_rec *rec,
+                                   __u64 uid, __u64 gid)
+{
+       struct changelog_ext_uidgid *uidgid = changelog_rec_uidgid(rec);
+
+       uidgid->cr_uid = uid;
+       uidgid->cr_gid = gid;
+}
+
 /** Store a namespace change changelog record
  * If this fails, we must fail the whole transaction; we don't
  * want the change to commit without the log entry.
@@ -897,6 +1079,7 @@ int mdd_changelog_ns_store(const struct lu_env *env,
        struct llog_changelog_rec       *rec;
        struct lu_buf                   *buf;
        int                              reclen;
+       __u64                            xflags = CLFE_INVALID;
        int                              rc;
        ENTRY;
 
@@ -918,6 +1101,7 @@ int mdd_changelog_ns_store(const struct lu_env *env,
        rec = buf->lb_buf;
 
        crf &= CLF_FLAGMASK;
+       crf |= CLF_EXTRA_FLAGS;
 
        if (uc != NULL && uc->uc_jobid[0] != '\0')
                crf |= CLF_JOBID;
@@ -927,7 +1111,17 @@ int mdd_changelog_ns_store(const struct lu_env *env,
        else
                crf |= CLF_VERSION;
 
+       xflags |= CLFE_UIDGID;
+
        rec->cr.cr_flags = crf;
+
+       if (crf & CLF_EXTRA_FLAGS) {
+               mdd_changelog_rec_ext_extra_flags(&rec->cr, xflags);
+               if (xflags & CLFE_UIDGID)
+                       mdd_changelog_rec_extra_uidgid(&rec->cr,
+                                                      uc->uc_uid, uc->uc_gid);
+       }
+
        rec->cr.cr_type = (__u32)type;
        rec->cr.cr_pfid = *tpfid;
        rec->cr.cr_namelen = tname->ln_namelen;
@@ -941,7 +1135,7 @@ int mdd_changelog_ns_store(const struct lu_env *env,
 
        if (likely(target != NULL)) {
                rec->cr.cr_tfid = *mdo2fid(target);
-               target->mod_cltime = cfs_time_current_64();
+               target->mod_cltime = ktime_get();
        } else {
                fid_zero(&rec->cr.cr_tfid);
        }
@@ -1938,6 +2132,24 @@ static int mdd_create_sanity_check(const struct lu_env *env,
                check_perm = false;
        }
 
+       if (S_ISDIR(cattr->la_mode) &&
+           unlikely(spec != NULL && spec->sp_cr_flags & MDS_OPEN_HAS_EA) &&
+           spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen > 0) {
+               const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
+
+               if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC) &&
+                   le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC_V0) {
+                       rc = -EINVAL;
+                       CERROR("%s: invalid lmv_user_md: magic = %x, "
+                              "stripe_offset = %d, stripe_count = %u: "
+                              "rc = %d\n", mdd2obd_dev(m)->obd_name,
+                               le32_to_cpu(lum->lum_magic),
+                              (int)le32_to_cpu(lum->lum_stripe_offset),
+                              le32_to_cpu(lum->lum_stripe_count), rc);
+                       return rc;
+               }
+       }
+
        rc = mdd_may_create(env, obj, pattr, NULL, check_perm);
        if (rc != 0)
                RETURN(rc);
@@ -1989,7 +2201,7 @@ static int mdd_create_sanity_check(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int mdd_declare_object_create(const struct lu_env *env,
+static int mdd_declare_create_object(const struct lu_env *env,
                                     struct mdd_device *mdd,
                                     struct mdd_object *p, struct mdd_object *c,
                                     struct lu_attr *attr,
@@ -2002,10 +2214,10 @@ static int mdd_declare_object_create(const struct lu_env *env,
        const struct lu_buf *buf;
        int rc;
 
-       rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec,
+       rc = mdd_declare_create_object_internal(env, p, c, attr, handle, spec,
                                                hint);
-        if (rc)
-                GOTO(out, rc);
+       if (rc)
+               GOTO(out, rc);
 
 #ifdef CONFIG_FS_POSIX_ACL
        if (def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
@@ -2081,7 +2293,7 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
 {
        int rc;
 
-       rc = mdd_declare_object_create(env, mdd, p, c, attr, handle, spec,
+       rc = mdd_declare_create_object(env, mdd, p, c, attr, handle, spec,
                                       def_acl_buf, acl_buf, hint);
        if (rc)
                GOTO(out, rc);
@@ -2167,18 +2379,18 @@ static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
 /**
  * Create a metadata object and initialize it, set acl, xattr.
  **/
-static int mdd_object_create(const struct lu_env *env, struct mdd_object *pobj,
+static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
                             struct mdd_object *son, struct lu_attr *attr,
                             struct md_op_spec *spec, struct lu_buf *acl_buf,
                             struct lu_buf *def_acl_buf,
                             struct dt_allocation_hint *hint,
                             struct thandle *handle)
 {
-       const struct lu_buf    *buf;
-       int                     rc;
+       const struct lu_buf *buf;
+       int rc;
 
        mdd_write_lock(env, son, MOR_TGT_CHILD);
-       rc = mdd_object_create_internal(env, NULL, son, attr, handle, spec,
+       rc = mdd_create_object_internal(env, NULL, son, attr, handle, spec,
                                        hint);
        if (rc)
                GOTO(unlock, rc);
@@ -2245,7 +2457,6 @@ static int mdd_object_create(const struct lu_env *env, struct mdd_object *pobj,
                struct dt_object *dt = mdd_object_child(son);
                const char *target_name = spec->u.sp_symname;
                int sym_len = strlen(target_name);
-               const struct lu_buf *buf;
                loff_t pos = 0;
 
                buf = mdd_buf_get_const(env, target_name, sym_len);
@@ -2331,12 +2542,51 @@ stop:
        RETURN(rc);
 }
 
-/*
+/**
  * Create object and insert it into namespace.
+ *
+ * Two operations have to be performed:
+ *
+ *  - an allocation of a new object (->do_create()), and
+ *  - an insertion into a parent index (->dio_insert()).
+ *
+ * Due to locking, operation order is not important, when both are
+ * successful, *but* error handling cases are quite different:
+ *
+ *  - if insertion is done first, and following object creation fails,
+ *  insertion has to be rolled back, but this operation might fail
+ *  also leaving us with dangling index entry.
+ *
+ *  - if creation is done first, is has to be undone if insertion fails,
+ *  leaving us with leaked space, which is not good but not fatal.
+ *
+ * It seems that creation-first is simplest solution, but it is sub-optimal
+ * in the frequent
+ *
+ * $ mkdir foo
+ * $ mkdir foo
+ *
+ * case, because second mkdir is bound to create object, only to
+ * destroy it immediately.
+ *
+ * To avoid this follow local file systems that do double lookup:
+ *
+ * 0. lookup -> -EEXIST (mdd_create_sanity_check())
+ * 1. create            (mdd_create_object_internal())
+ * 2. insert            (__mdd_index_insert(), lookup again)
+ *
+ * \param[in] pobj     parent object
+ * \param[in] lname    name of child being created
+ * \param[in,out] child        child object being created
+ * \param[in] spec     additional create parameters
+ * \param[in] ma       attributes for new child object
+ *
+ * \retval             0 on success
+ * \retval             negative errno on failure
  */
 static int mdd_create(const struct lu_env *env, struct md_object *pobj,
                      const struct lu_name *lname, struct md_object *child,
-                     struct md_op_spec *spec, struct md_attrma)
+                     struct md_op_spec *spec, struct md_attr *ma)
 {
        struct mdd_thread_info  *info = mdd_env_info(env);
        struct lu_attr          *la = &info->mti_la_for_fix;
@@ -2355,42 +2605,6 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
        int                      rc2;
        ENTRY;
 
-        /*
-         * Two operations have to be performed:
-         *
-         *  - an allocation of a new object (->do_create()), and
-         *
-         *  - an insertion into a parent index (->dio_insert()).
-         *
-         * Due to locking, operation order is not important, when both are
-         * successful, *but* error handling cases are quite different:
-         *
-         *  - if insertion is done first, and following object creation fails,
-         *  insertion has to be rolled back, but this operation might fail
-         *  also leaving us with dangling index entry.
-         *
-         *  - if creation is done first, is has to be undone if insertion
-         *  fails, leaving us with leaked space, which is neither good, nor
-         *  fatal.
-         *
-         * It seems that creation-first is simplest solution, but it is
-         * sub-optimal in the frequent
-         *
-         *         $ mkdir foo
-         *         $ mkdir foo
-         *
-         * case, because second mkdir is bound to create object, only to
-         * destroy it immediately.
-         *
-         * To avoid this follow local file systems that do double lookup:
-         *
-         *     0. lookup -> -EEXIST (mdd_create_sanity_check())
-         *
-         *     1. create            (mdd_object_create_internal())
-         *
-         *     2. insert            (__mdd_index_insert(), lookup again)
-         */
-
        rc = mdd_la_get(env, mdd_pobj, pattr);
        if (rc != 0)
                RETURN(rc);
@@ -2400,7 +2614,7 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
        if (rc)
                RETURN(rc);
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
+       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
                GOTO(out_free, rc = -EINPROGRESS);
 
        handle = mdd_trans_create(env, mdd);
@@ -2434,14 +2648,14 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
        rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
                                handle, spec, ldata, &def_acl_buf, &acl_buf,
                                hint);
-        if (rc)
-                GOTO(out_stop, rc);
+       if (rc)
+               GOTO(out_stop, rc);
 
-        rc = mdd_trans_start(env, mdd, handle);
-        if (rc)
-                GOTO(out_stop, rc);
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(out_stop, rc);
 
-       rc = mdd_object_create(env, mdd_pobj, son, attr, spec, &acl_buf,
+       rc = mdd_create_object(env, mdd_pobj, son, attr, spec, &acl_buf,
                               &def_acl_buf, hint, handle);
        if (rc != 0)
                GOTO(out_stop, rc);
@@ -3415,7 +3629,7 @@ static int mdd_declare_migrate_create(const struct lu_env *env,
        int                     rc;
        int                     mgr_easize;
 
-       rc = mdd_declare_object_create_internal(env, mdd_pobj, mdd_tobj, la,
+       rc = mdd_declare_create_object_internal(env, mdd_pobj, mdd_tobj, la,
                                                handle, spec, NULL);
        if (rc != 0)
                return rc;
@@ -3565,7 +3779,7 @@ static int mdd_migrate_create(const struct lu_env *env,
        la->la_valid &= ~LA_NLINK;
 
        /* create the target object */
-       rc = mdd_object_create(env, mdd_pobj, mdd_tobj, la, spec, NULL, NULL,
+       rc = mdd_create_object(env, mdd_pobj, mdd_tobj, la, spec, NULL, NULL,
                               hint, handle);
        if (rc != 0)
                GOTO(stop_trans, rc);