* GPL HEADER END
*/
/*
- * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*/
/*
ENTRY;
if (req->rq_repmsg) {
- body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
- lustre_swab_ost_body);
+ body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
if (body == NULL && rc == 0)
rc = -EPROTO;
}
oscc = req->rq_async_args.pointer_arg[0];
LASSERT(oscc && (oscc->oscc_obd != LP_POISON));
- spin_lock(&oscc->oscc_lock);
+ cfs_spin_lock(&oscc->oscc_lock);
oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
switch (rc) {
case 0: {
if (body) {
- int diff = body->oa.o_id - oscc->oscc_last_id;
+ int diff =ostid_id(&body->oa.o_oi)- oscc->oscc_last_id;
/* oscc_internal_create() stores the original value of
* grow_count in rq_async_args.space[0].
* next time if needed */
oscc->oscc_flags &= ~OSCC_FLAG_LOW;
}
- oscc->oscc_last_id = body->oa.o_id;
+ oscc->oscc_last_id = ostid_id(&body->oa.o_oi);
}
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
break;
}
case -EROFS:
oscc->oscc_flags |= OSCC_FLAG_RDONLY;
case -ENOSPC:
case -EFBIG:
- if (rc != EROFS) {
+ if (rc != -EROFS) {
oscc->oscc_flags |= OSCC_FLAG_NOSPC;
if (body && rc == -ENOSPC) {
oscc->oscc_last_id = body->oa.o_id;
oscc->oscc_grow_count = OST_MIN_PRECREATE;
}
}
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
DEBUG_REQ(D_INODE, req, "OST out of space, flagging");
break;
case -EIO: {
* of filter (see filter_handle_precreate for detail)*/
if (body && body->oa.o_id > oscc->oscc_last_id)
oscc->oscc_last_id = body->oa.o_id;
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
break;
}
+ case -EINTR:
case -EWOULDBLOCK: {
/* aka EAGAIN we should not delay create if import failed -
* this avoid client stick in create and avoid race with
* delorphan */
+ /* EINTR say - old create request is killed due mds<>ost
+ * eviction - OSCC_FLAG_RECOVERING can already set due
+ * IMP_DISCONN event */
oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
/* oscc->oscc_grow_count = OST_MIN_PRECREATE; */
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
break;
}
default: {
oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
oscc->oscc_grow_count = OST_MIN_PRECREATE;
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
DEBUG_REQ(D_ERROR, req,
"Unknown rc %d from async create: failing oscc", rc);
ptlrpc_fail_import(req->rq_import,
CDEBUG(D_HA, "preallocated through id "LPU64" (next to use "LPU64")\n",
oscc->oscc_last_id, oscc->oscc_next_id);
- spin_lock(&oscc->oscc_lock);
- list_for_each_entry_safe(fake_req, pos,
- &oscc->oscc_wait_create_list, rq_list) {
+ cfs_spin_lock(&oscc->oscc_lock);
+ cfs_list_for_each_entry_safe(fake_req, pos,
+ &oscc->oscc_wait_create_list, rq_list) {
if (handle_async_create(fake_req, rc) == -EAGAIN) {
oscc_internal_create(oscc);
/* sending request should be never fail because
GOTO(exit_wakeup, rc);
}
}
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
exit_wakeup:
cfs_waitq_signal(&oscc->oscc_waitq);
{
struct ptlrpc_request *request;
struct ost_body *body;
- __u32 size[] = { sizeof(struct ptlrpc_body), sizeof(*body) };
ENTRY;
LASSERT_SPIN_LOCKED(&oscc->oscc_lock);
- if(oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
- spin_unlock(&oscc->oscc_lock);
+ /* Do not check for a degraded OST here - bug21563/bug18539 */
+ if (oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
+ cfs_spin_unlock(&oscc->oscc_lock);
RETURN(0);
}
}
if (oscc->oscc_flags & OSCC_FLAG_CREATING) {
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
RETURN(0);
}
oscc->oscc_grow_count = oscc->oscc_max_grow_count / 2;
oscc->oscc_flags |= OSCC_FLAG_CREATING;
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
- request = ptlrpc_prep_req(oscc->oscc_obd->u.cli.cl_import,
- LUSTRE_OST_VERSION, OST_CREATE, 2,
- size, NULL);
+ request = ptlrpc_request_alloc_pack(oscc->oscc_obd->u.cli.cl_import,
+ &RQF_OST_CREATE,
+ LUSTRE_OST_VERSION, OST_CREATE);
if (request == NULL) {
- spin_lock(&oscc->oscc_lock);
+ cfs_spin_lock(&oscc->oscc_lock);
oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
RETURN(-ENOMEM);
}
request->rq_request_portal = OST_CREATE_PORTAL;
ptlrpc_at_set_req_timeout(request);
- body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+ body = req_capsule_client_get(&request->rq_pill, &RMF_OST_BODY);
+
+ cfs_spin_lock(&oscc->oscc_lock);
+
+ if (likely(fid_seq_is_mdt(oscc->oscc_oa.o_seq))) {
+ body->oa.o_oi.oi_seq = oscc->oscc_oa.o_seq;
+ body->oa.o_oi.oi_id = oscc->oscc_last_id +
+ oscc->oscc_grow_count;
+ } else {
+ /*Just warning here currently, since not sure how fid-on-ost
+ *will be implemented here */
+ CWARN("o_seq: "LPU64" is not indicate any MDTs.\n",
+ oscc->oscc_oa.o_seq);
+ }
- spin_lock(&oscc->oscc_lock);
- body->oa.o_id = oscc->oscc_last_id + oscc->oscc_grow_count;
- body->oa.o_gr = oscc->oscc_oa.o_gr;
- LASSERT_MDS_GROUP(body->oa.o_gr);
body->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
request->rq_async_args.space[0] = oscc->oscc_grow_count;
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
CDEBUG(D_RPCTRACE, "prealloc through id "LPU64" (last seen "LPU64")\n",
body->oa.o_id, oscc->oscc_last_id);
/* we should not resend create request - anyway we will have delorphan
* and kill these objects */
request->rq_no_delay = request->rq_no_resend = 1;
- ptlrpc_req_set_repsize(request, 2, size);
+ ptlrpc_request_set_replen(request);
request->rq_async_args.pointer_arg[0] = oscc;
request->rq_interpret_reply = osc_interpret_create;
{
int have_objs;
- spin_lock(&oscc->oscc_lock);
+ cfs_spin_lock(&oscc->oscc_lock);
have_objs = oscc_has_objects_nolock(oscc, count);
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
return have_objs;
}
ost_unusable = oscc->oscc_obd->u.cli.cl_import->imp_invalid;
- spin_lock(&oscc->oscc_lock);
+ cfs_spin_lock(&oscc->oscc_lock);
ost_unusable |= (OSCC_FLAG_NOSPC | OSCC_FLAG_RDONLY |
OSCC_FLAG_EXITING) & oscc->oscc_flags;
have_objs = oscc_has_objects_nolock(oscc, count);
- if (!ost_unusable)
+ if (!ost_unusable && !have_objs)
/* they release lock himself */
- oscc_internal_create(oscc);
+ have_objs = oscc_internal_create(oscc);
else
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
return have_objs || ost_unusable;
}
NULL, NULL);
rc = l_wait_event(oscc->oscc_waitq, oscc_wait_for_objects(oscc, 1), &lwi);
-
- if (!oscc_has_objects(oscc, 1) || (oscc->oscc_flags & OSCC_FLAG_NOSPC))
- rc = -ENOSPC;
-
- if (oscc->oscc_flags & OSCC_FLAG_RDONLY)
- rc = -EROFS;
-
- if (oscc->oscc_obd->u.cli.cl_import->imp_invalid)
- rc = -EIO;
-
RETURN(rc);
}
{
int sync;
- spin_lock(&oscc->oscc_lock);
+ cfs_spin_lock(&oscc->oscc_lock);
sync = oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS;
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
return sync;
}
/* decide if the OST has remaining object, return value :
- 0 : the OST has remaining object, and don't need to do precreate.
- 1 : the OST has no remaining object, and will send a RPC for precreate.
+ 0 : the OST has remaining objects, may or may not send precreation RPC.
+ 1 : the OST has no remaining object, and the sent precreation RPC
+ has not been completed yet.
2 : the OST has no remaining object, and will not get any for
a potentially very long time
1000 : unusable
{
struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
struct obd_import *imp = exp->exp_imp_reverse;
+ int rc;
ENTRY;
LASSERT(oscc != NULL);
if (imp != NULL && imp->imp_deactive)
- RETURN(1000);
+ GOTO(out_nolock, rc = 1000);
- /* until oscc in recovery - other flags is wrong */
- spin_lock(&oscc->oscc_lock);
+ /* Handle critical states first */
+ cfs_spin_lock(&oscc->oscc_lock);
if (oscc->oscc_flags & OSCC_FLAG_NOSPC ||
- oscc->oscc_flags & OSCC_FLAG_RDONLY) {
- spin_unlock(&oscc->oscc_lock);
- RETURN(1000);
- }
+ oscc->oscc_flags & OSCC_FLAG_RDONLY ||
+ oscc->oscc_flags & OSCC_FLAG_EXITING)
+ GOTO(out, rc = 1000);
- if (oscc->oscc_flags & OSCC_FLAG_RECOVERING ||
- oscc->oscc_flags & OSCC_FLAG_DEGRADED) {
- spin_unlock(&oscc->oscc_lock);
- RETURN(2);
- }
+ if ((oscc->oscc_flags & OSCC_FLAG_RECOVERING) ||
+ (oscc->oscc_flags & OSCC_FLAG_DEGRADED))
+ GOTO(out, rc = 2);
- if (oscc_has_objects_nolock(oscc, oscc->oscc_grow_count / 2)) {
- spin_unlock(&oscc->oscc_lock);
- RETURN(0);
- }
+ if (oscc_has_objects_nolock(oscc, oscc->oscc_grow_count / 2))
+ GOTO(out, rc = 0);
- if ((oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS) ||
- (oscc->oscc_flags & OSCC_FLAG_CREATING)) {
- spin_unlock(&oscc->oscc_lock);
- RETURN(1);
- }
+ /* Return 0, if we have at least one object - bug 22884 */
+ rc = oscc_has_objects_nolock(oscc, 1) ? 0 : 1;
+
+ /* Do not check for OSCC_FLAG_CREATING flag here, let
+ * osc_precreate() call oscc_internal_create() and
+ * adjust oscc_grow_count bug21563 */
+ if (oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS)
+ GOTO(out, rc);
- oscc_internal_create(oscc);
- RETURN(1);
+ if (oscc_internal_create(oscc))
+ GOTO(out_nolock, rc = 1000);
+
+ RETURN(rc);
+out:
+ cfs_spin_unlock(&oscc->oscc_lock);
+out_nolock:
+ return rc;
}
static int handle_async_create(struct ptlrpc_request *req, int rc)
if(rc)
GOTO(out_wake, rc);
- if ((oscc->oscc_flags & OSCC_FLAG_EXITING))
+ /* Handle the critical type errors first.
+ * Should we also test cl_import state as well ? */
+ if (oscc->oscc_flags & OSCC_FLAG_EXITING)
GOTO(out_wake, rc = -EIO);
+ if (oscc->oscc_flags & OSCC_FLAG_NOSPC)
+ GOTO(out_wake, rc = -ENOSPC);
+
+ if (oscc->oscc_flags & OSCC_FLAG_RDONLY)
+ GOTO(out_wake, rc = -EROFS);
+
+ /* should be try wait until recovery finished */
+ if((oscc->oscc_flags & OSCC_FLAG_RECOVERING) ||
+ (oscc->oscc_flags & OSCC_FLAG_DEGRADED))
+ RETURN(-EAGAIN);
+
if (oscc_has_objects_nolock(oscc, 1)) {
memcpy(oa, &oscc->oscc_oa, sizeof(*oa));
oa->o_id = oscc->oscc_next_id;
CDEBUG(D_RPCTRACE, " set oscc_next_id = "LPU64"\n",
oscc->oscc_next_id);
- GOTO(out_wake, rc = 0);
+ GOTO(out_wake, rc = 0);
}
- /* should be try wait until recovery finished */
- if(oscc->oscc_flags & OSCC_FLAG_RECOVERING)
- RETURN(-EAGAIN);
-
- if (oscc->oscc_flags & OSCC_FLAG_NOSPC)
- GOTO(out_wake, rc = -ENOSPC);
-
- if (oscc->oscc_flags & OSCC_FLAG_RDONLY)
- GOTO(out_wake, rc = -EROFS);
-
- /* we not have objects now - continue wait */
+ /* we don't have objects now - continue wait */
RETURN(-EAGAIN);
out_wake:
}
static int async_create_interpret(const struct lu_env *env,
- struct ptlrpc_request *req, void *data, int rc)
+ struct ptlrpc_request *req, void *data,
+ int rc)
{
struct osc_create_async_args *args = ptlrpc_req_async_args(req);
struct osc_creator *oscc = args->rq_oscc;
int ret;
- spin_lock(&oscc->oscc_lock);
+ cfs_spin_lock(&oscc->oscc_lock);
ret = handle_async_create(req, rc);
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
return ret;
}
struct obdo *oa = oinfo->oi_oa;
ENTRY;
- if ((oa->o_valid & OBD_MD_FLGROUP) && (oa->o_gr != 0)){
+ if ((oa->o_valid & OBD_MD_FLGROUP) && !fid_seq_is_mdt(oa->o_seq)) {
rc = osc_real_create(exp, oinfo->oi_oa, ea, oti);
rc = oinfo->oi_cb_up(oinfo, rc);
RETURN(rc);
args->rq_lsm = *ea;
args->rq_oinfo = oinfo;
- spin_lock(&oscc->oscc_lock);
+ cfs_spin_lock(&oscc->oscc_lock);
/* try fast path */
rc = handle_async_create(fake_req, 0);
if (rc == -EAGAIN) {
/* we not have objects - try wait */
is_add = ptlrpcd_add_req(fake_req, PSCOPE_OTHER);
if (!is_add)
- list_add(&fake_req->rq_list,
- &oscc->oscc_wait_create_list);
+ cfs_list_add(&fake_req->rq_list,
+ &oscc->oscc_wait_create_list);
else
rc = is_add;
}
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
if (rc != -EAGAIN)
/* need free request if was error hit or
struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
struct lov_stripe_md *lsm;
- int rc = 0;
+ int del_orphan = 0, rc = 0;
ENTRY;
LASSERT(oa);
LASSERT(ea);
- LASSERT_MDS_GROUP(oa->o_gr);
LASSERT(oa->o_valid & OBD_MD_FLGROUP);
if ((oa->o_valid & OBD_MD_FLFLAGS) &&
RETURN(osc_real_create(exp, oa, ea, oti));
}
- if (oa->o_gr == FILTER_GROUP_LLOG || oa->o_gr == FILTER_GROUP_ECHO)
+ if (!fid_seq_is_mdt(oa->o_seq))
RETURN(osc_real_create(exp, oa, ea, oti));
/* this is the special case where create removes orphans */
- if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+ if (oa->o_valid & OBD_MD_FLFLAGS &&
oa->o_flags == OBD_FL_DELORPHAN) {
- spin_lock(&oscc->oscc_lock);
+ cfs_spin_lock(&oscc->oscc_lock);
if (oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS) {
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
RETURN(-EBUSY);
}
if (!(oscc->oscc_flags & OSCC_FLAG_RECOVERING)) {
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
RETURN(0);
}
/* seting flag LOW we prevent extra grow precreate size
* and enforce use last assigned size */
oscc->oscc_flags |= OSCC_FLAG_LOW;
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
CDEBUG(D_HA, "%s: oscc recovery started - delete to "LPU64"\n",
oscc->oscc_obd->obd_name, oscc->oscc_next_id - 1);
+ del_orphan = 1;
+
/* delete from next_id on up */
oa->o_valid |= OBD_MD_FLID;
oa->o_id = oscc->oscc_next_id - 1;
rc = osc_real_create(exp, oa, ea, NULL);
- spin_lock(&oscc->oscc_lock);
+ cfs_spin_lock(&oscc->oscc_lock);
oscc->oscc_flags &= ~OSCC_FLAG_SYNC_IN_PROGRESS;
if (rc == 0 || rc == -ENOSPC) {
struct obd_connect_data *ocd;
oscc->oscc_last_id = oa->o_id;
ocd = &imp->imp_connect_data;
if (ocd->ocd_connect_flags & OBD_CONNECT_SKIP_ORPHAN) {
+ /*
+ * The OST reports back in oa->o_id from where
+ * we should restart in order to skip orphan
+ * objects
+ */
CDEBUG(D_HA, "%s: Skip orphan set, reset last "
"objid\n", oscc->oscc_obd->obd_name);
oscc->oscc_next_id = oa->o_id + 1;
}
cfs_waitq_signal(&oscc->oscc_waitq);
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
if (rc < 0)
RETURN(rc);
oscc->oscc_obd->obd_name);
rc = oscc_precreate(oscc);
- if (rc) {
+ if (rc)
CDEBUG(D_HA,"%s: error create %d\n",
oscc->oscc_obd->obd_name, rc);
+
+ cfs_spin_lock(&oscc->oscc_lock);
+
+ /* wakeup but recovery did not finished */
+ if ((oscc->oscc_obd->u.cli.cl_import->imp_invalid) ||
+ (oscc->oscc_flags & OSCC_FLAG_RECOVERING)) {
+ rc = -EIO;
+ cfs_spin_unlock(&oscc->oscc_lock);
break;
}
- spin_lock(&oscc->oscc_lock);
+ if (oscc->oscc_flags & OSCC_FLAG_NOSPC) {
+ rc = -ENOSPC;
+ cfs_spin_unlock(&oscc->oscc_lock);
+ break;
+ }
+
+ if (oscc->oscc_flags & OSCC_FLAG_RDONLY) {
+ rc = -EROFS;
+ cfs_spin_unlock(&oscc->oscc_lock);
+ break;
+ }
+
+ // Should we report -EIO error ?
if (oscc->oscc_flags & OSCC_FLAG_EXITING) {
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
break;
}
- /* wakeup but recovery not finished */
- if (oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
- rc = -EIO;
- spin_unlock(&oscc->oscc_lock);
+
+ /**
+ * If this is DELORPHAN process, no need create object here,
+ * otherwise this will create a gap of object id, and MDS
+ * might create some orphan log (mds_lov_update_objids), then
+ * remove objects wrongly on OST. Bug 21379.
+ */
+ if (oa->o_valid & OBD_MD_FLFLAGS &&
+ oa->o_flags == OBD_FL_DELORPHAN) {
+ cfs_spin_unlock(&oscc->oscc_lock);
break;
}
lsm->lsm_object_id = oscc->oscc_next_id;
*ea = lsm;
oscc->oscc_next_id++;
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
CDEBUG(D_RPCTRACE, "%s: set oscc_next_id = "LPU64"\n",
exp->exp_obd->obd_name, oscc->oscc_next_id);
break;
- } else if (oscc->oscc_flags & OSCC_FLAG_NOSPC) {
- rc = -ENOSPC;
- spin_unlock(&oscc->oscc_lock);
- break;
- } else if (oscc->oscc_flags & OSCC_FLAG_RDONLY) {
- rc = -EROFS;
- spin_unlock(&oscc->oscc_lock);
- break;
}
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
}
- if (rc == 0)
+ if (rc == 0) {
CDEBUG(D_INFO, "%s: returning objid "LPU64"\n",
obd2cli_tgt(oscc->oscc_obd), lsm->lsm_object_id);
- else if (*ea == NULL)
- obd_free_memmd(exp, &lsm);
+ } else {
+ if (*ea == NULL)
+ obd_free_memmd(exp, &lsm);
+ if (del_orphan != 0 && rc != -EIO)
+ /* Ignore non-IO precreate error for clear orphan */
+ rc = 0;
+ }
RETURN(rc);
}
memset(oscc, 0, sizeof(*oscc));
cfs_waitq_init(&oscc->oscc_waitq);
- spin_lock_init(&oscc->oscc_lock);
+ cfs_spin_lock_init(&oscc->oscc_lock);
oscc->oscc_obd = obd;
oscc->oscc_grow_count = OST_MIN_PRECREATE;
oscc->oscc_max_grow_count = OST_MAX_PRECREATE;
ENTRY;
- spin_lock(&oscc->oscc_lock);
+ cfs_spin_lock(&oscc->oscc_lock);
oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
oscc->oscc_flags |= OSCC_FLAG_EXITING;
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
}