Whamcloud - gitweb
LU-11912 ofd: reduce LUSTRE_DATA_SEQ_MAX_WIDTH
[fs/lustre-release.git] / lustre / osp / osp_precreate.c
index 0b32464..384ad03 100644 (file)
@@ -370,30 +370,6 @@ static inline int osp_precreate_near_empty(const struct lu_env *env,
 }
 
 /**
- * Check given sequence is empty
- *
- * Returns a binary result whether the given sequence has some IDs left
- * or not. Find the details in osp_fid_end_seq(). This is a lock protected
- * version of that function.
- *
- * \param[in] env      LU environment provided by the caller
- * \param[in] osp      OSP device
- *
- * \retval             0 - current sequence has no IDs, 1 - otherwise
- */
-static inline int osp_create_end_seq(const struct lu_env *env,
-                                    struct osp_device *osp)
-{
-       struct lu_fid *fid = &osp->opd_pre_used_fid;
-       int rc;
-
-       spin_lock(&osp->opd_pre_lock);
-       rc = osp_fid_end_seq(env, fid);
-       spin_unlock(&osp->opd_pre_lock);
-       return rc;
-}
-
-/**
  * Write FID into into last_oid/last_seq file
  *
  * The function stores the sequence and the in-sequence id into two dedicated
@@ -476,13 +452,13 @@ out:
  *
  * When a current sequence has no available IDs left, OSP has to switch to
  * another new sequence. OSP requests it using the regular FLDB protocol
- * and stores synchronously before that is used in precreated. This is needed
+ * and stores synchronously before that is used in precreate. This is needed
  * to basically have the sequences referenced (not orphaned), otherwise it's
  * possible that OST has some objects precreated and the clients have data
  * written to it, but after MDT failover nobody refers those objects and OSP
  * has no idea that the sequence need cleanup to be done.
- * While this is very expensive operation, it's supposed to happen very very
- * infrequently because sequence has 2^32 or 2^48 objects (depending on type)
+ * While this is very expensive operation, it's supposed to happen infrequently
+ * because sequence has LUSTRE_DATA_SEQ_MAX_WIDTH=32M objects by default.
  *
  * \param[in] env      LU environment provided by the caller
  * \param[in] osp      OSP device
@@ -518,9 +494,9 @@ static int osp_precreate_rollover_new_seq(struct lu_env *env,
                RETURN(rc);
        }
 
-       LCONSOLE_INFO("%s: update sequence from %#llx to %#llx\n",
-                     osp->opd_obd->obd_name, fid_seq(last_fid),
-                     fid_seq(fid));
+       LCONSOLE(D_INFO, "%s: update sequence from %#llx to %#llx\n",
+                osp->opd_obd->obd_name, fid_seq(last_fid),
+                fid_seq(fid));
        /* Update last_xxx to the new seq */
        spin_lock(&osp->opd_pre_lock);
        osp->opd_last_used_fid = *fid;
@@ -554,9 +530,10 @@ static int osp_precreate_rollover_new_seq(struct lu_env *env,
 static int osp_precreate_fids(const struct lu_env *env, struct osp_device *osp,
                              struct lu_fid *fid, int *grow)
 {
-       struct osp_thread_info  *osi = osp_env_info(env);
-       __u64                   end;
-       int                     i = 0;
+       struct osp_thread_info *osi = osp_env_info(env);
+       __u64 seq_width = osp->opd_pre_seq_width;
+       __u64 end;
+       int i = 0;
 
        if (fid_is_idif(fid)) {
                struct lu_fid   *last_fid;
@@ -566,7 +543,7 @@ static int osp_precreate_fids(const struct lu_env *env, struct osp_device *osp,
                spin_lock(&osp->opd_pre_lock);
                last_fid = &osp->opd_pre_last_created_fid;
                fid_to_ostid(last_fid, oi);
-               end = min(ostid_id(oi) + *grow, IDIF_MAX_OID);
+               end = min(ostid_id(oi) + *grow, min(IDIF_MAX_OID, seq_width));
                *grow = end - ostid_id(oi);
                rc = ostid_set_id(oi, ostid_id(oi) + *grow);
                spin_unlock(&osp->opd_pre_lock);
@@ -581,7 +558,7 @@ static int osp_precreate_fids(const struct lu_env *env, struct osp_device *osp,
        spin_lock(&osp->opd_pre_lock);
        *fid = osp->opd_pre_last_created_fid;
        end = fid->f_oid;
-       end = min((end + *grow), (__u64)LUSTRE_DATA_SEQ_MAX_WIDTH);
+       end = min((end + *grow), min(OBIF_MAX_OID, seq_width));
        *grow = end - fid->f_oid;
        fid->f_oid += end - fid->f_oid;
        spin_unlock(&osp->opd_pre_lock);
@@ -717,6 +694,9 @@ ready:
                d->opd_pre_create_slow = 0;
        }
 
+       if ((body->oa.o_valid & OBD_MD_FLSIZE) && body->oa.o_size)
+               d->opd_pre_seq_width = body->oa.o_size;
+
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        fid_to_ostid(fid, &body->oa.o_oi);
 
@@ -971,10 +951,14 @@ static int osp_precreate_cleanup_orphans(struct lu_env *env,
         * This empties the pre-creation pool and effectively blocks any new
         * reservations.
         */
-       LASSERT(fid_oid(&d->opd_pre_last_created_fid) <=
-               LUSTRE_DATA_SEQ_MAX_WIDTH);
+       LASSERTF(fid_oid(&d->opd_pre_last_created_fid) <= IDIF_MAX_OID,
+                "%s: last_created_fid "DFID" > %llu\n",
+                d->opd_obd->obd_name, PFID(&d->opd_pre_last_created_fid),
+                IDIF_MAX_OID);
        d->opd_pre_used_fid = d->opd_pre_last_created_fid;
        d->opd_pre_create_slow = 0;
+       if ((body->oa.o_valid & OBD_MD_FLSIZE) && body->oa.o_size)
+               d->opd_pre_seq_width = body->oa.o_size;
        spin_unlock(&d->opd_pre_lock);
 
        CDEBUG(D_HA, "%s: Got last_id "DFID" from OST, last_created "DFID
@@ -1331,7 +1315,9 @@ static int osp_precreate_thread(void *_args)
                while (!kthread_should_stop()) {
                        wait_event_idle(d->opd_pre_waitq,
                                        kthread_should_stop() ||
-                                       osp_precreate_near_empty(env, d) ||
+                                       (osp_precreate_near_empty(env, d) &&
+                                        !(osp_precreate_end_seq(env, d) &&
+                                          osp_objs_precreated(env, d) != 0)) ||
                                        osp_statfs_need_update(d) ||
                                        d->opd_got_disconnected);
 
@@ -1358,15 +1344,14 @@ static int osp_precreate_thread(void *_args)
                        /* To avoid handling different seq in precreate/orphan
                         * cleanup, it will hold precreate until current seq is
                         * used up. */
-                       if (unlikely(osp_precreate_end_seq(env, d) &&
-                           !osp_create_end_seq(env, d)))
-                               continue;
-
-                       if (unlikely(osp_precreate_end_seq(env, d) &&
-                                    osp_create_end_seq(env, d))) {
-                               rc = osp_precreate_rollover_new_seq(env, d);
-                               if (rc)
+                       if (unlikely(osp_precreate_end_seq(env, d))) {
+                               if (osp_objs_precreated(env, d) == 0) {
+                                       rc = osp_precreate_rollover_new_seq(env, d);
+                                       if (rc)
+                                               continue;
+                               } else {
                                        continue;
+                               }
                        }
 
                        if (osp_precreate_near_empty(env, d)) {
@@ -1501,19 +1486,31 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d,
 
                spin_lock(&d->opd_pre_lock);
                precreated = osp_objs_precreated(env, d);
-               if (precreated > d->opd_pre_reserved &&
-                   !d->opd_pre_recovering &&
-                   !d->opd_force_creation) {
-                       d->opd_pre_reserved++;
-                       spin_unlock(&d->opd_pre_lock);
-                       rc = 0;
-
-                       /* XXX: don't wake up if precreation is in progress */
-                       if (osp_precreate_near_empty_nolock(env, d) &&
-                          !osp_precreate_end_seq_nolock(env, d))
-                               wake_up(&d->opd_pre_waitq);
+               if (!d->opd_pre_recovering && !d->opd_force_creation) {
+                       if (precreated > d->opd_pre_reserved) {
+                               d->opd_pre_reserved++;
+                               spin_unlock(&d->opd_pre_lock);
+                               rc = 0;
+
+                               /*
+                                * XXX: don't wake up if precreation
+                                * is in progress
+                                */
+                               if (osp_precreate_near_empty_nolock(env, d) &&
+                                  !osp_precreate_end_seq_nolock(env, d))
+                                       wake_up(&d->opd_pre_waitq);
 
-                       break;
+                               break;
+                       } else if (unlikely(precreated &&
+                                  osp_precreate_end_seq_nolock(env, d))) {
+                               /*
+                                * precreate pool is reaching the end of the
+                                * current seq, and doesn't have enough objects
+                                */
+                               rc = -ENOSPC;
+                               spin_unlock(&d->opd_pre_lock);
+                               break;
+                       }
                }
                spin_unlock(&d->opd_pre_lock);
 
@@ -1607,31 +1604,32 @@ int osp_precreate_get_fid(const struct lu_env *env, struct osp_device *d,
                          struct lu_fid *fid)
 {
        struct lu_fid *pre_used_fid = &d->opd_pre_used_fid;
+
        /* grab next id from the pool */
        spin_lock(&d->opd_pre_lock);
 
        LASSERTF(osp_fid_diff(&d->opd_pre_used_fid,
                             &d->opd_pre_last_created_fid) < 0,
-                "next fid "DFID" last created fid "DFID"\n",
+                "next fid "DFID" last created fid "DFID"\n",
                 PFID(&d->opd_pre_used_fid),
                 PFID(&d->opd_pre_last_created_fid));
 
-       /*
-        * When sequence is used up, new one should be allocated in
-        * osp_precreate_rollover_new_seq. So ASSERT here to avoid
-        * objid overflow.
+       /* Non-IDIF FIDs shouldn't get here with OID == OBIF_MAX_OID. For IDIF,
+        * f_oid wraps and "f_seq" (holding high 16 bits of ID) needs increment
         */
-       LASSERTF(osp_fid_end_seq(env, pre_used_fid) == 0,
-                "next fid "DFID" last created fid "DFID"\n",
-                PFID(&d->opd_pre_used_fid),
-                PFID(&d->opd_pre_last_created_fid));
-       /* Non IDIF fids shoulnd't get here with oid == 0xFFFFFFFF. */
        if (fid_is_idif(pre_used_fid) &&
-           unlikely(fid_oid(pre_used_fid) == LUSTRE_DATA_SEQ_MAX_WIDTH))
-               pre_used_fid->f_seq++;
+           unlikely(fid_oid(pre_used_fid) == OBIF_MAX_OID)) {
+               struct ost_id oi;
+               __u32 idx = fid_idif_ost_idx(pre_used_fid);
+
+               fid_to_ostid(pre_used_fid, &oi);
+               oi.oi.oi_id++;
+               ostid_to_fid(pre_used_fid, &oi, idx);
+       } else {
+               pre_used_fid->f_oid++;
+       }
 
-       d->opd_pre_used_fid.f_oid++;
-       memcpy(fid, &d->opd_pre_used_fid, sizeof(*fid));
+       memcpy(fid, pre_used_fid, sizeof(*fid));
        d->opd_pre_reserved--;
        /*
         * last_used_id must be changed along with getting new id otherwise
@@ -1782,6 +1780,7 @@ int osp_init_precreate(struct osp_device *d)
        d->opd_pre_last_created_fid.f_oid = 1;
        d->opd_last_id = 0;
        d->opd_pre_reserved = 0;
+       d->opd_pre_seq_width = LUSTRE_DATA_SEQ_MAX_WIDTH;
        d->opd_got_disconnected = 1;
        d->opd_pre_create_slow = 0;
        d->opd_pre_create_count = OST_MIN_PRECREATE;