Whamcloud - gitweb
LU-9330 osp: make variables match proc tunables
[fs/lustre-release.git] / lustre / osp / osp_precreate.c
index c85b837..c88eeb1 100644 (file)
@@ -52,7 +52,7 @@
  * = import is disconnected =
  *
  * = import is inactive =
- *   in this case osp_declare_object_create() returns an error
+ *   in this case osp_declare_create() returns an error
  *
  */
 
@@ -239,23 +239,6 @@ void osp_statfs_need_now(struct osp_device *d)
 }
 
 /**
- * Return number of precreated objects
- *
- * A simple helper to calculate the number of precreated objects on the device.
- *
- * \param[in] env      LU environment provided by the caller
- * \param[in] osp      OSP device
- *
- * \retval             the number of the precreated objects
- */
-static inline int osp_objs_precreated(const struct lu_env *env,
-                                     struct osp_device *osp)
-{
-       return osp_fid_diff(&osp->opd_pre_last_created_fid,
-                           &osp->opd_pre_used_fid);
-}
-
-/**
  * Check pool of precreated objects is nearly empty
  *
  * We should not wait till the pool of the precreated objects is exhausted,
@@ -494,16 +477,17 @@ static int osp_precreate_fids(const struct lu_env *env, struct osp_device *osp,
        if (fid_is_idif(fid)) {
                struct lu_fid   *last_fid;
                struct ost_id   *oi = &osi->osi_oi;
+               int rc;
 
                spin_lock(&osp->opd_pre_lock);
                last_fid = &osp->opd_pre_last_created_fid;
                fid_to_ostid(last_fid, oi);
                end = min(ostid_id(oi) + *grow, IDIF_MAX_OID);
                *grow = end - ostid_id(oi);
-               ostid_set_id(oi, ostid_id(oi) + *grow);
+               rc = ostid_set_id(oi, ostid_id(oi) + *grow);
                spin_unlock(&osp->opd_pre_lock);
 
-               if (*grow == 0)
+               if (*grow == 0 || rc)
                        return 1;
 
                ostid_to_fid(fid, oi, osp->opd_index);
@@ -576,7 +560,6 @@ static int osp_precreate_send(const struct lu_env *env, struct osp_device *d)
                RETURN(rc);
        }
 
-       LASSERT(d->opd_pre->osp_pre_delorphan_sent != 0);
        spin_lock(&d->opd_pre_lock);
        if (d->opd_pre_create_count > d->opd_pre_max_create_count / 2)
                d->opd_pre_create_count = d->opd_pre_max_create_count / 2;
@@ -775,6 +758,7 @@ static int osp_precreate_cleanup_orphans(struct lu_env *env,
 {
        struct osp_thread_info  *osi = osp_env_info(env);
        struct lu_fid           *last_fid = &osi->osi_fid;
+       struct lu_fid            tmp;
        struct ptlrpc_request   *req = NULL;
        struct obd_import       *imp;
        struct ost_body         *body;
@@ -782,40 +766,34 @@ static int osp_precreate_cleanup_orphans(struct lu_env *env,
        int                      update_status = 0;
        int                      rc;
        int                      diff;
-       struct lu_fid            fid;
 
        ENTRY;
 
        /*
-        * wait for local recovery to finish, so we can cleanup orphans.
-        * orphans are all objects since "last used" (assigned).
-        * consider reserved objects as created otherwise we can get into
-        * a livelock when one blocked thread holding a reservation can
-        * block recovery. see LU-8367 for the details. in some cases this
-        * can result in gaps (i.e. leaked objects), but we've got LFSCK...
-        *
-        * do not allow new reservations because they may end up getting
-        * orphans being cleaned up below. so we block new reservations.
+        * wait for local recovery to finish, so we can clean up orphans.
+        * orphans are all objects since "last used" (assigned). we do not
+        * block waiting for all reservations as this can lead to a deadlock;
+        * see LU-8972 for the details.
         */
        spin_lock(&d->opd_pre_lock);
        d->opd_pre_recovering = 1;
        spin_unlock(&d->opd_pre_lock);
-       /*
-        * The locking above makes sure the opd_pre_reserved check below will
-        * catch all osp_precreate_reserve() calls who find
-        * "!opd_pre_recovering".
-        */
+
        l_wait_event(d->opd_pre_waitq, d->opd_recovery_completed ||
                     !osp_precreate_running(d) || d->opd_got_disconnected,
                     &lwi);
        if (!osp_precreate_running(d) || d->opd_got_disconnected)
                GOTO(out, rc = -EAGAIN);
 
+       CDEBUG(D_HA, "%s: going to cleanup orphans since "DFID"\n",
+              d->opd_obd->obd_name, PFID(&d->opd_last_used_fid));
+
        *last_fid = d->opd_last_used_fid;
        /* The OSP should already get the valid seq now */
        LASSERT(!fid_is_zero(last_fid));
        if (fid_oid(&d->opd_last_used_fid) < 2) {
                /* lastfid looks strange... ask OST */
+               LCONSOLE_WARN("%s: refresh last id\n", d->opd_obd->obd_name);
                rc = osp_get_lastfid_from_ost(env, d);
                if (rc)
                        GOTO(out, rc);
@@ -839,24 +817,24 @@ static int osp_precreate_cleanup_orphans(struct lu_env *env,
        if (body == NULL)
                GOTO(out, rc = -EPROTO);
 
-       body->oa.o_flags = 0;
+       body->oa.o_flags = OBD_FL_DELORPHAN;
        body->oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
 
-       /* unless this is the very first DELORPHAN (when we really
-        * can destroy some orphans), just tell OST to recreate
-        * missing objects in our precreate pool */
+       /* cleanup objects up to used+reserved as we do not
+        * want to block the orphan cleanup procedure */
        spin_lock(&d->opd_pre_lock);
-       if (d->opd_pre->osp_pre_delorphan_sent) {
-               fid = d->opd_pre_last_created_fid;
+       if (fid_seq(&d->opd_pre_used_fid) != 0) {
+               tmp = d->opd_pre_used_fid;
+               tmp.f_oid += d->opd_pre_reserved;
+               /* shrink the current precreate window to let the
+                * already reserved objects be created and to block
+                * new precreations */
+               d->opd_pre_last_created_fid = tmp;
        } else {
-               fid = d->opd_last_used_fid;
-               body->oa.o_flags = OBD_FL_DELORPHAN;
+               tmp = d->opd_last_used_fid;
        }
+       fid_to_ostid(&tmp, &body->oa.o_oi);
        spin_unlock(&d->opd_pre_lock);
-       fid_to_ostid(&fid, &body->oa.o_oi);
-
-       CDEBUG(D_HA, "%s: going to cleanup orphans since "DFID"\n",
-              d->opd_obd->obd_name, PFID(&fid));
 
        ptlrpc_request_set_replen(req);
 
@@ -873,32 +851,9 @@ static int osp_precreate_cleanup_orphans(struct lu_env *env,
        if (body == NULL)
                GOTO(out, rc = -EPROTO);
 
-       /*
-        * OST provides us with id new pool starts from in body->oa.o_id
-        */
+       /* OST provides us with id new pool starts from in body->oa.o_id */
        ostid_to_fid(last_fid, &body->oa.o_oi, d->opd_index);
 
-       spin_lock(&d->opd_pre_lock);
-       diff = osp_fid_diff(&fid, last_fid);
-       if (diff > 0) {
-               d->opd_pre_create_count = OST_MIN_PRECREATE + diff;
-               d->opd_pre_last_created_fid = *last_fid;
-       } else {
-               d->opd_pre_create_count = OST_MIN_PRECREATE;
-               d->opd_pre_last_created_fid = *last_fid;
-       }
-       /*
-        * This empties the pre-creation pool and effectively blocks any new
-        * reservations.
-        */
-       LASSERT(fid_oid(&d->opd_pre_last_created_fid) <=
-               LUSTRE_DATA_SEQ_MAX_WIDTH);
-       if (d->opd_pre->osp_pre_delorphan_sent == 0)
-               d->opd_pre_used_fid = d->opd_pre_last_created_fid;
-       d->opd_pre_create_slow = 0;
-       spin_unlock(&d->opd_pre_lock);
-       d->opd_pre->osp_pre_delorphan_sent = 1;
-
        CDEBUG(D_HA, "%s: Got last_id "DFID" from OST, last_created "DFID
               "last_used is "DFID"\n", d->opd_obd->obd_name, PFID(last_fid),
               PFID(&d->opd_pre_last_created_fid), PFID(&d->opd_last_used_fid));
@@ -924,12 +879,41 @@ out:
                } else {
                        wake_up(&d->opd_pre_user_waitq);
                }
+               GOTO(ret, rc);
+       }
+
+       spin_lock(&d->opd_pre_lock);
+       d->opd_pre_recovering = 0;
+       spin_unlock(&d->opd_pre_lock);
+
+       /* now we wait until all reserved objects are consumed or released,
+        * so that the window doesn't change. otherwise we can get objects
+        * with wrong FIDs */
+       l_wait_event(d->opd_pre_waitq, d->opd_pre_reserved == 0 ||
+                    !osp_precreate_running(d) || d->opd_got_disconnected, &lwi);
+       if (!osp_precreate_running(d))
+               GOTO(ret, rc = 0);
+
+       spin_lock(&d->opd_pre_lock);
+       diff = osp_fid_diff(&d->opd_last_used_fid, last_fid);
+       if (diff > 0) {
+               d->opd_pre_create_count = OST_MIN_PRECREATE + diff;
+               d->opd_pre_last_created_fid = d->opd_last_used_fid;
        } else {
-               spin_lock(&d->opd_pre_lock);
-               d->opd_pre_recovering = 0;
-               spin_unlock(&d->opd_pre_lock);
+               d->opd_pre_create_count = OST_MIN_PRECREATE;
+               d->opd_pre_last_created_fid = *last_fid;
        }
+       /*
+        * This empties the pre-creation pool and effectively blocks any new
+        * reservations.
+        */
+       LASSERT(fid_oid(&d->opd_pre_last_created_fid) <=
+                       LUSTRE_DATA_SEQ_MAX_WIDTH);
+       d->opd_pre_used_fid = d->opd_pre_last_created_fid;
+       d->opd_pre_create_slow = 0;
+       spin_unlock(&d->opd_pre_lock);
 
+ret:
        RETURN(rc);
 }
 
@@ -1007,8 +991,8 @@ void osp_pre_update_status(struct osp_device *d, int rc)
                                       d->opd_pre_status, rc);
                        CDEBUG(D_INFO,
                               "non-committed changes: %u, in progress: %u\n",
-                              atomic_read(&d->opd_syn_changes),
-                              atomic_read(&d->opd_syn_rpc_in_progress));
+                              atomic_read(&d->opd_sync_changes),
+                              atomic_read(&d->opd_sync_rpcs_in_progress));
                } else if (unlikely(old == -ENOSPC)) {
                        d->opd_pre_status = 0;
                        spin_lock(&d->opd_pre_lock);
@@ -1308,8 +1292,8 @@ static int osp_precreate_ready_condition(const struct lu_env *env,
                return 1;
 
        /* ready if OST reported no space and no destroys in progress */
-       if (atomic_read(&d->opd_syn_changes) +
-           atomic_read(&d->opd_syn_rpc_in_progress) == 0 &&
+       if (atomic_read(&d->opd_sync_changes) +
+           atomic_read(&d->opd_sync_rpcs_in_progress) == 0 &&
            d->opd_pre_status == -ENOSPC)
                return 1;
 
@@ -1334,12 +1318,12 @@ static int osp_precreate_timeout_condition(void *data)
        struct osp_device *d = data;
 
        CDEBUG(D_HA, "%s: slow creates, last="DFID", next="DFID", "
-             "reserved=%llu, syn_changes=%u, "
-             "syn_rpc_in_progress=%d, status=%d\n",
+             "reserved=%llu, sync_changes=%u, "
+             "sync_rpcs_in_progress=%d, status=%d\n",
              d->opd_obd->obd_name, PFID(&d->opd_pre_last_created_fid),
              PFID(&d->opd_pre_used_fid), d->opd_pre_reserved,
-             atomic_read(&d->opd_syn_changes),
-             atomic_read(&d->opd_syn_rpc_in_progress),
+             atomic_read(&d->opd_sync_changes),
+             atomic_read(&d->opd_sync_rpcs_in_progress),
              d->opd_pre_status);
 
        return 1;
@@ -1438,16 +1422,16 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d)
                 * wait till that is done - some space might be released
                 */
                if (unlikely(rc == -ENOSPC)) {
-                       if (atomic_read(&d->opd_syn_changes)) {
+                       if (atomic_read(&d->opd_sync_changes)) {
                                /* force local commit to release space */
                                dt_commit_async(env, d->opd_storage);
                        }
-                       if (atomic_read(&d->opd_syn_rpc_in_progress)) {
+                       if (atomic_read(&d->opd_sync_rpcs_in_progress)) {
                                /* just wait till destroys are done */
                                /* see l_wait_event() a few lines below */
                        }
-                       if (atomic_read(&d->opd_syn_changes) +
-                           atomic_read(&d->opd_syn_rpc_in_progress) == 0) {
+                       if (atomic_read(&d->opd_sync_changes) +
+                           atomic_read(&d->opd_sync_rpcs_in_progress) == 0) {
                                /* no hope for free space */
                                break;
                        }