* = import is disconnected =
*
* = import is inactive =
- * in this case osp_declare_object_create() returns an error
+ * in this case osp_declare_create() returns an error
*
*/
}
/**
- * Return number of precreated objects
- *
- * A simple helper to calculate the number of precreated objects on the device.
- *
- * \param[in] env LU environment provided by the caller
- * \param[in] osp OSP device
- *
- * \retval the number of the precreated objects
- */
-static inline int osp_objs_precreated(const struct lu_env *env,
- struct osp_device *osp)
-{
- return osp_fid_diff(&osp->opd_pre_last_created_fid,
- &osp->opd_pre_used_fid);
-}
-
-/**
* Check pool of precreated objects is nearly empty
*
* We should not wait till the pool of the precreated objects is exhausted,
RETURN(rc);
}
- LASSERT(d->opd_pre->osp_pre_delorphan_sent != 0);
spin_lock(&d->opd_pre_lock);
if (d->opd_pre_create_count > d->opd_pre_max_create_count / 2)
d->opd_pre_create_count = d->opd_pre_max_create_count / 2;
{
struct osp_thread_info *osi = osp_env_info(env);
struct lu_fid *last_fid = &osi->osi_fid;
+ struct lu_fid tmp;
struct ptlrpc_request *req = NULL;
struct obd_import *imp;
struct ost_body *body;
int update_status = 0;
int rc;
int diff;
- struct lu_fid fid;
ENTRY;
/*
- * wait for local recovery to finish, so we can cleanup orphans.
- * orphans are all objects since "last used" (assigned).
- * consider reserved objects as created otherwise we can get into
- * a livelock when one blocked thread holding a reservation can
- * block recovery. see LU-8367 for the details. in some cases this
- * can result in gaps (i.e. leaked objects), but we've got LFSCK...
- *
- * do not allow new reservations because they may end up getting
- * orphans being cleaned up below. so we block new reservations.
+	 * wait for local recovery to finish, so we can clean up orphans.
+	 * orphans are all objects since "last used" (assigned). we do not
+	 * block waiting for all reservations as this can lead to a deadlock;
+	 * see LU-8972 for the details.
*/
spin_lock(&d->opd_pre_lock);
d->opd_pre_recovering = 1;
spin_unlock(&d->opd_pre_lock);
- /*
- * The locking above makes sure the opd_pre_reserved check below will
- * catch all osp_precreate_reserve() calls who find
- * "!opd_pre_recovering".
- */
+
l_wait_event(d->opd_pre_waitq, d->opd_recovery_completed ||
!osp_precreate_running(d) || d->opd_got_disconnected,
&lwi);
if (!osp_precreate_running(d) || d->opd_got_disconnected)
GOTO(out, rc = -EAGAIN);
+ CDEBUG(D_HA, "%s: going to cleanup orphans since "DFID"\n",
+ d->opd_obd->obd_name, PFID(&d->opd_last_used_fid));
+
*last_fid = d->opd_last_used_fid;
/* The OSP should already get the valid seq now */
LASSERT(!fid_is_zero(last_fid));
if (fid_oid(&d->opd_last_used_fid) < 2) {
/* lastfid looks strange... ask OST */
+ LCONSOLE_WARN("%s: refresh last id\n", d->opd_obd->obd_name);
rc = osp_get_lastfid_from_ost(env, d);
if (rc)
GOTO(out, rc);
if (body == NULL)
GOTO(out, rc = -EPROTO);
- body->oa.o_flags = 0;
+ body->oa.o_flags = OBD_FL_DELORPHAN;
body->oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
- /* unless this is the very first DELORPHAN (when we really
- * can destroy some orphans), just tell OST to recreate
- * missing objects in our precreate pool */
+	/* cleanup objects up to used+reserved as we do not
+	 * want to block the orphan cleanup procedure */
spin_lock(&d->opd_pre_lock);
- if (d->opd_pre->osp_pre_delorphan_sent) {
- fid = d->opd_pre_last_created_fid;
+ if (fid_seq(&d->opd_pre_used_fid) != 0) {
+ tmp = d->opd_pre_used_fid;
+ tmp.f_oid += d->opd_pre_reserved;
+			/* shrink the current precreate window to let already
+			 * reserved objects be created and to block new
+			 * precreations */
+ d->opd_pre_last_created_fid = tmp;
} else {
- fid = d->opd_last_used_fid;
- body->oa.o_flags = OBD_FL_DELORPHAN;
+ tmp = d->opd_last_used_fid;
}
+ fid_to_ostid(&tmp, &body->oa.o_oi);
spin_unlock(&d->opd_pre_lock);
- fid_to_ostid(&fid, &body->oa.o_oi);
-
- CDEBUG(D_HA, "%s: going to cleanup orphans since "DFID"\n",
- d->opd_obd->obd_name, PFID(&fid));
ptlrpc_request_set_replen(req);
if (body == NULL)
GOTO(out, rc = -EPROTO);
- /*
- * OST provides us with id new pool starts from in body->oa.o_id
- */
+ /* OST provides us with id new pool starts from in body->oa.o_id */
ostid_to_fid(last_fid, &body->oa.o_oi, d->opd_index);
- spin_lock(&d->opd_pre_lock);
- diff = osp_fid_diff(&fid, last_fid);
- if (diff > 0) {
- d->opd_pre_create_count = OST_MIN_PRECREATE + diff;
- d->opd_pre_last_created_fid = *last_fid;
- } else {
- d->opd_pre_create_count = OST_MIN_PRECREATE;
- d->opd_pre_last_created_fid = *last_fid;
- }
- /*
- * This empties the pre-creation pool and effectively blocks any new
- * reservations.
- */
- LASSERT(fid_oid(&d->opd_pre_last_created_fid) <=
- LUSTRE_DATA_SEQ_MAX_WIDTH);
- if (d->opd_pre->osp_pre_delorphan_sent == 0)
- d->opd_pre_used_fid = d->opd_pre_last_created_fid;
- d->opd_pre_create_slow = 0;
- spin_unlock(&d->opd_pre_lock);
- d->opd_pre->osp_pre_delorphan_sent = 1;
-
CDEBUG(D_HA, "%s: Got last_id "DFID" from OST, last_created "DFID
"last_used is "DFID"\n", d->opd_obd->obd_name, PFID(last_fid),
PFID(&d->opd_pre_last_created_fid), PFID(&d->opd_last_used_fid));
} else {
wake_up(&d->opd_pre_user_waitq);
}
+ GOTO(ret, rc);
+ }
+
+ spin_lock(&d->opd_pre_lock);
+ d->opd_pre_recovering = 0;
+ spin_unlock(&d->opd_pre_lock);
+
+ /* now we wait until all reserved objects are consumed or released,
+ * so that the window doesn't change. otherwise we can get objects
+ * with wrong FIDs */
+ l_wait_event(d->opd_pre_waitq, d->opd_pre_reserved == 0 ||
+ !osp_precreate_running(d) || d->opd_got_disconnected, &lwi);
+ if (!osp_precreate_running(d))
+ GOTO(ret, rc = 0);
+
+ spin_lock(&d->opd_pre_lock);
+ diff = osp_fid_diff(&d->opd_last_used_fid, last_fid);
+ if (diff > 0) {
+ d->opd_pre_create_count = OST_MIN_PRECREATE + diff;
+ d->opd_pre_last_created_fid = d->opd_last_used_fid;
} else {
- spin_lock(&d->opd_pre_lock);
- d->opd_pre_recovering = 0;
- spin_unlock(&d->opd_pre_lock);
+ d->opd_pre_create_count = OST_MIN_PRECREATE;
+ d->opd_pre_last_created_fid = *last_fid;
}
+ /*
+ * This empties the pre-creation pool and effectively blocks any new
+ * reservations.
+ */
+ LASSERT(fid_oid(&d->opd_pre_last_created_fid) <=
+ LUSTRE_DATA_SEQ_MAX_WIDTH);
+ d->opd_pre_used_fid = d->opd_pre_last_created_fid;
+ d->opd_pre_create_slow = 0;
+ spin_unlock(&d->opd_pre_lock);
+ret:
RETURN(rc);
}
d->opd_pre_status, rc);
CDEBUG(D_INFO,
"non-committed changes: %u, in progress: %u\n",
- atomic_read(&d->opd_syn_changes),
- atomic_read(&d->opd_syn_rpc_in_progress));
+ atomic_read(&d->opd_sync_changes),
+ atomic_read(&d->opd_sync_rpcs_in_progress));
} else if (unlikely(old == -ENOSPC)) {
d->opd_pre_status = 0;
spin_lock(&d->opd_pre_lock);
return 1;
/* ready if OST reported no space and no destroys in progress */
- if (atomic_read(&d->opd_syn_changes) +
- atomic_read(&d->opd_syn_rpc_in_progress) == 0 &&
+ if (atomic_read(&d->opd_sync_changes) +
+ atomic_read(&d->opd_sync_rpcs_in_progress) == 0 &&
d->opd_pre_status == -ENOSPC)
return 1;
struct osp_device *d = data;
CDEBUG(D_HA, "%s: slow creates, last="DFID", next="DFID", "
- "reserved=%llu, syn_changes=%u, "
- "syn_rpc_in_progress=%d, status=%d\n",
+ "reserved=%llu, sync_changes=%u, "
+ "sync_rpcs_in_progress=%d, status=%d\n",
d->opd_obd->obd_name, PFID(&d->opd_pre_last_created_fid),
PFID(&d->opd_pre_used_fid), d->opd_pre_reserved,
- atomic_read(&d->opd_syn_changes),
- atomic_read(&d->opd_syn_rpc_in_progress),
+ atomic_read(&d->opd_sync_changes),
+ atomic_read(&d->opd_sync_rpcs_in_progress),
d->opd_pre_status);
return 1;
* wait till that is done - some space might be released
*/
if (unlikely(rc == -ENOSPC)) {
- if (atomic_read(&d->opd_syn_changes)) {
+ if (atomic_read(&d->opd_sync_changes)) {
/* force local commit to release space */
dt_commit_async(env, d->opd_storage);
}
- if (atomic_read(&d->opd_syn_rpc_in_progress)) {
+ if (atomic_read(&d->opd_sync_rpcs_in_progress)) {
/* just wait till destroys are done */
/* see l_wait_even() few lines below */
}
- if (atomic_read(&d->opd_syn_changes) +
- atomic_read(&d->opd_syn_rpc_in_progress) == 0) {
+ if (atomic_read(&d->opd_sync_changes) +
+ atomic_read(&d->opd_sync_rpcs_in_progress) == 0) {
/* no hope for free space */
break;
}