static void lov_finish_set(struct lov_request_set *set)
{
+ struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
struct list_head *pos, *n;
ENTRY;
LASSERT(set);
list_for_each_safe(pos, n, &set->set_list) {
- struct lov_request *req = list_entry(pos, struct lov_request,
- rq_link);
+ struct lov_request *req;
+ struct lov_tgt_desc *tgt;
+
+ req = list_entry(pos, struct lov_request, rq_link);
+ LASSERT(req->rq_idx >= 0);
+
+ tgt = lov->tgts + req->rq_idx;
+ lov_tgt_decref(lov, tgt);
+
list_del_init(&req->rq_link);
if (req->rq_oa)
lov_update_set(set, req, rc);
/* grace error on inactive ost */
- if (rc && !lov->tgts[req->rq_idx].active)
- rc = 0;
+ if (rc) {
+ struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
+
+ if (lov_tgt_active(lov, tgt, req->rq_gen))
+ lov_tgt_decref(lov, tgt);
+ else
+ rc = 0;
+ }
/* FIXME in raid1 regime, should return 0 */
RETURN(rc);
} else {
struct obd_export *exp = set->set_exp;
struct lov_obd *lov = &exp->exp_obd->u.lov;
+ struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
memset(lov_lockhp, 0, sizeof(*lov_lockhp));
- if (lov->tgts[req->rq_idx].active) {
+ if (lov_tgt_ready(lov, tgt, req->rq_gen)) {
+ lov_tgt_decref(lov, tgt);
CERROR("error: enqueue objid "LPX64" subobj "
LPX64" on OST idx %d: rc = %d\n",
set->set_md->lsm_object_id, loi->loi_id,
loi->loi_ost_idx, rc);
} else {
+ CERROR("error: enqueue objid "LPX64" subobj "
+ LPX64" on OST idx %d: rc = %d, NOT ACTIVE !\n",
+ set->set_md->lsm_object_id, loi->loi_id,
+ loi->loi_ost_idx, rc);
rc = ELDLM_OK;
}
}
struct lov_request *req;
struct lustre_handle *lov_lockhp = NULL;
struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
+ struct lov_tgt_desc *tgt;
int rc = 0;
ENTRY;
if (lov_lockhp->cookie == 0)
continue;
- rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
- mode, lov_lockhp);
- if (rc && lov->tgts[req->rq_idx].active)
+ tgt = lov->tgts + req->rq_idx;
+ rc = obd_cancel(tgt->ltd_exp, req->rq_md, mode, lov_lockhp);
+ if (rc && lov_tgt_ready(lov, tgt, req->rq_gen)) {
+ lov_tgt_decref(lov, tgt);
CERROR("cancelling obdjid "LPX64" on OST "
"idx %d error: rc = %d\n",
req->rq_md->lsm_object_id, req->rq_idx, rc);
+ }
}
lov_llh_put(set->set_lockh);
RETURN(rc);
loi = lsm->lsm_oinfo;
for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+ struct lov_tgt_desc *tgt;
struct lov_request *req;
obd_off start, end;
policy->l_extent.end, &start, &end))
continue;
- if (lov->tgts[loi->loi_ost_idx].active == 0) {
+ tgt = lov->tgts + loi->loi_ost_idx;
+ if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
continue;
}
OBD_ALLOC(req, sizeof(*req));
- if (req == NULL)
+ if (req == NULL) {
+ lov_tgt_decref(lov, tgt);
GOTO(out_set, rc = -ENOMEM);
+ }
req->rq_buflen = sizeof(*req->rq_md) +
sizeof(struct lov_oinfo);
OBD_ALLOC(req->rq_md, req->rq_buflen);
- if (req->rq_md == NULL)
+ if (req->rq_md == NULL) {
+ OBD_FREE(req, sizeof(*req));
+ lov_tgt_decref(lov, tgt);
GOTO(out_set, rc = -ENOMEM);
+ }
req->rq_extent.start = start;
req->rq_extent.end = end;
req->rq_idx = loi->loi_ost_idx;
+ req->rq_gen = loi->loi_ost_gen;
req->rq_stripe = i;
/* XXX LOV STACKING: submd should be from the subobj */
loi = lsm->lsm_oinfo;
for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+ struct lov_tgt_desc *tgt;
struct lov_request *req;
obd_off start, end;
continue;
/* FIXME raid1 should grace this error */
- if (lov->tgts[loi->loi_ost_idx].active == 0) {
+ tgt = lov->tgts + loi->loi_ost_idx;
+ if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
GOTO(out_set, rc = -EIO);
}
OBD_ALLOC(req, sizeof(*req));
- if (req == NULL)
+ if (req == NULL) {
+ lov_tgt_decref(lov, tgt);
GOTO(out_set, rc = -ENOMEM);
+ }
req->rq_buflen = sizeof(*req->rq_md);
OBD_ALLOC(req->rq_md, req->rq_buflen);
- if (req->rq_md == NULL)
+ if (req->rq_md == NULL) {
+ OBD_FREE(req, sizeof(*req));
+ lov_tgt_decref(lov, tgt);
GOTO(out_set, rc = -ENOMEM);
+ }
req->rq_extent.start = start;
req->rq_extent.end = end;
req->rq_idx = loi->loi_ost_idx;
+ req->rq_gen = loi->loi_ost_gen;
req->rq_stripe = i;
/* XXX LOV STACKING: submd should be from the subobj */
__u32 mode, struct lustre_handle *lockh,
struct lov_request_set **reqset)
{
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
struct lov_request_set *set;
int i, rc = 0;
struct lov_oinfo *loi;
loi = lsm->lsm_oinfo;
for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+ struct lov_tgt_desc *tgt;
struct lov_request *req;
struct lustre_handle *lov_lockhp;
continue;
}
+ tgt = lov->tgts + loi->loi_ost_idx;
+ if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
+ CERROR("lov idx %d subobj "LPX64" osc inactive?\n",
+ loi->loi_ost_idx, loi->loi_id);
+ continue;
+ }
+
OBD_ALLOC(req, sizeof(*req));
- if (req == NULL)
+ if (req == NULL) {
+ lov_tgt_decref(lov, tgt);
GOTO(out_set, rc = -ENOMEM);
+ }
req->rq_buflen = sizeof(*req->rq_md);
OBD_ALLOC(req->rq_md, req->rq_buflen);
- if (req->rq_md == NULL)
+ if (req->rq_md == NULL) {
+ OBD_FREE(req, sizeof(*req));
+ lov_tgt_decref(lov, tgt);
GOTO(out_set, rc = -ENOMEM);
+ }
req->rq_idx = loi->loi_ost_idx;
+ req->rq_gen = loi->loi_ost_gen;
req->rq_stripe = i;
/* XXX LOV STACKING: submd should be from the subobj */
EXIT;
cleanup:
list_for_each (pos, &set->set_list) {
- struct obd_export *sub_exp;
+ struct lov_tgt_desc *tgt;
int err = 0;
req = list_entry(pos, struct lov_request, rq_link);
if (!req->rq_complete || req->rq_rc)
continue;
- sub_exp = lov->tgts[req->rq_idx].ltd_exp,
- err = obd_destroy(sub_exp, req->rq_oa, NULL, oti);
+ tgt = lov->tgts + req->rq_idx;
+ if (!lov_tgt_ready(lov, tgt, req->rq_gen)) {
+ CERROR("Failed to uncreate objid "LPX64" subobj "
+ LPX64" on OST idx %d: osc inactive.\n",
+ set->set_oa->o_id, req->rq_oa->o_id,
+ req->rq_idx);
+ continue;
+ }
+
+ err = obd_destroy(tgt->ltd_exp, req->rq_oa, NULL, oti);
+ lov_tgt_decref(lov, tgt);
if (err)
CERROR("Failed to uncreate objid "LPX64" subobj "
LPX64" on OST idx %d: rc = %d\n",
struct obd_trans_info *oti = set->set_oti;
struct lov_stripe_md *lsm = set->set_md;
struct lov_oinfo *loi;
+ struct lov_tgt_desc *tgt;
ENTRY;
req->rq_stripe = set->set_success;
loi = &lsm->lsm_oinfo[req->rq_stripe];
+ tgt = lov->tgts + req->rq_idx;
- if (rc && lov->tgts[req->rq_idx].active) {
+ if (rc && lov_tgt_ready(lov, tgt, req->rq_gen)) {
+ lov_tgt_decref(lov, tgt);
CERROR("error creating objid "LPX64" sub-object"
" on OST idx %d/%d: rc = %d\n",
set->set_oa->o_id, req->rq_idx,
loi->loi_id = req->rq_oa->o_id;
loi->loi_gr = req->rq_oa->o_gr;
loi->loi_ost_idx = req->rq_idx;
- CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n",
- lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
+ loi->loi_ost_gen = req->rq_gen;
+ CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at "
+ "idx %d gen %d\n",
+ lsm->lsm_object_id, loi->loi_id, loi->loi_id,
+ req->rq_idx, req->rq_gen);
loi_init(loi);
if (set->set_cookies)
shift = 0;
for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
struct lov_request *req;
+ struct lov_tgt_desc *tgt;
if (info[i].count == 0)
continue;
- if (lov->tgts[loi->loi_ost_idx].active == 0) {
+ tgt = lov->tgts + loi->loi_ost_idx;
+ if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
GOTO(out, rc = -EIO);
}
OBD_ALLOC(req, sizeof(*req));
- if (req == NULL)
+ if (req == NULL) {
+ lov_tgt_decref(lov, tgt);
GOTO(out, rc = -ENOMEM);
+ }
req->rq_oa = obdo_alloc();
- if (req->rq_oa == NULL)
+ if (req->rq_oa == NULL) {
+ OBD_FREE(req, sizeof(*req));
+ lov_tgt_decref(lov, tgt);
GOTO(out, rc = -ENOMEM);
+ }
if (src_oa)
memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
req->rq_oa->o_id = loi->loi_id;
req->rq_buflen = sizeof(*req->rq_md);
OBD_ALLOC(req->rq_md, req->rq_buflen);
- if (req->rq_md == NULL)
+ if (req->rq_md == NULL) {
+ obdo_free(req->rq_oa);
+ OBD_FREE(req, sizeof(*req));
+ lov_tgt_decref(lov, tgt);
GOTO(out, rc = -ENOMEM);
+ }
req->rq_idx = loi->loi_ost_idx;
+ req->rq_gen = loi->loi_ost_gen;
req->rq_stripe = i;
/* XXX LOV STACKING */
loi = lsm->lsm_oinfo;
for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+ struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
struct lov_request *req;
- if (lov->tgts[loi->loi_ost_idx].active == 0) {
+ if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
continue;
}
OBD_ALLOC(req, sizeof(*req));
- if (req == NULL)
+ if (req == NULL) {
+ lov_tgt_decref(lov, tgt);
GOTO(out_set, rc = -ENOMEM);
+ }
req->rq_stripe = i;
req->rq_idx = loi->loi_ost_idx;
+ req->rq_gen = loi->loi_ost_gen;
req->rq_oa = obdo_alloc();
- if (req->rq_oa == NULL)
+ if (req->rq_oa == NULL) {
+ OBD_FREE(req, sizeof(*req));
+ lov_tgt_decref(lov, tgt);
GOTO(out_set, rc = -ENOMEM);
+ }
memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
req->rq_oa->o_id = loi->loi_id;
lov_set_add_req(req, set);
loi = lsm->lsm_oinfo;
for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+ struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
struct lov_request *req;
- if (lov->tgts[loi->loi_ost_idx].active == 0) {
+ if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
continue;
}
OBD_ALLOC(req, sizeof(*req));
- if (req == NULL)
+ if (req == NULL) {
+ lov_tgt_decref(lov, tgt);
GOTO(out_set, rc = -ENOMEM);
+ }
req->rq_stripe = i;
req->rq_idx = loi->loi_ost_idx;
+ req->rq_gen = loi->loi_ost_gen;
req->rq_oa = obdo_alloc();
- if (req->rq_oa == NULL)
+ if (req->rq_oa == NULL) {
+ OBD_FREE(req, sizeof(*req));
+ lov_tgt_decref(lov, tgt);
GOTO(out_set, rc = -ENOMEM);
+ }
+
memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
req->rq_oa->o_id = loi->loi_id;
loi = lsm->lsm_oinfo;
for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+ struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
struct lov_request *req;
- if (lov->tgts[loi->loi_ost_idx].active == 0) {
+ if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
continue;
}
OBD_ALLOC(req, sizeof(*req));
- if (req == NULL)
+ if (req == NULL) {
+ lov_tgt_decref(lov, tgt);
GOTO(out_set, rc = -ENOMEM);
+ }
+
req->rq_stripe = i;
req->rq_idx = loi->loi_ost_idx;
+ req->rq_gen = loi->loi_ost_gen;
req->rq_oa = obdo_alloc();
- if (req->rq_oa == NULL)
+ if (req->rq_oa == NULL) {
+ OBD_FREE(req, sizeof(*req));
+ lov_tgt_decref(lov, tgt);
GOTO(out_set, rc = -ENOMEM);
+ }
+
memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
req->rq_oa->o_id = loi->loi_id;
LASSERT(!(req->rq_oa->o_valid & OBD_MD_FLGROUP) || req->rq_oa->o_gr>0);
ENTRY;
lov_update_set(set, req, rc);
- if (rc && !lov->tgts[req->rq_idx].active)
- rc = 0;
+ if (rc) {
+ struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
+
+ if (lov_tgt_active(lov, tgt, req->rq_gen))
+ lov_tgt_decref(lov, tgt);
+ else
+ rc = 0;
+ }
+
/* FIXME in raid1 regime, should return 0 */
RETURN(rc);
}
loi = lsm->lsm_oinfo;
for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+ struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
struct lov_request *req;
obd_off rs, re;
- if (lov->tgts[loi->loi_ost_idx].active == 0) {
- CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+ if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
continue;
- }
- if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
+ if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
+ CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
continue;
+ }
OBD_ALLOC(req, sizeof(*req));
- if (req == NULL)
+ if (req == NULL) {
+ lov_tgt_decref(lov, tgt);
GOTO(out_set, rc = -ENOMEM);
+ }
+
req->rq_stripe = i;
req->rq_idx = loi->loi_ost_idx;
+ req->rq_gen = loi->loi_ost_gen;
req->rq_oa = obdo_alloc();
- if (req->rq_oa == NULL)
+ if (req->rq_oa == NULL) {
+ OBD_FREE(req, sizeof(*req));
+ lov_tgt_decref(lov, tgt);
GOTO(out_set, rc = -ENOMEM);
+ }
+
memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
req->rq_oa->o_id = loi->loi_id;
req->rq_oa->o_gr = loi->loi_gr;
loi = lsm->lsm_oinfo;
for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+ struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
struct lov_request *req;
obd_off rs, re;
- if (lov->tgts[loi->loi_ost_idx].active == 0) {
- CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+ if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
continue;
- }
- if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
+ if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
+ CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
continue;
+ }
OBD_ALLOC(req, sizeof(*req));
- if (req == NULL)
+ if (req == NULL) {
+ lov_tgt_decref(lov, tgt);
GOTO(out_set, rc = -ENOMEM);
+ }
+
req->rq_stripe = i;
req->rq_idx = loi->loi_ost_idx;
+ req->rq_gen = loi->loi_ost_gen;
req->rq_oa = obdo_alloc();
- if (req->rq_oa == NULL)
+ if (req->rq_oa == NULL) {
+ OBD_FREE(req, sizeof(*req));
+ lov_tgt_decref(lov, tgt);
GOTO(out_set, rc = -ENOMEM);
+ }
memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
req->rq_oa->o_id = loi->loi_id;
req->rq_extent.start = rs;