}
/**
- * Check given sequence is empty
- *
- * Returns a binary result whether the given sequence has some IDs left
- * or not. Find the details in osp_fid_end_seq(). This is a lock protected
- * version of that function.
- *
- * \param[in] env LU environment provided by the caller
- * \param[in] osp OSP device
- *
- * \retval 0 - current sequence has no IDs, 1 - otherwise
- */
-static inline int osp_create_end_seq(const struct lu_env *env,
- struct osp_device *osp)
-{
- struct lu_fid *fid = &osp->opd_pre_used_fid;
- int rc;
-
- spin_lock(&osp->opd_pre_lock);
- rc = osp_fid_end_seq(env, fid);
- spin_unlock(&osp->opd_pre_lock);
- return rc;
-}
-
-/**
* Write FID into into last_oid/last_seq file
*
* The function stores the sequence and the in-sequence id into two dedicated
*
* When a current sequence has no available IDs left, OSP has to switch to
* another new sequence. OSP requests it using the regular FLDB protocol
- * and stores synchronously before that is used in precreated. This is needed
+ * and stores synchronously before that is used in precreate. This is needed
* to basically have the sequences referenced (not orphaned), otherwise it's
* possible that OST has some objects precreated and the clients have data
* written to it, but after MDT failover nobody refers those objects and OSP
* has no idea that the sequence need cleanup to be done.
- * While this is very expensive operation, it's supposed to happen very very
- * infrequently because sequence has 2^32 or 2^48 objects (depending on type)
+ * While this is very expensive operation, it's supposed to happen infrequently
+ * because sequence has LUSTRE_DATA_SEQ_MAX_WIDTH=32M objects by default.
*
* \param[in] env LU environment provided by the caller
* \param[in] osp OSP device
RETURN(rc);
}
- LCONSOLE_INFO("%s: update sequence from %#llx to %#llx\n",
- osp->opd_obd->obd_name, fid_seq(last_fid),
- fid_seq(fid));
+ LCONSOLE(D_INFO, "%s: update sequence from %#llx to %#llx\n",
+ osp->opd_obd->obd_name, fid_seq(last_fid),
+ fid_seq(fid));
/* Update last_xxx to the new seq */
spin_lock(&osp->opd_pre_lock);
osp->opd_last_used_fid = *fid;
static int osp_precreate_fids(const struct lu_env *env, struct osp_device *osp,
struct lu_fid *fid, int *grow)
{
- struct osp_thread_info *osi = osp_env_info(env);
- __u64 end;
- int i = 0;
+ struct osp_thread_info *osi = osp_env_info(env);
+ __u64 seq_width = osp->opd_pre_seq_width;
+ __u64 end;
+ int i = 0;
if (fid_is_idif(fid)) {
struct lu_fid *last_fid;
spin_lock(&osp->opd_pre_lock);
last_fid = &osp->opd_pre_last_created_fid;
fid_to_ostid(last_fid, oi);
- end = min(ostid_id(oi) + *grow, IDIF_MAX_OID);
+ end = min(ostid_id(oi) + *grow, min(IDIF_MAX_OID, seq_width));
*grow = end - ostid_id(oi);
rc = ostid_set_id(oi, ostid_id(oi) + *grow);
spin_unlock(&osp->opd_pre_lock);
spin_lock(&osp->opd_pre_lock);
*fid = osp->opd_pre_last_created_fid;
end = fid->f_oid;
- end = min((end + *grow), (__u64)LUSTRE_DATA_SEQ_MAX_WIDTH);
+ end = min((end + *grow), min(OBIF_MAX_OID, seq_width));
*grow = end - fid->f_oid;
fid->f_oid += end - fid->f_oid;
spin_unlock(&osp->opd_pre_lock);
d->opd_pre_create_slow = 0;
}
+ if ((body->oa.o_valid & OBD_MD_FLSIZE) && body->oa.o_size)
+ d->opd_pre_seq_width = body->oa.o_size;
+
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
fid_to_ostid(fid, &body->oa.o_oi);
* This empties the pre-creation pool and effectively blocks any new
* reservations.
*/
- LASSERT(fid_oid(&d->opd_pre_last_created_fid) <=
- LUSTRE_DATA_SEQ_MAX_WIDTH);
+ LASSERTF(fid_oid(&d->opd_pre_last_created_fid) <= IDIF_MAX_OID,
+ "%s: last_created_fid "DFID" > %llu\n",
+ d->opd_obd->obd_name, PFID(&d->opd_pre_last_created_fid),
+ IDIF_MAX_OID);
d->opd_pre_used_fid = d->opd_pre_last_created_fid;
d->opd_pre_create_slow = 0;
+ if ((body->oa.o_valid & OBD_MD_FLSIZE) && body->oa.o_size)
+ d->opd_pre_seq_width = body->oa.o_size;
spin_unlock(&d->opd_pre_lock);
CDEBUG(D_HA, "%s: Got last_id "DFID" from OST, last_created "DFID
while (!kthread_should_stop()) {
wait_event_idle(d->opd_pre_waitq,
kthread_should_stop() ||
- osp_precreate_near_empty(env, d) ||
+ (osp_precreate_near_empty(env, d) &&
+ !(osp_precreate_end_seq(env, d) &&
+ osp_objs_precreated(env, d) != 0)) ||
osp_statfs_need_update(d) ||
d->opd_got_disconnected);
/* To avoid handling different seq in precreate/orphan
* cleanup, it will hold precreate until current seq is
* used up. */
- if (unlikely(osp_precreate_end_seq(env, d) &&
- !osp_create_end_seq(env, d)))
- continue;
-
- if (unlikely(osp_precreate_end_seq(env, d) &&
- osp_create_end_seq(env, d))) {
- rc = osp_precreate_rollover_new_seq(env, d);
- if (rc)
+ if (unlikely(osp_precreate_end_seq(env, d))) {
+ if (osp_objs_precreated(env, d) == 0) {
+ rc = osp_precreate_rollover_new_seq(env, d);
+ if (rc)
+ continue;
+ } else {
continue;
+ }
}
if (osp_precreate_near_empty(env, d)) {
spin_lock(&d->opd_pre_lock);
precreated = osp_objs_precreated(env, d);
- if (precreated > d->opd_pre_reserved &&
- !d->opd_pre_recovering &&
- !d->opd_force_creation) {
- d->opd_pre_reserved++;
- spin_unlock(&d->opd_pre_lock);
- rc = 0;
-
- /* XXX: don't wake up if precreation is in progress */
- if (osp_precreate_near_empty_nolock(env, d) &&
- !osp_precreate_end_seq_nolock(env, d))
- wake_up(&d->opd_pre_waitq);
+ if (!d->opd_pre_recovering && !d->opd_force_creation) {
+ if (precreated > d->opd_pre_reserved) {
+ d->opd_pre_reserved++;
+ spin_unlock(&d->opd_pre_lock);
+ rc = 0;
+
+ /*
+ * XXX: don't wake up if precreation
+ * is in progress
+ */
+ if (osp_precreate_near_empty_nolock(env, d) &&
+ !osp_precreate_end_seq_nolock(env, d))
+ wake_up(&d->opd_pre_waitq);
- break;
+ break;
+ } else if (unlikely(precreated &&
+ osp_precreate_end_seq_nolock(env, d))) {
+ /*
+ * precreate pool is reaching the end of the
+ * current seq, and doesn't have enough objects
+ */
+ rc = -ENOSPC;
+ spin_unlock(&d->opd_pre_lock);
+ break;
+ }
}
spin_unlock(&d->opd_pre_lock);
struct lu_fid *fid)
{
struct lu_fid *pre_used_fid = &d->opd_pre_used_fid;
+
/* grab next id from the pool */
spin_lock(&d->opd_pre_lock);
LASSERTF(osp_fid_diff(&d->opd_pre_used_fid,
&d->opd_pre_last_created_fid) < 0,
- "next fid "DFID" last created fid "DFID"\n",
+ "next fid "DFID" > last created fid "DFID"\n",
PFID(&d->opd_pre_used_fid),
PFID(&d->opd_pre_last_created_fid));
- /*
- * When sequence is used up, new one should be allocated in
- * osp_precreate_rollover_new_seq. So ASSERT here to avoid
- * objid overflow.
+ /* Non-IDIF FIDs shouldn't get here with OID == OBIF_MAX_OID. For IDIF,
+ * f_oid wraps and "f_seq" (holding high 16 bits of ID) needs increment
*/
- LASSERTF(osp_fid_end_seq(env, pre_used_fid) == 0,
- "next fid "DFID" last created fid "DFID"\n",
- PFID(&d->opd_pre_used_fid),
- PFID(&d->opd_pre_last_created_fid));
- /* Non IDIF fids shoulnd't get here with oid == 0xFFFFFFFF. */
if (fid_is_idif(pre_used_fid) &&
- unlikely(fid_oid(pre_used_fid) == LUSTRE_DATA_SEQ_MAX_WIDTH))
- pre_used_fid->f_seq++;
+ unlikely(fid_oid(pre_used_fid) == OBIF_MAX_OID)) {
+ struct ost_id oi;
+ __u32 idx = fid_idif_ost_idx(pre_used_fid);
+
+ fid_to_ostid(pre_used_fid, &oi);
+ oi.oi.oi_id++;
+ ostid_to_fid(pre_used_fid, &oi, idx);
+ } else {
+ pre_used_fid->f_oid++;
+ }
- d->opd_pre_used_fid.f_oid++;
- memcpy(fid, &d->opd_pre_used_fid, sizeof(*fid));
+ memcpy(fid, pre_used_fid, sizeof(*fid));
d->opd_pre_reserved--;
/*
* last_used_id must be changed along with getting new id otherwise
d->opd_pre_last_created_fid.f_oid = 1;
d->opd_last_id = 0;
d->opd_pre_reserved = 0;
+ d->opd_pre_seq_width = LUSTRE_DATA_SEQ_MAX_WIDTH;
d->opd_got_disconnected = 1;
d->opd_pre_create_slow = 0;
d->opd_pre_create_count = OST_MIN_PRECREATE;