}
/**
+ * Return number of precreated objects
+ *
+ * A simple helper to calculate the number of precreated objects on the device.
+ *
+ * \param[in] env LU environment provided by the caller
+ * \param[in] osp OSP device
+ *
+ * \retval the number of the precreated objects
+ */
+static inline int osp_objs_precreated(const struct lu_env *env,
+ struct osp_device *osp)
+{
+ return osp_fid_diff(&osp->opd_pre_last_created_fid,
+ &osp->opd_pre_used_fid);
+}
+
+/**
* Check pool of precreated objects is nearly empty
*
* We should not wait till the pool of the precreated objects is exhausted,
{
struct osp_thread_info *osi = osp_env_info(env);
struct lu_fid *last_fid = &osi->osi_fid;
- struct lu_fid tmp;
struct ptlrpc_request *req = NULL;
struct obd_import *imp;
struct ost_body *body;
/*
* wait for local recovery to finish, so we can cleanup orphans
- * orphans are all objects since "last used" (assigned). we do not
- * block waiting for all reservations as this can lead to a deadlock
- * see LU-8972 for the details.
+ * orphans are all objects since "last used" (assigned), but
+ * there might be objects reserved and in some cases they won't
+ * be used. we can't cleanup them till we're sure they won't be
+ * used. also can't we allow new reservations because they may
+ * end up getting orphans being cleaned up below. so we block
+ * new reservations and wait till all reserved objects either
+ * user or released.
*/
spin_lock(&d->opd_pre_lock);
d->opd_pre_recovering = 1;
spin_unlock(&d->opd_pre_lock);
-
- l_wait_event(d->opd_pre_waitq, d->opd_recovery_completed ||
+ /*
+ * The locking above makes sure the opd_pre_reserved check below will
+ * catch all osp_precreate_reserve() calls who find
+ * "!opd_pre_recovering".
+ */
+ l_wait_event(d->opd_pre_waitq,
+ (!d->opd_pre_reserved && d->opd_recovery_completed) ||
!osp_precreate_running(d) || d->opd_got_disconnected,
&lwi);
if (!osp_precreate_running(d) || d->opd_got_disconnected)
LASSERT(!fid_is_zero(last_fid));
if (fid_oid(&d->opd_last_used_fid) < 2) {
/* lastfid looks strange... ask OST */
- LCONSOLE_WARN("%s: refresh last id\n", d->opd_obd->obd_name);
rc = osp_get_lastfid_from_ost(env, d);
if (rc)
GOTO(out, rc);
body->oa.o_flags = OBD_FL_DELORPHAN;
body->oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
- /* cleanup objects upto used+reserved as we do not
- * want to block the orphan cleanup procedure */
- spin_lock(&d->opd_pre_lock);
- if (fid_seq(&d->opd_pre_used_fid) != 0) {
- tmp = d->opd_pre_used_fid;
- tmp.f_oid += d->opd_pre_reserved;
- /* shrink current precreate window to let reserved
- * already objects be created and block new
- * precreations */
- d->opd_pre_last_created_fid = tmp;
- } else {
- tmp = d->opd_last_used_fid;
- }
- fid_to_ostid(&tmp, &body->oa.o_oi);
- spin_unlock(&d->opd_pre_lock);
+ fid_to_ostid(&d->opd_last_used_fid, &body->oa.o_oi);
ptlrpc_request_set_replen(req);
if (body == NULL)
GOTO(out, rc = -EPROTO);
- /* OST provides us with id new pool starts from in body->oa.o_id */
+ /*
+ * OST provides us with id new pool starts from in body->oa.o_id
+ */
ostid_to_fid(last_fid, &body->oa.o_oi, d->opd_index);
+ spin_lock(&d->opd_pre_lock);
+ diff = osp_fid_diff(&d->opd_last_used_fid, last_fid);
+ if (diff > 0) {
+ d->opd_pre_create_count = OST_MIN_PRECREATE + diff;
+ d->opd_pre_last_created_fid = d->opd_last_used_fid;
+ } else {
+ d->opd_pre_create_count = OST_MIN_PRECREATE;
+ d->opd_pre_last_created_fid = *last_fid;
+ }
+ /*
+ * This empties the pre-creation pool and effectively blocks any new
+ * reservations.
+ */
+ LASSERT(fid_oid(&d->opd_pre_last_created_fid) <=
+ LUSTRE_DATA_SEQ_MAX_WIDTH);
+ d->opd_pre_used_fid = d->opd_pre_last_created_fid;
+ d->opd_pre_create_slow = 0;
+ spin_unlock(&d->opd_pre_lock);
+
CDEBUG(D_HA, "%s: Got last_id "DFID" from OST, last_created "DFID
"last_used is "DFID"\n", d->opd_obd->obd_name, PFID(last_fid),
PFID(&d->opd_pre_last_created_fid), PFID(&d->opd_last_used_fid));
} else {
wake_up(&d->opd_pre_user_waitq);
}
- GOTO(ret, rc);
- }
-
- spin_lock(&d->opd_pre_lock);
- d->opd_pre_recovering = 0;
- spin_unlock(&d->opd_pre_lock);
-
- /* now we wait until all reserved objects are consumed or released,
- * so that the window doesn't change. otherwise we can get objects
- * with wrong FIDs */
- l_wait_event(d->opd_pre_waitq, d->opd_pre_reserved == 0 ||
- !osp_precreate_running(d) || d->opd_got_disconnected, &lwi);
- if (!osp_precreate_running(d))
- GOTO(ret, rc = 0);
-
- spin_lock(&d->opd_pre_lock);
- diff = osp_fid_diff(&d->opd_last_used_fid, last_fid);
- if (diff > 0) {
- d->opd_pre_create_count = OST_MIN_PRECREATE + diff;
- d->opd_pre_last_created_fid = d->opd_last_used_fid;
} else {
- d->opd_pre_create_count = OST_MIN_PRECREATE;
- d->opd_pre_last_created_fid = *last_fid;
+ spin_lock(&d->opd_pre_lock);
+ d->opd_pre_recovering = 0;
+ spin_unlock(&d->opd_pre_lock);
}
- /*
- * This empties the pre-creation pool and effectively blocks any new
- * reservations.
- */
- LASSERT(fid_oid(&d->opd_pre_last_created_fid) <=
- LUSTRE_DATA_SEQ_MAX_WIDTH);
- d->opd_pre_used_fid = d->opd_pre_last_created_fid;
- d->opd_pre_create_slow = 0;
- spin_unlock(&d->opd_pre_lock);
-ret:
RETURN(rc);
}
if (d->opd_pre_max_create_count == 0)
RETURN(-ENOBUFS);
- if (OBD_FAIL_PRECHECK(OBD_FAIL_MDS_OSP_PRECREATE_WAIT)) {
- if (d->opd_index == cfs_fail_val)
- OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_OSP_PRECREATE_WAIT,
- obd_timeout);
- }
-
/*
* wait till:
* - preallocation is done