struct ptlrpc_request *req = NULL;
struct obd_import *imp;
struct ost_body *body;
+ struct l_wait_info lwi = { 0 };
+ int update_status = 0;
int rc;
ENTRY;
- LASSERT(d->opd_recovery_completed);
- LASSERT(d->opd_pre_reserved == 0);
+ /*
+ * wait for local recovery to finish, so we can cleanup orphans
+ * orphans are all objects since "last used" (assigned), but
+ * there might be objects reserved and in some cases they won't
+ * be used. we can't cleanup them till we're sure they won't be
+ * used. also can't we allow new reservations because they may
+ * end up getting orphans being cleaned up below. so we block
+ * new reservations and wait till all reserved objects either
+ * user or released.
+ */
+ spin_lock(&d->opd_pre_lock);
+ d->opd_pre_recovering = 1;
+ spin_unlock(&d->opd_pre_lock);
+ /*
+ * The locking above makes sure the opd_pre_reserved check below will
+ * catch all osp_precreate_reserve() calls who find
+ * "!opd_pre_recovering".
+ */
+ l_wait_event(d->opd_pre_waitq,
+ (!d->opd_pre_reserved && d->opd_recovery_completed) ||
+ !osp_precreate_running(d) || d->opd_got_disconnected,
+ &lwi);
+ if (!osp_precreate_running(d) || d->opd_got_disconnected)
+ GOTO(out, rc = -EAGAIN);
CDEBUG(D_HA, "%s: going to cleanup orphans since "LPU64"\n",
d->opd_obd->obd_name, d->opd_last_used_id);
req->rq_no_resend = req->rq_no_delay = 1;
rc = ptlrpc_queue_wait(req);
- if (rc)
+ if (rc) {
+ update_status = 1;
GOTO(out, rc);
+ }
body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
if (body == NULL)
d->opd_pre_grow_count = OST_MIN_PRECREATE;
d->opd_pre_last_created = body->oa.o_id;
}
+ /*
+ * This empties the pre-creation pool and effectively blocks any new
+ * reservations.
+ */
d->opd_pre_used_id = d->opd_pre_last_created;
d->opd_pre_grow_slow = 0;
spin_unlock(&d->opd_pre_lock);
if (req)
ptlrpc_req_finished(req);
+ d->opd_pre_recovering = 0;
+
+ /*
+ * If rc is zero, the pre-creation window should have been emptied.
+ * Since waking up the herd would be useless without pre-created
+ * objects, we defer the signal to osp_precreate_send() in that case.
+ */
+ if (rc != 0) {
+ if (update_status) {
+ CERROR("%s: cannot cleanup orphans: rc = %d\n",
+ d->opd_obd->obd_name, rc);
+ /* we can't proceed from here, OST seem to
+ * be in a bad shape, better to wait for
+ * a new instance of the server and repeat
+ * from the beginning. notify possible waiters
+ * this OSP isn't quite functional yet */
+ osp_pre_update_status(d, rc);
+ } else {
+ cfs_waitq_signal(&d->opd_pre_user_waitq);
+ }
+ }
+
RETURN(rc);
}
osp_statfs_update(d);
/*
- * wait for local recovery to finish, so we can cleanup orphans
- * orphans are all objects since "last used" (assigned), but
- * there might be objects reserved and in some cases they won't
- * be used. we can't cleanup them till we're sure they won't be
- * used. so we block new reservations and wait till all reserved
- * objects either user or released.
+ * Clean up orphans or recreate missing objects.
*/
- l_wait_event(d->opd_pre_waitq, (!d->opd_pre_reserved &&
- d->opd_recovery_completed) ||
- !osp_precreate_running(d) ||
- d->opd_got_disconnected, &lwi);
-
- if (osp_precreate_running(d) && !d->opd_got_disconnected) {
- rc = osp_precreate_cleanup_orphans(d);
- if (rc) {
- CERROR("%s: cannot cleanup orphans: rc = %d\n",
- d->opd_obd->obd_name, rc);
- /* we can't proceed from here, OST seem to
- * be in a bad shape, better to wait for
- * a new instance of the server and repeat
- * from the beginning. notify possible waiters
- * this OSP isn't quite functional yet */
- osp_pre_update_status(d, rc);
- cfs_waitq_signal(&d->opd_pre_user_waitq);
- l_wait_event(d->opd_pre_waitq,
- !osp_precreate_running(d) ||
- d->opd_new_connection, &lwi);
- continue;
-
- }
- }
+ rc = osp_precreate_cleanup_orphans(d);
+ if (rc != 0)
+ continue;
/*
* connected, can handle precreates now
{
__u64 next;
+ if (d->opd_pre_recovering)
+ return 0;
+
/* ready if got enough precreated objects */
/* we need to wait for others (opd_pre_reserved) and our object (+1) */
next = d->opd_pre_used_id + d->opd_pre_reserved + 1;
spin_lock(&d->opd_pre_lock);
precreated = d->opd_pre_last_created - d->opd_pre_used_id;
- if (precreated > d->opd_pre_reserved) {
+ if (precreated > d->opd_pre_reserved &&
+ !d->opd_pre_recovering) {
d->opd_pre_reserved++;
spin_unlock(&d->opd_pre_lock);
rc = 0;