#include <linux/obd_class.h>
#include "osc_internal.h"
-static int osc_interpret_create(struct ptlrpc_request *req, void *data,
- int rc)
+static int osc_interpret_create(struct ptlrpc_request *req, void *data, int rc)
{
struct osc_creator *oscc;
struct ost_body *body = NULL;
}
oscc = req->rq_async_args.pointer_arg[0];
+ LASSERT(oscc && (oscc->oscc_obd != LP_POISON));
+
spin_lock(&oscc->oscc_lock);
- if (body)
- oscc->oscc_last_id = body->oa.o_id;
- if (rc == -ENOSPC) {
- DEBUG_REQ(D_INODE, req, "OST out of space, flagging");
+ oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
+ if (rc == -ENOSPC || rc == -EROFS) {
oscc->oscc_flags |= OSCC_FLAG_NOSPC;
+ if (body && rc == -ENOSPC) {
+ oscc->oscc_grow_count = OST_MIN_PRECREATE;
+ oscc->oscc_last_id = body->oa.o_id;
+ }
+ spin_unlock(&oscc->oscc_lock);
+ DEBUG_REQ(D_INODE, req, "OST out of space, flagging");
} else if (rc != 0 && rc != -EIO) {
- DEBUG_REQ(D_ERROR, req,
- "unknown rc %d from async create: failing oscc",
- rc);
oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
+ oscc->oscc_grow_count = OST_MIN_PRECREATE;
+ spin_unlock(&oscc->oscc_lock);
+ DEBUG_REQ(D_ERROR, req,
+ "unknown rc %d from async create: failing oscc", rc);
ptlrpc_fail_import(req->rq_import, req->rq_import_generation);
+ } else {
+ if (rc == 0) {
+ oscc->oscc_flags &= ~OSCC_FLAG_LOW;
+ if (body) {
+ int diff = body->oa.o_id - oscc->oscc_last_id;
+ if (diff != oscc->oscc_grow_count)
+ oscc->oscc_grow_count =
+ max(diff/3, OST_MIN_PRECREATE);
+ oscc->oscc_last_id = body->oa.o_id;
+ }
+ }
+ spin_unlock(&oscc->oscc_lock);
}
- oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
- spin_unlock(&oscc->oscc_lock);
- CDEBUG(D_INFO, "preallocated through id "LPU64" (last used "LPU64")\n",
+ CDEBUG(D_HA, "preallocated through id "LPU64" (last used "LPU64")\n",
oscc->oscc_last_id, oscc->oscc_next_id);
wake_up(&oscc->oscc_waitq);
ENTRY;
spin_lock(&oscc->oscc_lock);
- if (oscc->oscc_flags & OSCC_FLAG_CREATING) {
+ if (oscc->oscc_grow_count < OST_MAX_PRECREATE &&
+ !(oscc->oscc_flags & (OSCC_FLAG_LOW | OSCC_FLAG_RECOVERING)) &&
+ (__s64)(oscc->oscc_last_id - oscc->oscc_next_id) <=
+ (oscc->oscc_grow_count / 4 + 1)) {
+ oscc->oscc_flags |= OSCC_FLAG_LOW;
+ oscc->oscc_grow_count *= 2;
+ }
+
+ if (oscc->oscc_grow_count > OST_MAX_PRECREATE / 2)
+ oscc->oscc_grow_count = OST_MAX_PRECREATE / 2;
+
+ if (oscc->oscc_flags & OSCC_FLAG_CREATING ||
+ oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
spin_unlock(&oscc->oscc_lock);
RETURN(0);
}
spin_lock(&oscc->oscc_lock);
body->oa.o_id = oscc->oscc_last_id + oscc->oscc_grow_count;
body->oa.o_valid |= OBD_MD_FLID;
- CDEBUG(D_INFO, "preallocating through id "LPU64" (last used "LPU64")\n",
- body->oa.o_id, oscc->oscc_next_id);
spin_unlock(&oscc->oscc_lock);
+ CDEBUG(D_HA, "preallocating through id "LPU64" (last used "LPU64")\n",
+ body->oa.o_id, oscc->oscc_next_id);
request->rq_replen = lustre_msg_size(1, &size);
spin_unlock(&oscc->oscc_lock);
osc_invalid = oscc->oscc_obd->u.cli.cl_import->imp_invalid;
-
+
return have_objs || ost_full || osc_invalid;
}
int rc = 0;
ENTRY;
- if (oscc_has_objects(oscc, oscc->oscc_kick_barrier))
+ if (oscc_has_objects(oscc, oscc->oscc_grow_count / 2))
RETURN(0);
if (!wait)
RETURN(rc);
}
-int oscc_recovering(struct osc_creator *oscc)
+int oscc_recovering(struct osc_creator *oscc)
{
int recov = 0;
RETURN(osc_real_create(exp, oa, ea, oti));
if ((oa->o_valid & OBD_MD_FLFLAGS) &&
- oa->o_flags == OBD_FL_RECREATE_OBJS) {
+ oa->o_flags == OBD_FL_RECREATE_OBJS) {
RETURN(osc_real_create(exp, oa, ea, oti));
}
- lsm = *ea;
- if (lsm == NULL) {
- rc = obd_alloc_memmd(exp, &lsm);
- if (rc < 0)
- RETURN(rc);
- }
+ /* this is the special case where create removes orphans */
+ if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+ oa->o_flags == OBD_FL_DELORPHAN) {
+ spin_lock(&oscc->oscc_lock);
+ if (oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS) {
+ spin_unlock(&oscc->oscc_lock);
+ return -EBUSY;
+ }
+ if (!(oscc->oscc_flags & OSCC_FLAG_RECOVERING)) {
+ spin_unlock(&oscc->oscc_lock);
+ return 0;
+ }
+ oscc->oscc_flags |= OSCC_FLAG_SYNC_IN_PROGRESS;
+ spin_unlock(&oscc->oscc_lock);
+ CDEBUG(D_HA, "%s: oscc recovery started\n",
+ oscc->oscc_obd->obd_name);
- /* this is the special case where create removes orphans */
- if ((oa->o_valid & OBD_MD_FLFLAGS) &&
- oa->o_flags == OBD_FL_DELORPHAN) {
- CDEBUG(D_HA, "%p: oscc recovery started\n", oscc);
/* delete from next_id on up */
oa->o_valid |= OBD_MD_FLID;
oa->o_id = oscc->oscc_next_id - 1;
+ CDEBUG(D_HA, "%s: deleting to next_id: "LPU64"\n",
+ oscc->oscc_obd->obd_name, oa->o_id);
+
rc = osc_real_create(exp, oa, ea, NULL);
spin_lock(&oscc->oscc_lock);
- if (rc == -ENOSPC)
- oscc->oscc_flags |= OSCC_FLAG_NOSPC;
- oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
- oscc->oscc_last_id = oa->o_id;
- wake_up(&oscc->oscc_waitq);
+ oscc->oscc_flags &= ~OSCC_FLAG_SYNC_IN_PROGRESS;
+ if (rc == 0 || rc == -ENOSPC) {
+ if (rc == -ENOSPC)
+ oscc->oscc_flags |= OSCC_FLAG_NOSPC;
+ oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
+ oscc->oscc_last_id = oa->o_id;
+ CDEBUG(D_HA, "%s: oscc recovery finished: %d\n",
+ oscc->oscc_obd->obd_name, rc);
+ wake_up(&oscc->oscc_waitq);
+ } else {
+ CDEBUG(D_ERROR, "%s: oscc recovery failed: %d\n",
+ oscc->oscc_obd->obd_name, rc);
+ }
spin_unlock(&oscc->oscc_lock);
- CDEBUG(D_HA, "%p: oscc recovery finished\n", oscc);
-
- RETURN(rc);
- }
-
- /* If orphans are being recovered, then we must wait until it is
- finished before we can continue with create. */
- if (oscc_recovering(oscc)) {
- struct l_wait_info lwi;
- CDEBUG(D_HA, "%p: oscc recovery in progress, waiting\n", oscc);
+ RETURN(rc);
+ }
- lwi = LWI_TIMEOUT(MAX(obd_timeout * HZ, 1), NULL, NULL);
- rc = l_wait_event(oscc->oscc_waitq, !oscc_recovering(oscc),
- &lwi);
- LASSERT(rc == 0 || rc == -ETIMEDOUT);
- if (rc == -ETIMEDOUT) {
- CDEBUG(D_HA, "%p: timed out waiting for recovery\n", oscc);
+ lsm = *ea;
+ if (lsm == NULL) {
+ rc = obd_alloc_memmd(exp, &lsm);
+ if (rc < 0)
RETURN(rc);
- }
- CDEBUG(D_HA, "%p: oscc recovery over, waking up\n", oscc);
}
-
-
+
while (try_again) {
+ /* If orphans are being recovered, then we must wait until
+ it is finished before we can continue with create. */
+ if (oscc_recovering(oscc)) {
+ struct l_wait_info lwi;
+
+ CDEBUG(D_HA,"%p: oscc recovery in progress, waiting\n",
+ oscc);
+
+ lwi = LWI_TIMEOUT(MAX(obd_timeout*HZ/4, 1), NULL, NULL);
+ rc = l_wait_event(oscc->oscc_waitq,
+ !oscc_recovering(oscc), &lwi);
+ LASSERT(rc == 0 || rc == -ETIMEDOUT);
+ if (rc == -ETIMEDOUT) {
+ CDEBUG(D_HA,"%p: timeout waiting on recovery\n",
+ oscc);
+ RETURN(rc);
+ }
+ CDEBUG(D_HA, "%p: oscc recovery over, waking up\n",
+ oscc);
+ }
+
spin_lock(&oscc->oscc_lock);
+ if (oscc->oscc_flags & OSCC_FLAG_EXITING) {
+ spin_unlock(&oscc->oscc_lock);
+ break;
+ }
+
if (oscc->oscc_last_id >= oscc->oscc_next_id) {
memcpy(oa, &oscc->oscc_oa, sizeof(*oa));
oa->o_id = oscc->oscc_next_id;
}
spin_unlock(&oscc->oscc_lock);
rc = oscc_precreate(oscc, try_again);
- if (rc == -EIO)
+ if (rc)
break;
}
if (rc == 0)
- CDEBUG(D_INFO, "returning objid "LPU64"\n", lsm->lsm_object_id);
+ CDEBUG(D_HA, "%s: returning objid "LPU64"\n",
+ oscc->oscc_obd->u.cli.cl_import->imp_target_uuid.uuid,
+ lsm->lsm_object_id);
else if (*ea == NULL)
obd_free_memmd(exp, &lsm);
RETURN(rc);
init_waitqueue_head(&oscc->oscc_waitq);
spin_lock_init(&oscc->oscc_lock);
oscc->oscc_obd = obd;
- oscc->oscc_kick_barrier = 100;
- oscc->oscc_grow_count = 2000;
- oscc->oscc_initial_create_count = 2000;
+ oscc->oscc_grow_count = OST_MIN_PRECREATE;
oscc->oscc_next_id = 2;
oscc->oscc_last_id = 1;