Whamcloud - gitweb
Branch: HEAD
[fs/lustre-release.git] / lustre / lov / lov_request.c
index dcf5810..f99307c 100644 (file)
@@ -47,13 +47,21 @@ static void lov_init_set(struct lov_request_set *set)
 
 static void lov_finish_set(struct lov_request_set *set)
 {
+        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
         struct list_head *pos, *n;
         ENTRY;
 
         LASSERT(set);
         list_for_each_safe(pos, n, &set->set_list) {
-                struct lov_request *req = list_entry(pos, struct lov_request,
-                                                     rq_link);
+                struct lov_request *req;
+                struct lov_tgt_desc *tgt;
+
+                req = list_entry(pos, struct lov_request, rq_link);
+                LASSERT(req->rq_idx >= 0);
+
+                tgt = lov->tgts + req->rq_idx;
+                lov_tgt_decref(lov, tgt);
+
                 list_del_init(&req->rq_link);
 
                 if (req->rq_oa)
@@ -94,8 +102,14 @@ int lov_update_common_set(struct lov_request_set *set,
         lov_update_set(set, req, rc);
 
         /* grace error on inactive ost */
-        if (rc && !lov->tgts[req->rq_idx].active)
-                rc = 0;
+        if (rc) {
+                struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
+
+                if (lov_tgt_active(lov, tgt, req->rq_gen))
+                        lov_tgt_decref(lov, tgt);
+                else
+                        rc = 0;
+        }
 
         /* FIXME in raid1 regime, should return 0 */
         RETURN(rc);
@@ -160,14 +174,20 @@ int lov_update_enqueue_set(struct lov_request_set *set,
         } else {
                 struct obd_export *exp = set->set_exp;
                 struct lov_obd *lov = &exp->exp_obd->u.lov;
+                struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
 
                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
-                if (lov->tgts[req->rq_idx].active) {
+                if (lov_tgt_ready(lov, tgt, req->rq_gen)) {
+                        lov_tgt_decref(lov, tgt);
                         CERROR("error: enqueue objid "LPX64" subobj "
                                 LPX64" on OST idx %d: rc = %d\n",
                                 set->set_md->lsm_object_id, loi->loi_id,
                                 loi->loi_ost_idx, rc);
                 } else {
+                        CERROR("error: enqueue objid "LPX64" subobj "
+                                LPX64" on OST idx %d: rc = %d, NOT ACTIVE !\n",
+                                set->set_md->lsm_object_id, loi->loi_id,
+                                loi->loi_ost_idx, rc);
                         rc = ELDLM_OK;
                 }
         }
@@ -181,6 +201,7 @@ static int enqueue_done(struct lov_request_set *set, __u32 mode, int flags)
         struct lov_request *req;
         struct lustre_handle *lov_lockhp = NULL;
         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
+        struct lov_tgt_desc *tgt;
         int rc = 0;
         ENTRY;
 
@@ -206,12 +227,14 @@ static int enqueue_done(struct lov_request_set *set, __u32 mode, int flags)
                 if (lov_lockhp->cookie == 0)
                         continue;
 
-                rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
-                                mode, lov_lockhp);
-                if (rc && lov->tgts[req->rq_idx].active)
+                tgt = lov->tgts + req->rq_idx;
+                rc = obd_cancel(tgt->ltd_exp, req->rq_md, mode, lov_lockhp);
+                if (rc && lov_tgt_ready(lov, tgt, req->rq_gen)) {
+                        lov_tgt_decref(lov, tgt);
                         CERROR("cancelling obdjid "LPX64" on OST "
                                "idx %d error: rc = %d\n",
                                req->rq_md->lsm_object_id, req->rq_idx, rc);
+                }
         }
         lov_llh_put(set->set_lockh);
         RETURN(rc);
@@ -261,6 +284,7 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
 
         loi = lsm->lsm_oinfo;
         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_tgt_desc *tgt;
                 struct lov_request *req;
                 obd_off start, end;
 
@@ -268,25 +292,32 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
                                            policy->l_extent.end, &start, &end))
                         continue;
 
-                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                tgt = lov->tgts + loi->loi_ost_idx;
+                if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
                 }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL)
+                if (req == NULL) {
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
 
                 req->rq_buflen = sizeof(*req->rq_md) +
                         sizeof(struct lov_oinfo);
                 OBD_ALLOC(req->rq_md, req->rq_buflen);
-                if (req->rq_md == NULL)
+                if (req->rq_md == NULL) {
+                        OBD_FREE(req, sizeof(*req));
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
 
                 req->rq_extent.start = start;
                 req->rq_extent.end = end;
 
                 req->rq_idx = loi->loi_ost_idx;
+                req->rq_gen = loi->loi_ost_gen;
                 req->rq_stripe = i;
 
                 /* XXX LOV STACKING: submd should be from the subobj */
@@ -362,6 +393,7 @@ int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
 
         loi = lsm->lsm_oinfo;
         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_tgt_desc *tgt;
                 struct lov_request *req;
                 obd_off start, end;
 
@@ -370,24 +402,31 @@ int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
                         continue;
 
                 /* FIXME raid1 should grace this error */
-                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                tgt = lov->tgts + loi->loi_ost_idx;
+                if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         GOTO(out_set, rc = -EIO);
                 }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL)
+                if (req == NULL) {
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
 
                 req->rq_buflen = sizeof(*req->rq_md);
                 OBD_ALLOC(req->rq_md, req->rq_buflen);
-                if (req->rq_md == NULL)
+                if (req->rq_md == NULL) {
+                        OBD_FREE(req, sizeof(*req));
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
 
                 req->rq_extent.start = start;
                 req->rq_extent.end = end;
 
                 req->rq_idx = loi->loi_ost_idx;
+                req->rq_gen = loi->loi_ost_gen;
                 req->rq_stripe = i;
 
                 /* XXX LOV STACKING: submd should be from the subobj */
@@ -427,6 +466,7 @@ int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
                         __u32 mode, struct lustre_handle *lockh,
                         struct lov_request_set **reqset)
 {
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
         struct lov_request_set *set;
         int i, rc = 0;
         struct lov_oinfo *loi;
@@ -448,6 +488,7 @@ int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
 
         loi = lsm->lsm_oinfo;
         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_tgt_desc *tgt;
                 struct lov_request *req;
                 struct lustre_handle *lov_lockhp;
 
@@ -458,16 +499,29 @@ int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
                         continue;
                 }
 
+                tgt = lov->tgts + loi->loi_ost_idx;
+                if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
+                        CERROR("lov idx %d subobj "LPX64" osc inactive?\n",
+                               loi->loi_ost_idx, loi->loi_id);
+                        continue;
+                }
+
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL)
+                if (req == NULL) {
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
 
                 req->rq_buflen = sizeof(*req->rq_md);
                 OBD_ALLOC(req->rq_md, req->rq_buflen);
-                if (req->rq_md == NULL)
+                if (req->rq_md == NULL) {
+                        OBD_FREE(req, sizeof(*req));
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
 
                 req->rq_idx = loi->loi_ost_idx;
+                req->rq_gen = loi->loi_ost_gen;
                 req->rq_stripe = i;
 
                 /* XXX LOV STACKING: submd should be from the subobj */
@@ -536,15 +590,24 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
         EXIT;
 cleanup:
         list_for_each (pos, &set->set_list) {
-                struct obd_export *sub_exp;
+                struct lov_tgt_desc *tgt;
                 int err = 0;
                 req = list_entry(pos, struct lov_request, rq_link);
 
                 if (!req->rq_complete || req->rq_rc)
                         continue;
 
-                sub_exp = lov->tgts[req->rq_idx].ltd_exp,
-                err = obd_destroy(sub_exp, req->rq_oa, NULL, oti);
+                tgt = lov->tgts + req->rq_idx;
+                if (!lov_tgt_ready(lov, tgt, req->rq_gen)) {
+                        CERROR("Failed to uncreate objid "LPX64" subobj "
+                               LPX64" on OST idx %d: osc inactive.\n",
+                               set->set_oa->o_id, req->rq_oa->o_id,
+                               req->rq_idx);
+                        continue;
+                }
+
+                err = obd_destroy(tgt->ltd_exp, req->rq_oa, NULL, oti);
+                lov_tgt_decref(lov, tgt);
                 if (err)
                         CERROR("Failed to uncreate objid "LPX64" subobj "
                                LPX64" on OST idx %d: rc = %d\n",
@@ -592,12 +655,15 @@ int lov_update_create_set(struct lov_request_set *set,
         struct obd_trans_info *oti = set->set_oti;
         struct lov_stripe_md *lsm = set->set_md;
         struct lov_oinfo *loi;
+        struct lov_tgt_desc *tgt;
         ENTRY;
 
         req->rq_stripe = set->set_success;
         loi = &lsm->lsm_oinfo[req->rq_stripe];
+        tgt = lov->tgts + req->rq_idx;
 
-        if (rc && lov->tgts[req->rq_idx].active) {
+        if (rc && lov_tgt_ready(lov, tgt, req->rq_gen)) {
+                lov_tgt_decref(lov, tgt);
                 CERROR("error creating objid "LPX64" sub-object"
                        " on OST idx %d/%d: rc = %d\n",
                        set->set_oa->o_id, req->rq_idx,
@@ -617,8 +683,11 @@ int lov_update_create_set(struct lov_request_set *set,
         loi->loi_id = req->rq_oa->o_id;
         loi->loi_gr = req->rq_oa->o_gr;
         loi->loi_ost_idx = req->rq_idx;
-        CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n",
-               lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
+        loi->loi_ost_gen = req->rq_gen;
+        CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at "
+               "idx %d gen %d\n",
+               lsm->lsm_object_id, loi->loi_id, loi->loi_id,
+               req->rq_idx, req->rq_gen);
         loi_init(loi);
 
         if (set->set_cookies)
@@ -822,32 +891,44 @@ int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
         shift = 0;
         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
                 struct lov_request *req;
+                struct lov_tgt_desc *tgt;
 
                 if (info[i].count == 0)
                         continue;
 
-                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                tgt = lov->tgts + loi->loi_ost_idx;
+                if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         GOTO(out, rc = -EIO);
                 }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL)
+                if (req == NULL) {
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out, rc = -ENOMEM);
+                }
 
                 req->rq_oa = obdo_alloc();
-                if (req->rq_oa == NULL)
+                if (req->rq_oa == NULL) {
+                        OBD_FREE(req, sizeof(*req));
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out, rc = -ENOMEM);
+                }
 
                 if (src_oa)
                         memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
                 req->rq_oa->o_id = loi->loi_id;
                 req->rq_buflen = sizeof(*req->rq_md);
                 OBD_ALLOC(req->rq_md, req->rq_buflen);
-                if (req->rq_md == NULL)
+                if (req->rq_md == NULL) {
+                        obdo_free(req->rq_oa);
+                        OBD_FREE(req, sizeof(*req));
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out, rc = -ENOMEM);
+                }
 
                 req->rq_idx = loi->loi_ost_idx;
+                req->rq_gen = loi->loi_ost_gen;
                 req->rq_stripe = i;
 
                 /* XXX LOV STACKING */
@@ -930,23 +1011,30 @@ int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa,
 
         loi = lsm->lsm_oinfo;
         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
                 struct lov_request *req;
 
-                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
                 }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL)
+                if (req == NULL) {
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
 
                 req->rq_stripe = i;
                 req->rq_idx = loi->loi_ost_idx;
+                req->rq_gen = loi->loi_ost_gen;
 
                 req->rq_oa = obdo_alloc();
-                if (req->rq_oa == NULL)
+                if (req->rq_oa == NULL) {
+                        OBD_FREE(req, sizeof(*req));
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
                 req->rq_oa->o_id = loi->loi_id;
                 lov_set_add_req(req, set);
@@ -1002,23 +1090,31 @@ int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa,
 
         loi = lsm->lsm_oinfo;
         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
                 struct lov_request *req;
 
-                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
                 }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL)
+                if (req == NULL) {
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
 
                 req->rq_stripe = i;
                 req->rq_idx = loi->loi_ost_idx;
+                req->rq_gen = loi->loi_ost_gen;
 
                 req->rq_oa = obdo_alloc();
-                if (req->rq_oa == NULL)
+                if (req->rq_oa == NULL) {
+                        OBD_FREE(req, sizeof(*req));
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
+
                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
                 req->rq_oa->o_id = loi->loi_id;
 
@@ -1082,22 +1178,31 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa,
 
         loi = lsm->lsm_oinfo;
         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
                 struct lov_request *req;
 
-                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
                 }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL)
+                if (req == NULL) {
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
+
                 req->rq_stripe = i;
                 req->rq_idx = loi->loi_ost_idx;
+                req->rq_gen = loi->loi_ost_gen;
 
                 req->rq_oa = obdo_alloc();
-                if (req->rq_oa == NULL)
+                if (req->rq_oa == NULL) {
+                        OBD_FREE(req, sizeof(*req));
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
+
                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
                 req->rq_oa->o_id = loi->loi_id;
                 LASSERT(!(req->rq_oa->o_valid & OBD_MD_FLGROUP) || req->rq_oa->o_gr>0);
@@ -1128,8 +1233,15 @@ int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req,
         ENTRY;
 
         lov_update_set(set, req, rc);
-        if (rc && !lov->tgts[req->rq_idx].active)
-                rc = 0;
+        if (rc) {
+                struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
+
+                if (lov_tgt_active(lov, tgt, req->rq_gen))
+                        lov_tgt_decref(lov, tgt);
+                else
+                        rc = 0;
+        }
+
         /* FIXME in raid1 regime, should return 0 */
         RETURN(rc);
 }
@@ -1176,26 +1288,35 @@ int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
 
         loi = lsm->lsm_oinfo;
         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
                 struct lov_request *req;
                 obd_off rs, re;
 
-                if (lov->tgts[loi->loi_ost_idx].active == 0) {
-                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
                         continue;
-                }
 
-                if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
+                if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
+                }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL)
+                if (req == NULL) {
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
+
                 req->rq_stripe = i;
                 req->rq_idx = loi->loi_ost_idx;
+                req->rq_gen = loi->loi_ost_gen;
 
                 req->rq_oa = obdo_alloc();
-                if (req->rq_oa == NULL)
+                if (req->rq_oa == NULL) {
+                        OBD_FREE(req, sizeof(*req));
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
+
                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
                 req->rq_oa->o_id = loi->loi_id;
                 req->rq_oa->o_gr = loi->loi_gr;
@@ -1256,26 +1377,34 @@ int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa,
 
         loi = lsm->lsm_oinfo;
         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
                 struct lov_request *req;
                 obd_off rs, re;
 
-                if (lov->tgts[loi->loi_ost_idx].active == 0) {
-                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
                         continue;
-                }
 
-                if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
+                if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
+                }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL)
+                if (req == NULL) {
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
+
                 req->rq_stripe = i;
                 req->rq_idx = loi->loi_ost_idx;
+                req->rq_gen = loi->loi_ost_gen;
 
                 req->rq_oa = obdo_alloc();
-                if (req->rq_oa == NULL)
+                if (req->rq_oa == NULL) {
+                        OBD_FREE(req, sizeof(*req));
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
                 req->rq_oa->o_id = loi->loi_id;
                 req->rq_extent.start = rs;