/* couldn't update statfs, try again as soon as possible */
cfs_waitq_signal(&d->opd_pre_waitq);
if (req->rq_import_generation == imp->imp_generation)
- CERROR("%s: couldn't update statfs: rc = %d\n",
+ CDEBUG(D_CACHE, "%s: couldn't update statfs: rc = %d\n",
d->opd_obd->obd_name, rc);
RETURN(rc);
}
int rc;
/* XXX: do we really need locking here? */
- cfs_spin_lock(&d->opd_pre_lock);
+ spin_lock(&d->opd_pre_lock);
rc = osp_precreate_near_empty_nolock(d);
- cfs_spin_unlock(&d->opd_pre_lock);
+ spin_unlock(&d->opd_pre_lock);
return rc;
}
RETURN(rc);
}
- cfs_spin_lock(&d->opd_pre_lock);
+ spin_lock(&d->opd_pre_lock);
if (d->opd_pre_grow_count > d->opd_pre_max_grow_count / 2)
d->opd_pre_grow_count = d->opd_pre_max_grow_count / 2;
grow = d->opd_pre_grow_count;
- cfs_spin_unlock(&d->opd_pre_lock);
+ spin_unlock(&d->opd_pre_lock);
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
LASSERT(body);
diff = body->oa.o_id - d->opd_pre_last_created;
- cfs_spin_lock(&d->opd_pre_lock);
+ spin_lock(&d->opd_pre_lock);
if (diff < grow) {
/* the OST has not managed to create all the
* objects we asked for */
d->opd_pre_grow_slow = 0;
}
d->opd_pre_last_created = body->oa.o_id;
- cfs_spin_unlock(&d->opd_pre_lock);
+ spin_unlock(&d->opd_pre_lock);
CDEBUG(D_OTHER, "current precreated pool: %llu-%llu\n",
d->opd_pre_used_id, d->opd_pre_last_created);
struct ptlrpc_request *req = NULL;
struct obd_import *imp;
struct ost_body *body;
+ struct l_wait_info lwi = { 0 };
+ int update_status = 0;
int rc;
ENTRY;
- LASSERT(d->opd_recovery_completed);
- LASSERT(d->opd_pre_reserved == 0);
+ /*
+ * wait for local recovery to finish, so we can cleanup orphans.
+ * orphans are all objects since "last used" (assigned), but
+ * there might be objects reserved and in some cases they won't
+ * be used. we can't cleanup them till we're sure they won't be
+ * used. also we can't allow new reservations because they may
+ * end up as orphans being cleaned up below. so we block
+ * new reservations and wait till all reserved objects are either
+ * used or released.
+ */
+ spin_lock(&d->opd_pre_lock);
+ d->opd_pre_recovering = 1;
+ spin_unlock(&d->opd_pre_lock);
+ /*
+ * The locking above makes sure the opd_pre_reserved check below will
+ * catch all osp_precreate_reserve() calls who find
+ * "!opd_pre_recovering".
+ */
+ l_wait_event(d->opd_pre_waitq,
+ (!d->opd_pre_reserved && d->opd_recovery_completed) ||
+ !osp_precreate_running(d) || d->opd_got_disconnected,
+ &lwi);
+ if (!osp_precreate_running(d) || d->opd_got_disconnected)
+ GOTO(out, rc = -EAGAIN);
CDEBUG(D_HA, "%s: going to cleanup orphans since "LPU64"\n",
d->opd_obd->obd_name, d->opd_last_used_id);
req->rq_no_resend = req->rq_no_delay = 1;
rc = ptlrpc_queue_wait(req);
- if (rc)
+ if (rc) {
+ update_status = 1;
GOTO(out, rc);
+ }
body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
if (body == NULL)
/*
* OST provides us with id new pool starts from in body->oa.o_id
*/
- cfs_spin_lock(&d->opd_pre_lock);
+ spin_lock(&d->opd_pre_lock);
if (le64_to_cpu(d->opd_last_used_id) > body->oa.o_id) {
d->opd_pre_grow_count = OST_MIN_PRECREATE +
le64_to_cpu(d->opd_last_used_id) -
body->oa.o_id;
- d->opd_pre_last_created = le64_to_cpu(d->opd_last_used_id) + 1;
+ d->opd_pre_last_created = le64_to_cpu(d->opd_last_used_id);
} else {
d->opd_pre_grow_count = OST_MIN_PRECREATE;
- d->opd_pre_last_created = body->oa.o_id + 1;
+ d->opd_pre_last_created = body->oa.o_id;
}
- d->opd_pre_used_id = d->opd_pre_last_created - 1;
+ /*
+ * This empties the pre-creation pool and effectively blocks any new
+ * reservations.
+ */
+ d->opd_pre_used_id = d->opd_pre_last_created;
d->opd_pre_grow_slow = 0;
- cfs_spin_unlock(&d->opd_pre_lock);
+ spin_unlock(&d->opd_pre_lock);
CDEBUG(D_HA, "%s: Got last_id "LPU64" from OST, last_used is "LPU64
", pre_used "LPU64"\n", d->opd_obd->obd_name, body->oa.o_id,
if (req)
ptlrpc_req_finished(req);
+ d->opd_pre_recovering = 0;
+
+ /*
+ * If rc is zero, the pre-creation window should have been emptied.
+ * Since waking up the herd would be useless without pre-created
+ * objects, we defer the signal to osp_precreate_send() in that case.
+ */
+ if (rc != 0) {
+ if (update_status) {
+ CERROR("%s: cannot cleanup orphans: rc = %d\n",
+ d->opd_obd->obd_name, rc);
+ /* we can't proceed from here, OST seem to
+ * be in a bad shape, better to wait for
+ * a new instance of the server and repeat
+ * from the beginning. notify possible waiters
+ * this OSP isn't quite functional yet */
+ osp_pre_update_status(d, rc);
+ } else {
+ cfs_waitq_signal(&d->opd_pre_user_waitq);
+ }
+ }
+
RETURN(rc);
}
sprintf(pname, "osp-pre-%u\n", d->opd_index);
cfs_daemonize(pname);
- cfs_spin_lock(&d->opd_pre_lock);
+ spin_lock(&d->opd_pre_lock);
thread->t_flags = SVC_RUNNING;
- cfs_spin_unlock(&d->opd_pre_lock);
+ spin_unlock(&d->opd_pre_lock);
cfs_waitq_signal(&thread->t_ctl_waitq);
while (osp_precreate_running(d)) {
osp_statfs_update(d);
/*
- * wait for local recovery to finish, so we can cleanup orphans
- * orphans are all objects since "last used" (assigned), but
- * there might be objects reserved and in some cases they won't
- * be used. we can't cleanup them till we're sure they won't be
- * used. so we block new reservations and wait till all reserved
- * objects either user or released.
+ * Clean up orphans or recreate missing objects.
*/
- l_wait_event(d->opd_pre_waitq, (!d->opd_pre_reserved &&
- d->opd_recovery_completed) ||
- !osp_precreate_running(d) ||
- d->opd_got_disconnected, &lwi);
-
- if (osp_precreate_running(d) && !d->opd_got_disconnected) {
- rc = osp_precreate_cleanup_orphans(d);
- if (rc) {
- CERROR("%s: cannot cleanup orphans: rc = %d\n",
- d->opd_obd->obd_name, rc);
- /* we can't proceed from here, OST seem to
- * be in a bad shape, better to wait for
- * a new instance of the server and repeat
- * from the beginning. notify possible waiters
- * this OSP isn't quite functional yet */
- osp_pre_update_status(d, rc);
- cfs_waitq_signal(&d->opd_pre_user_waitq);
- l_wait_event(d->opd_pre_waitq,
- !osp_precreate_running(d) ||
- d->opd_new_connection, &lwi);
- continue;
-
- }
- }
+ rc = osp_precreate_cleanup_orphans(d);
+ if (rc != 0)
+ continue;
/*
* connected, can handle precreates now
{
__u64 next;
+ if (d->opd_pre_recovering)
+ return 0;
+
/* ready if got enough precreated objects */
/* we need to wait for others (opd_pre_reserved) and our object (+1) */
next = d->opd_pre_used_id + d->opd_pre_reserved + 1;
d->opd_pre_grow_slow == 0 &&
(d->opd_pre_last_created - d->opd_pre_used_id <=
d->opd_pre_grow_count / 4 + 1)) {
- cfs_spin_lock(&d->opd_pre_lock);
+ spin_lock(&d->opd_pre_lock);
d->opd_pre_grow_slow = 1;
d->opd_pre_grow_count *= 2;
- cfs_spin_unlock(&d->opd_pre_lock);
+ spin_unlock(&d->opd_pre_lock);
}
- /*
- * we never use the last object in the window
- */
- cfs_spin_lock(&d->opd_pre_lock);
+ spin_lock(&d->opd_pre_lock);
precreated = d->opd_pre_last_created - d->opd_pre_used_id;
- if (precreated > d->opd_pre_reserved) {
+ if (precreated > d->opd_pre_reserved &&
+ !d->opd_pre_recovering) {
d->opd_pre_reserved++;
- cfs_spin_unlock(&d->opd_pre_lock);
+ spin_unlock(&d->opd_pre_lock);
rc = 0;
/* XXX: don't wake up if precreation is in progress */
break;
}
- cfs_spin_unlock(&d->opd_pre_lock);
+ spin_unlock(&d->opd_pre_lock);
/*
* all precreated objects have been used and no-space
obd_id objid;
/* grab next id from the pool */
- cfs_spin_lock(&d->opd_pre_lock);
+ spin_lock(&d->opd_pre_lock);
LASSERT(d->opd_pre_used_id < d->opd_pre_last_created);
objid = ++d->opd_pre_used_id;
d->opd_pre_reserved--;
* we might miscalculate gap causing object loss or leak
*/
osp_update_last_id(d, objid);
- cfs_spin_unlock(&d->opd_pre_lock);
+ spin_unlock(&d->opd_pre_lock);
/*
* probably main thread suspended orphan cleanup till
d->opd_pre_min_grow_count = OST_MIN_PRECREATE;
d->opd_pre_max_grow_count = OST_MAX_PRECREATE;
- cfs_spin_lock_init(&d->opd_pre_lock);
+ spin_lock_init(&d->opd_pre_lock);
cfs_waitq_init(&d->opd_pre_waitq);
cfs_waitq_init(&d->opd_pre_user_waitq);
cfs_waitq_init(&d->opd_pre_thread.t_ctl_waitq);