Whamcloud - gitweb
- CROW (CReate On Write) (precreation is removed)
[fs/lustre-release.git] / lustre / lov / lov_obd.c
index 8b3c13a..ae74d22 100644 (file)
@@ -656,7 +656,8 @@ out:
 #define log2(n) ffz(~(n))
 #endif
 
-static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
+static int lov_clear_orphans(struct obd_export *export,
+                             struct obdo *src_oa,
                              struct lov_stripe_md **ea,
                              struct obd_trans_info *oti)
 {
@@ -682,12 +683,14 @@ static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
         }
 
         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+                int err;
                 struct lov_stripe_md obj_md;
                 struct lov_stripe_md *obj_mdp = &obj_md;
-                int err;
 
-                /* if called for a specific target, we don't
-                   care if it is not active. */
+                /*
+                 * if called for a specific target, we don't care if it is not
+                 * active.
+                 */
                 if (lov->tgts[i].active == 0 && ost_uuid == NULL) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
                         continue;
@@ -696,16 +699,25 @@ static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
                 if (ost_uuid && !obd_uuid_equals(ost_uuid, &lov->tgts[i].uuid))
                         continue;
 
+                /* 
+                 * setting up objid OSS objects should be destroyed starting
+                 * from it.
+                 */
                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
+                tmp_oa->o_valid |= OBD_MD_FLID;
+                tmp_oa->o_id = oti->oti_objid[i];
 
                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
                 err = obd_create(lov->tgts[i].ltd_exp, tmp_oa, NULL, 0,
                                  &obj_mdp, oti);
-                if (err)
-                        /* This export will be disabled until it is recovered,
-                           and then orphan recovery will be completed. */
+                if (err) {
+                        /*
+                         * this export will be disabled until it is recovered,
+                         * and then orphan recovery will be completed.
+                         */
                         CERROR("error in orphan recovery on OST idx %d/%d: "
                                "rc = %d\n", i, lov->desc.ld_tgt_count, err);
+                }
 
                 if (ost_uuid)
                         break;
@@ -714,51 +726,11 @@ static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
         RETURN(rc);
 }
 
-static int lov_recreate(struct obd_export *exp, struct obdo *src_oa,
-                        void *acl, int acl_size,
-                        struct lov_stripe_md **ea, struct obd_trans_info *oti)
-{
-        struct lov_stripe_md *obj_mdp, *lsm;
-        struct lov_obd *lov = &exp->exp_obd->u.lov;
-        unsigned ost_idx;
-        int rc, i;
-        ENTRY;
-
-        LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
-                src_oa->o_flags & OBD_FL_RECREATE_OBJS);
-
-        OBD_ALLOC(obj_mdp, sizeof(*obj_mdp));
-        if (obj_mdp == NULL)
-                RETURN(-ENOMEM);
-
-        ost_idx = src_oa->o_nlink;
-        lsm = *ea;
-        if (lsm == NULL)
-                GOTO(out, rc = -EINVAL);
-        if (ost_idx >= lov->desc.ld_tgt_count)
-                GOTO(out, rc = -EINVAL);
-
-        for (i = 0; i < lsm->lsm_stripe_count; i++) {
-                if (lsm->lsm_oinfo[i].loi_ost_idx == ost_idx) {
-                        if (lsm->lsm_oinfo[i].loi_id != src_oa->o_id)
-                                GOTO(out, rc = -EINVAL);
-                        break;
-                }
-        }
-        if (i == lsm->lsm_stripe_count)
-                GOTO(out, rc = -EINVAL);
-
-        rc = obd_create(lov->tgts[ost_idx].ltd_exp, src_oa, acl, acl_size,
-                        &obj_mdp, oti);
-out:
-        OBD_FREE(obj_mdp, sizeof(*obj_mdp));
-        RETURN(rc);
-}
-
 /* the LOV expects oa->o_id to be set to the LOV object id */
-static int lov_create(struct obd_export *exp, struct obdo *src_oa,
-                      void *acl, int acl_size,
-                      struct lov_stripe_md **ea, struct obd_trans_info *oti)
+static int
+lov_create(struct obd_export *exp, struct obdo *src_oa,
+           void *acl, int acl_size, struct lov_stripe_md **ea,
+           struct obd_trans_info *oti)
 {
         struct lov_request_set *set = NULL;
         struct list_head *pos;
@@ -780,13 +752,9 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa,
         if (!lov->desc.ld_active_tgt_count)
                 RETURN(-EIO);
 
-        /* Recreate a specific object id at the given OST index */
-        if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
-            (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) {
-                 rc = lov_recreate(exp, src_oa, acl, acl_size, ea, oti);
-                 RETURN(rc);
-        }
-
+        LASSERT(oti->oti_flags & OBD_MODE_CROW);
+                
+        /* main creation loop */
         rc = lov_prep_create_set(exp, ea, src_oa, oti, &set);
         if (rc)
                 RETURN(rc);
@@ -795,9 +763,21 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa,
                 struct lov_request *req = 
                         list_entry(pos, struct lov_request, rq_link);
 
-                /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
-                rc = obd_create(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
-                                acl, acl_size, &req->rq_md, oti);
+                obd_id *objids = oti->oti_objid;
+
+                if (oti->oti_obj_alloc) {
+                        __u64 next_id;
+                                
+                        /* 
+                         * allocating new objid. Here it is delegated to caller,
+                         * that is MDS in CROW case.
+                         */
+                        next_id = oti->oti_obj_alloc(&objids[req->rq_idx]);
+                        req->rq_oa->o_id = next_id;
+                } else {
+                        /* and here is default "allocator" */
+                        req->rq_oa->o_id = ++objids[req->rq_idx];
+                }
                 lov_update_create_set(set, req, rc);
         }
         rc = lov_fini_create_set(set, ea);
@@ -996,12 +976,6 @@ static int lov_setattr(struct obd_export *exp, struct obdo *src_oa,
         if (!exp || !exp->exp_obd)
                 RETURN(-ENODEV);
 
-        /* for now, we only expect time updates here */
-        LASSERT(!(src_oa->o_valid & ~(OBD_MD_FLID|OBD_MD_FLTYPE | OBD_MD_FLMODE|
-                                      OBD_MD_FLATIME | OBD_MD_FLMTIME |
-                                      OBD_MD_FLCTIME | OBD_MD_FLFLAGS |
-                                      OBD_MD_FLSIZE | OBD_MD_FLGROUP)));
-
         LASSERT(!(src_oa->o_valid & OBD_MD_FLGROUP) || src_oa->o_gr > 0);
 
         lov = &exp->exp_obd->u.lov;
@@ -2081,21 +2055,6 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen,
 #define KEY_IS(str) \
         (keylen == strlen(str) && memcmp(key, str, keylen) == 0)
 
-        if (KEY_IS("next_id")) {
-                if (vallen != lov->desc.ld_tgt_count)
-                        RETURN(-EINVAL);
-                for (i = 0; i < lov->desc.ld_tgt_count; i++) {
-                        /* initialize all OSCs, even inactive ones */
-                        if (obd_uuid_empty(&lov->tgts[i].uuid))
-                                continue;
-                        err = obd_set_info(lov->tgts[i].ltd_exp,
-                                          keylen, key, sizeof(obd_id),
-                                          ((obd_id*)val) + i);
-                        if (!rc)
-                                rc = err;
-                }
-                RETURN(rc);
-        }
         if (KEY_IS("async")) {
                 struct lov_desc *desc = &lov->desc;
                 struct lov_tgt_desc *tgts = lov->tgts;
@@ -2131,10 +2090,7 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen,
                 RETURN(rc);
         }
 
-        if (KEY_IS("growth_count")) {
-                if (vallen != sizeof(int))
-                        RETURN(-EINVAL);
-        } else if (KEY_IS("mds_conn")) {
+        if (KEY_IS("mds_conn")) {
                 if (vallen != sizeof(__u32))
                         RETURN(-EINVAL);
         } else if (KEY_IS("unlinked") || KEY_IS("unrecovery")) {