DEBUG_REQ(D_ERROR, req,
"unknown rc %d from async create: failing oscc",
rc);
+ oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
ptlrpc_fail_import(req->rq_import, req->rq_import_generation);
}
oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
RETURN(rc);
}
+int oscc_recovering(struct osc_creator *oscc)
+{
+ int recov = 0;
+
+ spin_lock(&oscc->oscc_lock);
+ recov = oscc->oscc_flags & OSCC_FLAG_RECOVERING;
+ spin_unlock(&oscc->oscc_lock);
+
+ return recov;
+}
+
int osc_create(struct obd_export *exp, struct obdo *oa,
struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
/* this is the special case where create removes orphans */
if ((oa->o_valid & OBD_MD_FLFLAGS) &&
oa->o_flags == OBD_FL_DELORPHAN) {
+ CDEBUG(D_HA, "%p: oscc recovery started\n", oscc);
/* delete from next_id on up */
oa->o_valid |= OBD_MD_FLID;
oa->o_id = oscc->oscc_next_id - 1;
spin_lock(&oscc->oscc_lock);
if (rc == -ENOSPC)
oscc->oscc_flags |= OSCC_FLAG_NOSPC;
+ oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
oscc->oscc_last_id = oa->o_id;
+ wake_up(&oscc->oscc_waitq);
spin_unlock(&oscc->oscc_lock);
+ CDEBUG(D_HA, "%p: oscc recovery finished\n", oscc);
+
RETURN(rc);
}
+ /* If orphans are being recovered, then we must wait until it is
+ finished before we can continue with create. */
+ if (oscc_recovering(oscc)) {
+ struct l_wait_info lwi;
+
+ CDEBUG(D_HA, "%p: oscc recovery in progress, waiting\n", oscc);
+
+ lwi = LWI_TIMEOUT(MAX(obd_timeout * HZ, 1), NULL, NULL);
+ rc = l_wait_event(oscc->oscc_waitq, !oscc_recovering(oscc),
+ &lwi);
+ LASSERT(rc == 0 || rc == -ETIMEDOUT);
+ if (rc == -ETIMEDOUT)
+ RETURN(rc);
+ CDEBUG(D_HA, "%p: oscc recovery over, waking up\n", oscc);
+ }
+
+
while (try_again) {
spin_lock(&oscc->oscc_lock);
if (oscc->oscc_last_id >= oscc->oscc_next_id) {
oed->oed_oscc.oscc_next_id = 2;
oed->oed_oscc.oscc_last_id = 1;
+ oed->oed_oscc.oscc_flags |= OSCC_FLAG_RECOVERING;
/* XXX the export handle should give the oscc the last object */
/* oed->oed_oscc.oscc_last_id = exph->....; */
}