#define log2(n) ffz(~(n))
#endif
-static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
+static int lov_clear_orphans(struct obd_export *export,
+ struct obdo *src_oa,
struct lov_stripe_md **ea,
struct obd_trans_info *oti)
{
}
for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+ int err;
struct lov_stripe_md obj_md;
struct lov_stripe_md *obj_mdp = &obj_md;
- int err;
- /* if called for a specific target, we don't
- care if it is not active. */
+ /*
+ * if called for a specific target, we don't care if it is not
+ * active.
+ */
if (lov->tgts[i].active == 0 && ost_uuid == NULL) {
CDEBUG(D_HA, "lov idx %d inactive\n", i);
continue;
if (ost_uuid && !obd_uuid_equals(ost_uuid, &lov->tgts[i].uuid))
continue;
+ /*
+ * setting up objid OSS objects should be destroyed starting
+ * from it.
+ */
memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
+ tmp_oa->o_valid |= OBD_MD_FLID;
+ tmp_oa->o_id = oti->oti_objid[i];
/* XXX: LOV STACKING: use real "obj_mdp" sub-data */
err = obd_create(lov->tgts[i].ltd_exp, tmp_oa, NULL, 0,
&obj_mdp, oti);
- if (err)
- /* This export will be disabled until it is recovered,
- and then orphan recovery will be completed. */
+ if (err) {
+ /*
+ * this export will be disabled until it is recovered,
+ * and then orphan recovery will be completed.
+ */
CERROR("error in orphan recovery on OST idx %d/%d: "
"rc = %d\n", i, lov->desc.ld_tgt_count, err);
+ }
if (ost_uuid)
break;
RETURN(rc);
}
-static int lov_recreate(struct obd_export *exp, struct obdo *src_oa,
- void *acl, int acl_size,
- struct lov_stripe_md **ea, struct obd_trans_info *oti)
-{
- struct lov_stripe_md *obj_mdp, *lsm;
- struct lov_obd *lov = &exp->exp_obd->u.lov;
- unsigned ost_idx;
- int rc, i;
- ENTRY;
-
- LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
- src_oa->o_flags & OBD_FL_RECREATE_OBJS);
-
- OBD_ALLOC(obj_mdp, sizeof(*obj_mdp));
- if (obj_mdp == NULL)
- RETURN(-ENOMEM);
-
- ost_idx = src_oa->o_nlink;
- lsm = *ea;
- if (lsm == NULL)
- GOTO(out, rc = -EINVAL);
- if (ost_idx >= lov->desc.ld_tgt_count)
- GOTO(out, rc = -EINVAL);
-
- for (i = 0; i < lsm->lsm_stripe_count; i++) {
- if (lsm->lsm_oinfo[i].loi_ost_idx == ost_idx) {
- if (lsm->lsm_oinfo[i].loi_id != src_oa->o_id)
- GOTO(out, rc = -EINVAL);
- break;
- }
- }
- if (i == lsm->lsm_stripe_count)
- GOTO(out, rc = -EINVAL);
-
- rc = obd_create(lov->tgts[ost_idx].ltd_exp, src_oa, acl, acl_size,
- &obj_mdp, oti);
-out:
- OBD_FREE(obj_mdp, sizeof(*obj_mdp));
- RETURN(rc);
-}
-
/* the LOV expects oa->o_id to be set to the LOV object id */
-static int lov_create(struct obd_export *exp, struct obdo *src_oa,
- void *acl, int acl_size,
- struct lov_stripe_md **ea, struct obd_trans_info *oti)
+static int
+lov_create(struct obd_export *exp, struct obdo *src_oa,
+ void *acl, int acl_size, struct lov_stripe_md **ea,
+ struct obd_trans_info *oti)
{
struct lov_request_set *set = NULL;
struct list_head *pos;
if (!lov->desc.ld_active_tgt_count)
RETURN(-EIO);
- /* Recreate a specific object id at the given OST index */
- if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
- (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) {
- rc = lov_recreate(exp, src_oa, acl, acl_size, ea, oti);
- RETURN(rc);
- }
-
+ LASSERT(oti->oti_flags & OBD_MODE_CROW);
+
+ /* main creation loop */
rc = lov_prep_create_set(exp, ea, src_oa, oti, &set);
if (rc)
RETURN(rc);
struct lov_request *req =
list_entry(pos, struct lov_request, rq_link);
- /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
- rc = obd_create(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
- acl, acl_size, &req->rq_md, oti);
+ obd_id *objids = oti->oti_objid;
+
+ if (oti->oti_obj_alloc) {
+ __u64 next_id;
+
+ /*
+ * allocating new objid. Here it is delegated to caller,
+ * that is MDS in CROW case.
+ */
+ next_id = oti->oti_obj_alloc(&objids[req->rq_idx]);
+ req->rq_oa->o_id = next_id;
+ } else {
+ /* and here is default "allocator" */
+ req->rq_oa->o_id = ++objids[req->rq_idx];
+ }
lov_update_create_set(set, req, rc);
}
rc = lov_fini_create_set(set, ea);
if (!exp || !exp->exp_obd)
RETURN(-ENODEV);
- /* for now, we only expect time updates here */
- LASSERT(!(src_oa->o_valid & ~(OBD_MD_FLID|OBD_MD_FLTYPE | OBD_MD_FLMODE|
- OBD_MD_FLATIME | OBD_MD_FLMTIME |
- OBD_MD_FLCTIME | OBD_MD_FLFLAGS |
- OBD_MD_FLSIZE | OBD_MD_FLGROUP)));
-
LASSERT(!(src_oa->o_valid & OBD_MD_FLGROUP) || src_oa->o_gr > 0);
lov = &exp->exp_obd->u.lov;
#define KEY_IS(str) \
(keylen == strlen(str) && memcmp(key, str, keylen) == 0)
- if (KEY_IS("next_id")) {
- if (vallen != lov->desc.ld_tgt_count)
- RETURN(-EINVAL);
- for (i = 0; i < lov->desc.ld_tgt_count; i++) {
- /* initialize all OSCs, even inactive ones */
- if (obd_uuid_empty(&lov->tgts[i].uuid))
- continue;
- err = obd_set_info(lov->tgts[i].ltd_exp,
- keylen, key, sizeof(obd_id),
- ((obd_id*)val) + i);
- if (!rc)
- rc = err;
- }
- RETURN(rc);
- }
if (KEY_IS("async")) {
struct lov_desc *desc = &lov->desc;
struct lov_tgt_desc *tgts = lov->tgts;
RETURN(rc);
}
- if (KEY_IS("growth_count")) {
- if (vallen != sizeof(int))
- RETURN(-EINVAL);
- } else if (KEY_IS("mds_conn")) {
+ if (KEY_IS("mds_conn")) {
if (vallen != sizeof(__u32))
RETURN(-EINVAL);
} else if (KEY_IS("unlinked") || KEY_IS("unrecovery")) {