Whamcloud - gitweb
LU-1445 osp: rollover to the new seq synchronously
authorwangdi <di.wang@whamcloud.com>
Thu, 26 Sep 2013 09:57:55 +0000 (02:57 -0700)
committerOleg Drokin <green@whamcloud.com>
Mon, 28 Jan 2013 05:47:47 +0000 (00:47 -0500)
For simplicity, OSP will track only one sequence for precreation,
so it will need

1. Stop precreation when the current sequence is about to used up,
until all of the precreation objects has been used up.

2. ofd will precreate objects synchronously when the current sequence
is about to used up.

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: I4c0de8f2869c9a8e7a4b0d2abe06041e84885b2e
Reviewed-on: http://review.whamcloud.com/4790
Reviewed-by: Mike Pershin <mike.pershin@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
lustre/ofd/ofd_internal.h
lustre/ofd/ofd_obd.c
lustre/ofd/ofd_objects.c
lustre/osp/osp_internal.h
lustre/osp/osp_object.c
lustre/osp/osp_precreate.c

index d346990..63d324e 100644 (file)
@@ -405,7 +405,7 @@ struct ofd_object *ofd_object_find_or_create(const struct lu_env *env,
                                             struct lu_attr *attr);
 int ofd_object_ff_check(const struct lu_env *env, struct ofd_object *fo);
 int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
-                         obd_id id, struct ofd_seq *oseq, int nr);
+                         obd_id id, struct ofd_seq *oseq, int nr, int sync);
 
 void ofd_object_put(const struct lu_env *env, struct ofd_object *fo);
 int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
index ca44509..d32db3a 100644 (file)
@@ -1158,7 +1158,8 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp,
        struct ofd_thread_info  *info;
        obd_seq                 seq = oa->o_seq;
        struct ofd_seq          *oseq;
-       int                      rc = 0, diff;
+       int                     rc = 0, diff;
+       int                     sync_trans = 0;
 
        ENTRY;
 
@@ -1233,6 +1234,20 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp,
                        diff = 1; /* shouldn't we create this right now? */
                } else {
                        diff = oa->o_id - ofd_seq_last_oid(oseq);
+                       /* Do sync create if the seq is about to used up */
+                       if (fid_seq_is_idif(oa->o_seq) ||
+                           fid_seq_is_mdt0(oa->o_seq)) {
+                               if (unlikely(oa->o_id >= IDIF_MAX_OID - 1))
+                                       sync_trans = 1;
+                       } else if (fid_seq_is_norm(oa->o_seq)) {
+                               if (unlikely(oa->o_id >=
+                                            LUSTRE_DATA_SEQ_MAX_WIDTH - 1))
+                                       sync_trans = 1;
+                       } else {
+                               CERROR("%s : invalid o_seq "LPX64": rc = %d\n",
+                                      ofd_name(ofd), oa->o_seq, -EINVAL);
+                               GOTO(out, rc = -EINVAL);
+                       }
                }
        }
        if (diff > 0) {
@@ -1273,7 +1288,7 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp,
                        }
 
                        rc = ofd_precreate_objects(env, ofd, next_id,
-                                                  oseq, count);
+                                                  oseq, count, sync_trans);
                        if (rc > 0) {
                                created += rc;
                                diff -= rc;
index 1dd1abb..9ee20c4 100644 (file)
@@ -146,7 +146,7 @@ void ofd_object_put(const struct lu_env *env, struct ofd_object *fo)
 }
 
 int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
-                         obd_id id, struct ofd_seq *oseq, int nr)
+                         obd_id id, struct ofd_seq *oseq, int nr, int sync)
 {
        struct ofd_thread_info  *info = ofd_info(env);
        struct ofd_object       *fo = NULL;
@@ -228,6 +228,8 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
        if (IS_ERR(th))
                GOTO(out, rc = PTR_ERR(th));
 
+       th->th_sync |= sync;
+
        rc = dt_declare_record_write(env, oseq->os_lastid_obj, sizeof(tmp),
                                     info->fti_off, th);
        if (rc)
index 2ba3de2..6b95a78 100644 (file)
@@ -335,6 +335,38 @@ static inline void osp_update_last_fid(struct osp_device *d, struct lu_fid *fid)
        }
 }
 
+static int osp_fid_end_seq(const struct lu_env *env, struct lu_fid *fid)
+{
+       if (fid_is_idif(fid)) {
+               struct osp_thread_info *info = osp_env_info(env);
+               struct ost_id *oi = &info->osi_oi;
+
+               fid_ostid_pack(fid, oi);
+               return oi->oi_id == IDIF_MAX_OID;
+       } else {
+               return fid_oid(fid) == LUSTRE_DATA_SEQ_MAX_WIDTH;
+       }
+}
+
+static inline int osp_precreate_end_seq_nolock(const struct lu_env *env,
+                                              struct osp_device *osp)
+{
+       struct lu_fid *fid = &osp->opd_pre_last_created_fid;
+
+       return osp_fid_end_seq(env, fid);
+}
+
+static inline int osp_precreate_end_seq(const struct lu_env *env,
+                                       struct osp_device *osp)
+{
+       int rc;
+
+       spin_lock(&osp->opd_pre_lock);
+       rc = osp_precreate_end_seq_nolock(env, osp);
+       spin_unlock(&osp->opd_pre_lock);
+       return rc;
+}
+
 static inline int osp_is_fid_client(struct osp_device *osp)
 {
        struct obd_import *imp = osp->opd_obd->u.cli.cl_import;
index 81fa936..337b710 100644 (file)
@@ -253,6 +253,13 @@ static int osp_object_create(const struct lu_env *env, struct dt_object *dt,
 
        CDEBUG(D_INODE, "fid for osp_obj %p is "DFID"!\n", osp_obj, PFID(fid));
 
+       /* If the precreate ends, it means it will be ready to rollover to
+        * the new sequence soon, all the creation should be synchronized,
+        * otherwise during replay, the replay fid will be inconsistent with
+        * last_used/create fid */
+       if (osp_precreate_end_seq(env, d) && osp_is_fid_client(d))
+               th->th_sync = 1;
+
        /*
         * it's OK if the import is inactive by this moment - id was created
         * by OST earlier, we just need to maintain it consistently on the disk
index d35b598..0a0f9f0 100644 (file)
@@ -245,6 +245,18 @@ static inline int osp_precreate_near_empty(const struct lu_env *env,
        return rc;
 }
 
+static inline int osp_create_end_seq(const struct lu_env *env,
+                                    struct osp_device *osp)
+{
+       struct lu_fid *fid = &osp->opd_pre_used_fid;
+       int rc;
+
+       spin_lock(&osp->opd_pre_lock);
+       rc = osp_fid_end_seq(env, fid);
+       spin_unlock(&osp->opd_pre_lock);
+       return rc;
+}
+
 /**
  * Write fid into last_oid/last_seq file.
  **/
@@ -309,6 +321,47 @@ out:
        RETURN(rc);
 }
 
+int osp_precreate_rollover_new_seq(struct lu_env *env, struct osp_device *osp)
+{
+       struct lu_fid   *fid = &osp_env_info(env)->osi_fid;
+       struct lu_fid   *last_fid = &osp->opd_last_used_fid;
+       int             rc;
+       ENTRY;
+
+       rc = seq_client_get_seq(env, osp->opd_obd->u.cli.cl_seq, &fid->f_seq);
+       if (rc != 0) {
+               CERROR("%s: alloc fid error: rc = %d\n",
+                      osp->opd_obd->obd_name, rc);
+               RETURN(rc);
+       }
+
+       fid->f_oid = 1;
+       fid->f_ver = 0;
+       LASSERTF(fid_seq(fid) != fid_seq(last_fid),
+                "fid "DFID", last_fid "DFID"\n", PFID(fid),
+                PFID(last_fid));
+
+       rc = osp_write_last_oid_seq_files(env, osp, fid, 1);
+       if (rc != 0) {
+               CERROR("%s: Can not update oid/seq file: rc = %d\n",
+                      osp->opd_obd->obd_name, rc);
+               RETURN(rc);
+       }
+
+       LCONSOLE_INFO("%s: update sequence from "LPX64" to "LPX64"\n",
+                     osp->opd_obd->obd_name, fid_seq(last_fid),
+                     fid_seq(fid));
+       /* Update last_xxx to the new seq */
+       spin_lock(&osp->opd_pre_lock);
+       osp->opd_last_used_fid = *fid;
+       osp->opd_gap_start_fid = *fid;
+       osp->opd_pre_used_fid = *fid;
+       osp->opd_pre_last_created_fid = *fid;
+       spin_unlock(&osp->opd_pre_lock);
+
+       RETURN(rc);
+}
+
 /**
  * alloc fids for precreation.
  * rc = 0 Success, @grow is the count of real allocation.
@@ -908,6 +961,24 @@ static int osp_precreate_thread(void *_arg)
                        if (osp_statfs_need_update(d))
                                osp_statfs_update(d);
 
+                       /* To avoid handling different seq in precreate/orphan
+                        * cleanup, it will hold precreate until current seq is
+                        * used up. */
+                       if (unlikely(osp_precreate_end_seq(&env, d) &&
+                           !osp_create_end_seq(&env, d)))
+                               continue;
+
+                       if (unlikely(osp_precreate_end_seq(&env, d) &&
+                                    osp_create_end_seq(&env, d))) {
+                               LCONSOLE_INFO("%s:"LPX64" is used up."
+                                             " Update to new seq\n",
+                                             d->opd_obd->obd_name,
+                                        fid_seq(&d->opd_pre_last_created_fid));
+                               rc = osp_precreate_rollover_new_seq(&env, d);
+                               if (rc)
+                                       continue;
+                       }
+
                        if (osp_precreate_near_empty(&env, d)) {
                                rc = osp_precreate_send(&env, d);
                                /* osp_precreate_send() sets opd_pre_status
@@ -1026,7 +1097,8 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d)
                        rc = 0;
 
                        /* XXX: don't wake up if precreation is in progress */
-                       if (osp_precreate_near_empty_nolock(env, d))
+                       if (osp_precreate_near_empty_nolock(env, d) &&
+                          !osp_precreate_end_seq_nolock(env, d))
                                cfs_waitq_signal(&d->opd_pre_waitq);
 
                        break;