Whamcloud - gitweb
LU-8367 osp: orphan cleanup do not wait for reserved
[fs/lustre-release.git] / lustre / osp / osp_precreate.c
index 4d867fa..1a8bfa5 100644 (file)
@@ -239,23 +239,6 @@ void osp_statfs_need_now(struct osp_device *d)
 }
 
 /**
- * Return number of precreated objects
- *
- * A simple helper to calculate the number of precreated objects on the device.
- *
- * \param[in] env      LU environment provided by the caller
- * \param[in] osp      OSP device
- *
- * \retval             the number of the precreated objects
- */
-static inline int osp_objs_precreated(const struct lu_env *env,
-                                     struct osp_device *osp)
-{
-       return osp_fid_diff(&osp->opd_pre_last_created_fid,
-                           &osp->opd_pre_used_fid);
-}
-
-/**
  * Check pool of precreated objects is nearly empty
  *
  * We should not wait till the pool of the precreated objects is exhausted,
@@ -775,6 +758,7 @@ static int osp_precreate_cleanup_orphans(struct lu_env *env,
 {
        struct osp_thread_info  *osi = osp_env_info(env);
        struct lu_fid           *last_fid = &osi->osi_fid;
+       struct lu_fid            tmp;
        struct ptlrpc_request   *req = NULL;
        struct obd_import       *imp;
        struct ost_body         *body;
@@ -787,24 +771,15 @@ static int osp_precreate_cleanup_orphans(struct lu_env *env,
 
        /*
         * wait for local recovery to finish, so we can cleanup orphans
-        * orphans are all objects since "last used" (assigned), but
-        * there might be objects reserved and in some cases they won't
-        * be used. we can't cleanup them till we're sure they won't be
-        * used. also can't we allow new reservations because they may
-        * end up getting orphans being cleaned up below. so we block
-        * new reservations and wait till all reserved objects either
-        * user or released.
+        * orphans are all objects since "last used" (assigned). we do not
+        * block waiting for all reservations as this can lead to a deadlock
+        * see LU-8972 for the details.
         */
        spin_lock(&d->opd_pre_lock);
        d->opd_pre_recovering = 1;
        spin_unlock(&d->opd_pre_lock);
-       /*
-        * The locking above makes sure the opd_pre_reserved check below will
-        * catch all osp_precreate_reserve() calls who find
-        * "!opd_pre_recovering".
-        */
-       l_wait_event(d->opd_pre_waitq,
-                    (!d->opd_pre_reserved && d->opd_recovery_completed) ||
+
+       l_wait_event(d->opd_pre_waitq, d->opd_recovery_completed ||
                     !osp_precreate_running(d) || d->opd_got_disconnected,
                     &lwi);
        if (!osp_precreate_running(d) || d->opd_got_disconnected)
@@ -818,6 +793,7 @@ static int osp_precreate_cleanup_orphans(struct lu_env *env,
        LASSERT(!fid_is_zero(last_fid));
        if (fid_oid(&d->opd_last_used_fid) < 2) {
                /* lastfid looks strange... ask OST */
+               LCONSOLE_WARN("%s: refresh last id\n", d->opd_obd->obd_name);
                rc = osp_get_lastfid_from_ost(env, d);
                if (rc)
                        GOTO(out, rc);
@@ -844,7 +820,21 @@ static int osp_precreate_cleanup_orphans(struct lu_env *env,
        body->oa.o_flags = OBD_FL_DELORPHAN;
        body->oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
 
-       fid_to_ostid(&d->opd_last_used_fid, &body->oa.o_oi);
+       /* cleanup objects upto used+reserved as we do not
+        * want to block the orphan cleanup procedure */
+       spin_lock(&d->opd_pre_lock);
+       if (fid_seq(&d->opd_pre_used_fid) != 0) {
+               tmp = d->opd_pre_used_fid;
+               tmp.f_oid += d->opd_pre_reserved;
+               /* shrink current precreate window to let reserved
+                * already objects be created and block new
+                * precreations */
+               d->opd_pre_last_created_fid = tmp;
+       } else {
+               tmp = d->opd_last_used_fid;
+       }
+       fid_to_ostid(&tmp, &body->oa.o_oi);
+       spin_unlock(&d->opd_pre_lock);
 
        ptlrpc_request_set_replen(req);
 
@@ -861,30 +851,9 @@ static int osp_precreate_cleanup_orphans(struct lu_env *env,
        if (body == NULL)
                GOTO(out, rc = -EPROTO);
 
-       /*
-        * OST provides us with id new pool starts from in body->oa.o_id
-        */
+       /* OST provides us with id new pool starts from in body->oa.o_id */
        ostid_to_fid(last_fid, &body->oa.o_oi, d->opd_index);
 
-       spin_lock(&d->opd_pre_lock);
-       diff = osp_fid_diff(&d->opd_last_used_fid, last_fid);
-       if (diff > 0) {
-               d->opd_pre_create_count = OST_MIN_PRECREATE + diff;
-               d->opd_pre_last_created_fid = d->opd_last_used_fid;
-       } else {
-               d->opd_pre_create_count = OST_MIN_PRECREATE;
-               d->opd_pre_last_created_fid = *last_fid;
-       }
-       /*
-        * This empties the pre-creation pool and effectively blocks any new
-        * reservations.
-        */
-       LASSERT(fid_oid(&d->opd_pre_last_created_fid) <=
-               LUSTRE_DATA_SEQ_MAX_WIDTH);
-       d->opd_pre_used_fid = d->opd_pre_last_created_fid;
-       d->opd_pre_create_slow = 0;
-       spin_unlock(&d->opd_pre_lock);
-
        CDEBUG(D_HA, "%s: Got last_id "DFID" from OST, last_created "DFID
               "last_used is "DFID"\n", d->opd_obd->obd_name, PFID(last_fid),
               PFID(&d->opd_pre_last_created_fid), PFID(&d->opd_last_used_fid));
@@ -910,12 +879,41 @@ out:
                } else {
                        wake_up(&d->opd_pre_user_waitq);
                }
+               GOTO(ret, rc);
+       }
+
+       spin_lock(&d->opd_pre_lock);
+       d->opd_pre_recovering = 0;
+       spin_unlock(&d->opd_pre_lock);
+
+       /* now we wait until all reserved objects are consumed or released,
+        * so that the window doesn't change. otherwise we can get objects
+        * with wrong FIDs */
+       l_wait_event(d->opd_pre_waitq, d->opd_pre_reserved == 0 ||
+                    !osp_precreate_running(d) || d->opd_got_disconnected, &lwi);
+       if (!osp_precreate_running(d))
+               GOTO(ret, rc = 0);
+
+       spin_lock(&d->opd_pre_lock);
+       diff = osp_fid_diff(&d->opd_last_used_fid, last_fid);
+       if (diff > 0) {
+               d->opd_pre_create_count = OST_MIN_PRECREATE + diff;
+               d->opd_pre_last_created_fid = d->opd_last_used_fid;
        } else {
-               spin_lock(&d->opd_pre_lock);
-               d->opd_pre_recovering = 0;
-               spin_unlock(&d->opd_pre_lock);
+               d->opd_pre_create_count = OST_MIN_PRECREATE;
+               d->opd_pre_last_created_fid = *last_fid;
        }
+       /*
+        * This empties the pre-creation pool and effectively blocks any new
+        * reservations.
+        */
+       LASSERT(fid_oid(&d->opd_pre_last_created_fid) <=
+                       LUSTRE_DATA_SEQ_MAX_WIDTH);
+       d->opd_pre_used_fid = d->opd_pre_last_created_fid;
+       d->opd_pre_create_slow = 0;
+       spin_unlock(&d->opd_pre_lock);
 
+ret:
        RETURN(rc);
 }
 
@@ -1371,6 +1369,12 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d)
        if (d->opd_pre_max_create_count == 0)
                RETURN(-ENOBUFS);
 
+       if (OBD_FAIL_PRECHECK(OBD_FAIL_MDS_OSP_PRECREATE_WAIT)) {
+               if (d->opd_index == cfs_fail_val)
+                       OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_OSP_PRECREATE_WAIT,
+                                        obd_timeout);
+       }
+
        /*
         * wait till:
         *  - preallocation is done