Whamcloud - gitweb
Branch: HEAD
authorwangdi <wangdi>
Wed, 17 Aug 2005 09:32:06 +0000 (09:32 +0000)
committerwangdi <wangdi>
Wed, 17 Aug 2005 09:32:06 +0000 (09:32 +0000)
land ost add/del to HEAD

21 files changed:
lustre/cmobd/cm_oss_reint.c
lustre/include/linux/lustre_idl.h
lustre/include/linux/obd.h
lustre/ldlm/ldlm_lib.c
lustre/liblustre/rw.c
lustre/llite/file.c
lustre/llite/llite_lib.c
lustre/lmv/lmv_obd.c
lustre/lov/lov_internal.h
lustre/lov/lov_log.c
lustre/lov/lov_obd.c
lustre/lov/lov_pack.c
lustre/lov/lov_qos.c
lustre/lov/lov_request.c
lustre/lov/lproc_lov.c
lustre/obdecho/echo_client.c
lustre/osc/osc_request.c
lustre/tests/cfg/lmv.sh
lustre/utils/lconf
lustre/utils/lmc
lustre/utils/lustre_cfg.c

index 8f0dade..348e182 100644 (file)
@@ -63,6 +63,7 @@ int cmobd_dummy_lsm(struct lov_stripe_md **lsmp, int stripe_cnt,
                         (*lsmp)->lsm_object_gr = oa->o_gr;
                 }
                 (*lsmp)->lsm_oinfo[i].loi_ost_idx = i;
+                (*lsmp)->lsm_oinfo[i].loi_ost_gen = 1;
                 (*lsmp)->lsm_stripe_size = stripe_size;
         }
         RETURN(0);
index c7f10ad..81835c7 100644 (file)
@@ -353,7 +353,7 @@ extern void lustre_swab_obdo (struct obdo *o);
 #define LOV_PATTERN_CMOBD 0x200  
  
 #define lov_ost_data lov_ost_data_v1
-struct lov_ost_data_v1 {          /* per-stripe data structure (little-endian)*/
+struct lov_ost_data {          /* per-stripe data structure (little-endian)*/
         __u64 l_object_id;        /* OST object ID */
         __u64 l_object_gr;        /* OST object group (creating MDS number) */
         __u32 l_ost_gen;          /* generation of this l_ost_idx */
index af1f903..2d7fb4d 100644 (file)
@@ -557,11 +557,17 @@ struct conf_obd {
         struct lvfs_obd_ctxt    *cfobd_lvfs_ctxt;
 };
 
+enum lov_tgt_flags {
+        LTD_ACTIVE      = 0x1, /* is this target up for requests   */
+        LTD_DEL_PENDING = 0x2, /* delete event pending for this tgt */
+};
+
 struct lov_tgt_desc {
         struct obd_uuid         uuid;
         __u32                   ltd_gen;
         struct obd_export      *ltd_exp;
-        int                     active;   /* is this target up for requests */
+        unsigned int            ltd_flags;
+        int                     ltd_refcount;
 };
 
 struct lov_obd {
@@ -572,6 +578,7 @@ struct lov_obd {
         int                     lo_catalog_loaded:1, async:1;
         struct semaphore        lov_llog_sem;
         unsigned long           lov_connect_flags;
+        wait_queue_head_t       lov_tgt_waitq;
         struct lov_tgt_desc    *tgts;
 };
 
index 0927476..75ac006 100644 (file)
@@ -1412,7 +1412,7 @@ static int target_recovery_thread(void *arg)
                 atomic_sub(stale, &obd->obd_lock_replay_clients);
                 abort_req_replay_queue(obd);
                 /* XXX for debuggin tests 11 and 17 */
-                LBUG();
+                //LBUG();
         }
 
         /* The second stage: replay locks */
index 543be8a..e0ffe6c 100644 (file)
@@ -99,7 +99,7 @@ static int llu_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock
         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
         if (rc != 0) {
                 CERROR("obd_get_info: rc = %d\n", rc);
-                LBUG();
+                RETURN(rc);
         }
         LASSERT(stripe < lsm->lsm_stripe_count);
         RETURN(stripe);
@@ -185,8 +185,13 @@ static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp)
                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
 
         /* First, find out which stripe index this lock corresponds to. */
-        if (lli->lli_smd->lsm_stripe_count > 1)
+        if (lli->lli_smd->lsm_stripe_count > 1) {
                 stripe = llu_lock_to_stripe_offset(inode, lock);
+                if (stripe < 0) {
+                        CWARN("lock on inode without such object\n");
+                        GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
+                }
+        }
 
         rc = lustre_pack_reply(req, 1, &size, NULL);
         if (rc) {
index cdf877b..02cc252 100644 (file)
@@ -850,8 +850,11 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock,
                 lsm = lli->lli_smd;
 
                 stripe = ll_lock_to_stripe_offset(inode, lock);
-                if (stripe < 0)
+                if (stripe < 0) {
+                        CERROR("ll_lock_to_stripe_offset failed: %d\n", stripe);
                         goto iput;
+                }
+
                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
 
                 down(&lli->lli_size_sem);
@@ -901,8 +904,10 @@ int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
 
         stripe = ll_lock_to_stripe_offset(inode, lock);
-        if (stripe < 0)
+        if (stripe < 0) {
+                CERROR("ll_lock_to_stripe_offset failed: %d\n", stripe);
                 goto iput;
+        }
 
         if (lock->l_lvb_len) {
                 struct lov_stripe_md *lsm = lli->lli_smd;
@@ -954,8 +959,10 @@ static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
 
         /* First, find out which stripe index this lock corresponds to. */
         stripe = ll_lock_to_stripe_offset(inode, lock);
-        if (stripe < 0)
+        if (stripe < 0) {
+                CERROR("ll_lock_to_stripe_offset failed: %d\n", stripe);
                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
+        }
 
         rc = lustre_pack_reply(req, 1, &size, NULL);
         if (rc) {
index 6cb1455..de8f344 100644 (file)
@@ -1049,16 +1049,6 @@ int ll_process_config_update(struct ll_sb_info *sbi, int clean)
                 RETURN(0);
         }
 
-        rc = obd_cancel_unused(sbi->ll_md_exp, NULL,
-                               LDLM_FL_CONFIG_CHANGE, NULL);
-        if (rc != 0)
-                CWARN("obd_cancel_unused(mdc): %d\n", rc);
-
-        rc = obd_cancel_unused(sbi->ll_dt_exp, NULL,
-                               LDLM_FL_CONFIG_CHANGE, NULL);
-        if (rc != 0)
-                CWARN("obd_cancel_unused(lov): %d\n", rc);
-
         cfg.cfg_instance = sbi->ll_instance;
         cfg.cfg_uuid = sbi->ll_sb_uuid;
         cfg.cfg_local_nid = lmd->lmd_local_nid;
index d7c05c6..85c6501 100644 (file)
@@ -600,6 +600,7 @@ static int lmv_setup(struct obd_device *obd, obd_count len, void *buf)
                 CERROR("Can't setup LMV object manager, "
                        "error %d.\n", rc);
                 OBD_FREE(lmv->tgts, lmv->tgts_size);
+                RETURN(rc);
         }
 
         tgt_obd = class_find_client_obd(&lmv->tgts->uuid, OBD_MDC_DEVICENAME,
@@ -609,6 +610,11 @@ static int lmv_setup(struct obd_device *obd, obd_count len, void *buf)
                 RETURN(-EINVAL);
         }
 
+        rc = obd_llog_init(obd, &obd->obd_llogs, tgt_obd, 0, NULL);
+        if (rc) {
+                CERROR("lmv_setup failed to setup llogging subsystems\n");
+        }
+
         RETURN(rc);
 }
 
index b6172f5..75258d7 100644 (file)
@@ -23,6 +23,7 @@ struct lov_request {
         struct list_head         rq_link;
         struct ldlm_extent       rq_extent;
         int                      rq_idx;        /* index in lov->tgts array */
+        int                      rq_gen;        /* lov target generation # */
         int                      rq_stripe;     /* stripe number */
         int                      rq_complete;
         int                      rq_rc;
@@ -54,9 +55,91 @@ struct lov_request_set {
 
 #define LOV_MAX_TGT_COUNT 1024
 
-static inline int lov_tgt_changed(struct lov_obd *lov, struct lov_oinfo *loi)
+#define lov_tgts_lock(lov)      spin_lock(&lov->lov_lock);
+#define lov_tgts_unlock(lov)    spin_unlock(&lov->lov_lock);
+
+static inline void
+lov_tgt_set_flags(struct lov_obd *lov, struct lov_tgt_desc *tgt, int flags)
+{
+        lov_tgts_lock(lov);
+        if ((flags & LTD_ACTIVE) && ((tgt->ltd_flags & LTD_ACTIVE) == 0))
+                lov->desc.ld_active_tgt_count++;
+        tgt->ltd_flags |= flags;
+        lov_tgts_unlock(lov);
+}
+
+static inline void
+lov_tgt_clear_flags(struct lov_obd *lov, struct lov_tgt_desc *tgt, int flags)
+{
+        ENTRY;
+
+        lov_tgts_lock(lov);
+        if ((flags & LTD_ACTIVE) && (tgt->ltd_flags & LTD_ACTIVE))
+                lov->desc.ld_active_tgt_count--;
+        tgt->ltd_flags &= ~flags;
+        lov_tgts_unlock(lov);
+        EXIT;
+}
+
+static inline int
+lov_tgt_changed(struct lov_obd *lov, struct lov_oinfo *loi)
 {
-        return lov->tgts[loi->loi_ost_idx].ltd_gen != loi->loi_ost_gen;
+        int changed;
+
+        lov_tgts_lock(lov);
+        changed = lov->tgts[loi->loi_ost_idx].ltd_gen != loi->loi_ost_gen;
+        lov_tgts_unlock(lov);
+
+        return changed;
+}
+
+static inline int
+lov_tgt_active(struct lov_obd *lov, struct lov_tgt_desc *tgt, int gen)
+{
+        int rc = 0;
+        lov_tgts_lock(lov);
+
+        if (((gen == 0) || (gen == tgt->ltd_gen)) &&
+            ((tgt->ltd_flags &(LTD_ACTIVE|LTD_DEL_PENDING)) == LTD_ACTIVE)) {
+                tgt->ltd_refcount++;
+                rc = 1;
+        }
+
+        lov_tgts_unlock(lov);
+        return rc;
+}
+
+static inline int
+lov_tgt_ready(struct lov_obd *lov, struct lov_tgt_desc *tgt, int gen)
+{
+        int rc = 0;
+
+        lov_tgts_lock(lov);
+
+        if (((gen == 0) || (gen == tgt->ltd_gen)) &&
+            (tgt->ltd_flags & LTD_ACTIVE)) {
+                tgt->ltd_refcount++;
+                rc = 1;
+        }
+
+        lov_tgts_unlock(lov);
+        return rc;
+}
+
+static inline void
+lov_tgt_decref(struct lov_obd *lov, struct lov_tgt_desc *tgt)
+{
+        int do_wakeup = 0;
+
+        lov_tgts_lock(lov);
+
+        if ((--tgt->ltd_refcount == 0) && (tgt->ltd_flags & LTD_DEL_PENDING)) {
+                do_wakeup = 1;
+        } 
+
+        lov_tgts_unlock(lov);
+        if (do_wakeup)
+                wake_up(&lov->lov_tgt_waitq);
 }
 
 struct lov_async_page {
index f1eecb0..ab6ccd7 100644 (file)
@@ -84,9 +84,18 @@ static int lov_llog_origin_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
          * file 1 on ost_idx [1, 2, 3, 4] and file 2 on ost_idx [3, 4, 1, 2] */
         down(&lov->lov_llog_sem);
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
-                struct obd_device *child =
-                        lov->tgts[loi->loi_ost_idx].ltd_exp->exp_obd;
+                struct obd_device *child;
                 struct llog_ctxt *cctxt;
+                struct lov_tgt_desc *tgt;
+
+                tgt = lov->tgts + loi->loi_ost_idx;
+                if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
+                        CWARN("lov_llog_origin_add: ost idx %d inactive.\n",
+                              loi->loi_ost_idx);
+                        continue;
+                }
+
+                child = tgt->ltd_exp->exp_obd;
                 cctxt = llog_get_context(&child->obd_llogs, ctxt->loc_idx);
 
                 lur->lur_oid = loi->loi_id;
@@ -95,6 +104,7 @@ static int lov_llog_origin_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
                 rc += llog_add(cctxt, &lur->lur_hdr, NULL, logcookies + rc,
                                numcookies - rc, NULL,
                                lock != NULL ? lock + rc : NULL, lock_count);
+                lov_tgt_decref(lov, tgt);
         }
         up(&lov->lov_llog_sem);
         OBD_FREE(lur, sizeof(*lur));
@@ -117,15 +127,19 @@ static int lov_llog_origin_connect(struct llog_ctxt *ctxt, int count,
                 struct obd_device *child;
                 struct llog_ctxt *cctxt;
 
-                if (!tgt->active)
+                if (!lov_tgt_active(lov, tgt, 0))
                         continue;
-                child = tgt->ltd_exp->exp_obd;
 
+                child = tgt->ltd_exp->exp_obd;
                 cctxt = llog_get_context(&child->obd_llogs, ctxt->loc_idx);
-                if (uuid && !obd_uuid_equals(uuid, &lov->tgts[i].uuid))
+
+                if (uuid && !obd_uuid_equals(uuid, &tgt->uuid)) {
+                        lov_tgt_decref(lov, tgt);
                         continue;
+                }
 
                 rc = llog_connect(cctxt, 1, logid, gen, uuid);
+                lov_tgt_decref(lov, tgt);
                 if (rc) {
                         CERROR("error osc_llog_connect %d\n", i);
                         break;
@@ -153,14 +167,24 @@ static int lov_llog_repl_cancel(struct llog_ctxt *ctxt, int count,
         loi = lsm->lsm_oinfo;
         lov = &obd->u.lov;
         for (i = 0; i < count; i++, cookies++, loi++) {
-                struct obd_device *child =
-                        lov->tgts[loi->loi_ost_idx].ltd_exp->exp_obd; 
+                struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
+                struct obd_device *child;
                 struct llog_ctxt *cctxt;
                 int err;
 
+                if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
+                        CWARN("warning: LOV OST idx %d: inactive.\n",
+                              loi->loi_ost_idx);
+                        continue;
+                }
+
+                child = tgt->ltd_exp->exp_obd; 
                 cctxt = llog_get_context(&child->obd_llogs, ctxt->loc_idx);
                 err = llog_cancel(cctxt, 1, cookies, flags, NULL);
-                if (err && lov->tgts[loi->loi_ost_idx].active) {
+                lov_tgt_decref(lov, tgt);
+
+                if (err && lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
+                        lov_tgt_decref(lov, tgt);
                         CERROR("error: objid "LPX64" subobj "LPX64
                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
                                loi->loi_id, loi->loi_ost_idx, err);
@@ -202,10 +226,12 @@ int lov_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
         for (i = 0, ctgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, ctgt++) {
                 struct obd_device *child;
 
-                if (!ctgt->active)
+                if (!lov_tgt_active(lov, ctgt, 0))
                         continue;
+
                 child = ctgt->ltd_exp->exp_obd;
                 rc = obd_llog_init(child, &child->obd_llogs, tgt, 1, logid + i);
+                lov_tgt_decref(lov, ctgt);
                 if (rc) {
                         CERROR("error osc_llog_init %d\n", i);
                         break;
@@ -237,10 +263,12 @@ int lov_llog_finish(struct obd_device *obd, struct obd_llogs *llogs, int count)
         for (i = 0, tgt = lov->tgts; i < count; i++, tgt++) {
                 struct obd_device *child;
 
-                if (!tgt->active)
+                if (!lov_tgt_active(lov, tgt, 0))
                         continue;
+
                 child = tgt->ltd_exp->exp_obd;
                 rc = obd_llog_finish(child, &child->obd_llogs, 1);
+                lov_tgt_decref(lov, tgt);
                 if (rc) {
                         CERROR("osc_llog_finish error; index=%d; rc=%d\n",
                                i, rc);
index b09283e..31c1be1 100644 (file)
@@ -116,8 +116,7 @@ static int lov_connect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt,
                 RETURN(rc);
         }
 
-        tgt->active = 1;
-        lov->desc.ld_active_tgt_count++;
+        lov_tgt_set_flags(lov, tgt, LTD_ACTIVE);
 
 #ifdef __KERNEL__
         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
@@ -208,8 +207,8 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
         while (i-- > 0) {
                 struct obd_uuid uuid;
                 --tgt;
-                --lov->desc.ld_active_tgt_count;
-                tgt->active = 0;
+                lov_tgt_clear_flags(lov, tgt, LTD_ACTIVE);
+
                 /* save for CERROR below; (we know it's terminated) */
                 uuid = tgt->uuid;
                 rc2 = obd_disconnect(tgt->ltd_exp, 0);
@@ -257,20 +256,23 @@ static int lov_disconnect_obd(struct obd_device *obd,
                         osc_obd->obd_no_recov = 1;
         }
 
+        /* XXX - shouldn't this be after the obd_disconnect() ? */
         obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
+
         rc = obd_disconnect(tgt->ltd_exp, flags);
         if (rc) {
-                if (tgt->active) {
+                if (lov_tgt_active(lov, tgt, 0)) {
+                        lov_tgt_decref(lov, tgt);
                         CERROR("Target %s disconnect error %d\n",
                                tgt->uuid.uuid, rc);
                 }
-                rc = 0;
         }
 
-        if (tgt->active) {
-                tgt->active = 0;
-                lov->desc.ld_active_tgt_count--;
-        }
+        CDEBUG(D_CONFIG, "idx: %d flags: 0x%x active: %d\n",
+               tgt - lov->tgts, tgt->ltd_flags,
+               lov->desc.ld_active_tgt_count);
+
+        lov_tgt_clear_flags(lov, tgt, LTD_ACTIVE);
         tgt->ltd_exp = NULL;
         RETURN(0);
 }
@@ -330,7 +332,7 @@ static int lov_set_osc_active(struct lov_obd *lov, struct obd_uuid *uuid,
         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
                lov, uuid->uuid, activate);
 
-        spin_lock(&lov->lov_lock);
+        lov_tgts_lock(lov);
         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
                 if (tgt->ltd_exp == NULL)
                         continue;
@@ -345,27 +347,29 @@ static int lov_set_osc_active(struct lov_obd *lov, struct obd_uuid *uuid,
         if (i == lov->desc.ld_tgt_count)
                 GOTO(out, rc = -EINVAL);
 
-
-        if (tgt->active == activate) {
-                CDEBUG(D_INFO, "OSC %s already %sactive!\n", uuid->uuid,                       
-                        activate ? "" : "in");
-                GOTO(out, rc);
-        }
-
-        CDEBUG(D_INFO, "Marking OSC %s %sactive\n", uuid->uuid,
-               activate ? "" : "in");
-        CDEBUG(D_ERROR, "Marking OSC %s %sactive\n", uuid->uuid,
-               activate ? "" : "in");
-
-        tgt->active = activate;
-        if (activate)
+        if (activate) {
+                if (tgt->ltd_flags & LTD_ACTIVE) {
+                        CDEBUG(D_CONFIG|D_INFO, "OSC %s already active!\n",
+                               uuid->uuid);
+                        GOTO(out, rc);
+                }
+                CDEBUG(D_INFO, "Marking OSC %s active\n", uuid->uuid);
+                tgt->ltd_flags |= LTD_ACTIVE;
                 lov->desc.ld_active_tgt_count++;
-        else
+        } else {
+                if ((tgt->ltd_flags & LTD_ACTIVE) == 0) {
+                        CDEBUG(D_CONFIG|D_INFO, "OSC %s already inactive!\n",
+                               uuid->uuid);
+                        GOTO(out, rc);
+                }
+                CDEBUG(D_INFO, "Marking OSC %s inactive\n", uuid->uuid);
+                tgt->ltd_flags &= ~LTD_ACTIVE;
                 lov->desc.ld_active_tgt_count--;
-
+        }
         EXIT;
  out:
-        spin_unlock(&lov->lov_lock);
+        lov_tgts_unlock(lov);
         return rc;
 }
 
@@ -382,6 +386,7 @@ static int lov_notify(struct obd_device *obd, struct obd_device *watched,
                        watched->obd_name);
                 return -EINVAL;
         }
+
         uuid = &watched->u.cli.cl_import->imp_target_uuid;
 
         /* Set OSC as active before notifying the observer, so the
@@ -479,6 +484,7 @@ static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
         lov->desc = *desc;
         spin_lock_init(&lov->lov_lock);
         sema_init(&lov->lov_llog_sem, 1);
+        init_waitqueue_head(&lov->lov_tgt_waitq);
 
         RETURN(0);
 }
@@ -515,7 +521,11 @@ lov_add_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
         }
 
         tgt = lov->tgts + index;
+
+        lov_tgts_lock(lov);
+
         if (!obd_uuid_empty(&tgt->uuid)) {
+                lov_tgts_unlock(lov);
                 CERROR("OBD already assigned at LOV target index %d\n",
                        index);
                 RETURN(-EEXIST);
@@ -524,16 +534,22 @@ lov_add_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
         tgt->uuid = *uuidp;
         /* XXX - add a sanity check on the generation number. */
         tgt->ltd_gen = gen;
+        tgt->ltd_flags = 0;
 
         if (index >= lov->desc.ld_tgt_count)
                 lov->desc.ld_tgt_count = index + 1;
 
+        lov_tgts_unlock(lov);
+
         CDEBUG(D_CONFIG, "idx: %d ltd_gen: %d ld_tgt_count: %d\n",
                 index, tgt->ltd_gen, lov->desc.ld_tgt_count);
 
         if (lov->refcount == 0)
                 RETURN(0);
 
+        /* We don't need to lock the tgt entry here because the
+         * config events are single threaded. Therefore only the
+         * current thread can be changing the tgts tbl. */
         if (tgt->ltd_exp) {
                 struct obd_device *osc_obd;
 
@@ -542,7 +558,7 @@ lov_add_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
                         osc_obd->obd_no_recov = 0;
         }
 
-        rc = lov_connect_obd(obd, tgt, 1, NULL, lov->lov_connect_flags);
+        rc = lov_connect_obd(obd, tgt, 0, NULL, lov->lov_connect_flags);
         if (rc)
                 GOTO(out, rc);
 
@@ -564,6 +580,7 @@ lov_del_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
 {
         struct lov_obd *lov = &obd->u.lov;
         struct lov_tgt_desc *tgt;
+        struct l_wait_info lwi = { 0 };
         int count = lov->desc.ld_tgt_count;
         int rc = 0;
         ENTRY;
@@ -590,9 +607,16 @@ lov_del_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
                 RETURN(-EINVAL);
         }
 
+        lov_tgt_set_flags(lov, tgt, LTD_DEL_PENDING);
+
         if (tgt->ltd_exp) {
                 struct obd_device *osc_obd;
 
+                rc = obd_cancel_unused(tgt->ltd_exp, NULL,
+                                       LDLM_FL_CONFIG_CHANGE, NULL);
+                if (rc != 0)
+                        CWARN("obd_cancel_unused(osc): %d\n", rc);
+
                 osc_obd = class_exp2obd(tgt->ltd_exp);
                 if (osc_obd) {
                         osc_obd->obd_no_recov = 1;
@@ -600,19 +624,36 @@ lov_del_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
                         if (rc)
                                 CERROR("osc_llog_finish error: %d\n", rc);
                 }
+
                 lov_disconnect_obd(obd, tgt, 0);
         }
 
+        CDEBUG(D_CONFIG, "Sleeping until LOV target index %d UUID %s "
+               "is quiesced; ltd_refcount: %d ld_active_tgt_count: %d.\n",
+               index, uuidp->uuid, tgt->ltd_refcount,
+               lov->desc.ld_active_tgt_count);
+
+        /* XXX - should we set a timeout ? */
+        lwi = LWI_TIMEOUT_INTR(0, NULL, NULL, NULL);
+        rc = l_wait_event(lov->lov_tgt_waitq, (tgt->ltd_refcount == 0), &lwi);
+        if (rc) {
+                lov_tgt_clear_flags(lov, tgt, LTD_DEL_PENDING);
+                CERROR("LOV target delete UUID %s index %d aborted: %d.\n",
+                       uuidp->uuid, index, rc);
+                RETURN(rc);
+        }
+
+        CDEBUG(D_CONFIG, "LOV target index %d UUID %s is quiesced; "
+               "ltd_refcount: %d ld_active_tgt_count: %d.\n",
+               index, uuidp->uuid, tgt->ltd_refcount,
+               lov->desc.ld_active_tgt_count);
+
         /* XXX - right now there is a dependency on ld_tgt_count being the
          * maximum tgt index for computing the mds_max_easize. So we can't
          * shrink it. */
 
         /* lt_gen = 0 will mean it will not match the gen of any valid loi */
         memset(tgt, 0, sizeof(*tgt));
-
-        CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n",
-               tgt->uuid.uuid, index, tgt->ltd_gen, tgt->ltd_exp, tgt->active);
-
         RETURN(rc);
 }
 
@@ -666,6 +707,7 @@ static int lov_clear_orphans(struct obd_export *export,
         struct lov_obd *lov;
         struct obdo *tmp_oa;
         struct obd_uuid *ost_uuid = NULL;
+        struct lov_tgt_desc *tgt;
         int rc = 0, i;
         ENTRY;
 
@@ -684,22 +726,26 @@ static int lov_clear_orphans(struct obd_export *export,
                        ost_uuid->uuid);
         }
 
-        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
-                int err;
+        for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
                 struct lov_stripe_md obj_md;
                 struct lov_stripe_md *obj_mdp = &obj_md;
+                int active = 0;
+                int err;
 
-                /*
-                 * if called for a specific target, we don't care if it is not
-                 * active.
-                 */
-                if (lov->tgts[i].active == 0 && ost_uuid == NULL) {
+                /* if called for a specific target, we don't
+                   care if it is not active. */
+                if (lov_tgt_active(lov, tgt, 0)) {
+                        active = 1;
+                } else if (ost_uuid == NULL) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
                         continue;
                 }
 
-                if (ost_uuid && !obd_uuid_equals(ost_uuid, &lov->tgts[i].uuid))
+                if (ost_uuid && !obd_uuid_equals(ost_uuid, &tgt->uuid)) {
+                        if (active)
+                                lov_tgt_decref(lov, tgt);
                         continue;
+                }
 
                 /* 
                  * setting up objid OSS objects should be destroyed starting
@@ -710,20 +756,20 @@ static int lov_clear_orphans(struct obd_export *export,
                 tmp_oa->o_id = oti->oti_objid[i];
 
                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
-                err = obd_create(lov->tgts[i].ltd_exp, tmp_oa, NULL, 0,
-                                 &obj_mdp, oti);
-                if (err) {
-                        /*
-                         * this export will be disabled until it is recovered,
-                         * and then orphan recovery will be completed.
-                         */
+                err = obd_create(tgt->ltd_exp, tmp_oa, NULL, 0, &obj_mdp, oti);
+                if (err)
+                        /* This export will be disabled until it is recovered,
+                           and then orphan recovery will be completed. */
                         CERROR("error in orphan recovery on OST idx %d/%d: "
                                "rc = %d\n", i, lov->desc.ld_tgt_count, err);
-                }
+
+                if (active)
+                        lov_tgt_decref(lov, tgt);
 
                 if (ost_uuid)
                         break;
         }
+
         obdo_free(tmp_oa);
         RETURN(rc);
 }
@@ -750,10 +796,6 @@ lov_create(struct obd_export *exp, struct obdo *src_oa,
                 RETURN(rc);
         }
 
-        lov = &exp->exp_obd->u.lov;
-        if (!lov->desc.ld_active_tgt_count)
-                RETURN(-EIO);
-
         LASSERT(oti->oti_flags & OBD_MODE_CROW);
                 
         /* main creation loop */
@@ -761,6 +803,8 @@ lov_create(struct obd_export *exp, struct obdo *src_oa,
         if (rc)
                 RETURN(rc);
 
+        lov = &exp->exp_obd->u.lov;
+
         list_for_each (pos, &set->set_list) {
                 struct lov_request *req = 
                         list_entry(pos, struct lov_request, rq_link);
@@ -1006,6 +1050,7 @@ static int lov_revalidate_policy(struct lov_obd *lov, struct lov_stripe_md *lsm)
         static int next_idx = 0;
         struct lov_tgt_desc *tgt;
         int i, count;
+        ENTRY;
 
         /* XXX - we should do something clever and take lsm
          * into account but just do round robin for now. */
@@ -1015,14 +1060,14 @@ static int lov_revalidate_policy(struct lov_obd *lov, struct lov_stripe_md *lsm)
         count = lov->desc.ld_tgt_count;
 
         for (i = next_idx, tgt = lov->tgts + i; i < count; i++, tgt++) {
-                if (tgt->active) {
+                if (lov_tgt_active(lov, tgt, 0)) {
                         next_idx = (i + 1) % count;
                         RETURN(i);
                 }
         }
 
         for (i = 0, tgt = lov->tgts; i < next_idx; i++, tgt++) {
-                if (tgt->active) {
+                if (lov_tgt_active(lov, tgt, 0)) {
                         next_idx = (i + 1) % count;
                         RETURN(i);
                 }
@@ -1051,10 +1096,13 @@ static int lov_revalidate_md(struct obd_export *exp, struct obdo *src_oa,
 
         loi = lsm->lsm_oinfo;
         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_tgt_desc *tgt;
                 int rc;
-                if (!obd_uuid_empty(&lov->tgts[loi->loi_ost_idx].uuid))
+
+                if (!lov_tgt_changed(lov, loi))
                         continue;
 
+                /* returns an active tgt. tgt->ltd_refcount is incremented. */
                 ost_idx = lov_revalidate_policy(lov, lsm);
                 if (ost_idx < 0) {
                         /* FIXME: punt for now. */
@@ -1066,22 +1114,30 @@ static int lov_revalidate_md(struct obd_export *exp, struct obdo *src_oa,
                 /* create a new object */
                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
-                osc_exp = lov->tgts[ost_idx].ltd_exp;
+                tgt = lov->tgts + ost_idx;
+                osc_exp = tgt->ltd_exp;
                 rc = obd_create(osc_exp, tmp_oa, NULL, 0, &obj_mdp, oti);
                 if (rc) {
+                        lov_tgt_decref(lov, tgt);
                         CERROR("error creating new subobj at idx %d; "
                                "rc = %d\n", ost_idx, rc);
                         continue;
                 }
+
+                CDEBUG(D_INODE,
+                       "replacing idx %d gen %d objid "LPX64" subobj "LPX64" "
+                       "with idx %d gen %d objid "LPX64" subobj "LPX64".\n",
+                       loi->loi_ost_idx, loi->loi_ost_gen,
+                       loi->loi_id, loi->loi_gr,
+                       ost_idx, tgt->ltd_gen, tmp_oa->o_id, tmp_oa->o_gr);
+
                 if (oti->oti_objid)
                         oti->oti_objid[ost_idx] = tmp_oa->o_id;
                 loi->loi_id = tmp_oa->o_id;
                 loi->loi_gr = tmp_oa->o_gr;
                 loi->loi_ost_idx = ost_idx;
-                loi->loi_ost_gen = lov->tgts[ost_idx].ltd_gen;
-                CDEBUG(D_INODE, "replacing objid "LPX64" subobj "LPX64
-                       " with idx %d gen %d.\n", lsm->lsm_object_id,
-                       loi->loi_id, ost_idx, loi->loi_ost_gen);
+                loi->loi_ost_gen = tgt->ltd_gen;
+                lov_tgt_decref(lov, tgt);
                 updates = 1;
         }
 
@@ -1191,8 +1247,9 @@ static int lov_brw_check(struct lov_obd *lov, struct obdo *oa,
         /* The caller just wants to know if there's a chance that this
          * I/O can succeed */
         for (i = 0; i < oa_bufs; i++) {
-                int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
-                int ost = lsm->lsm_oinfo[stripe].loi_ost_idx;
+                struct lov_oinfo *loi;
+                struct lov_tgt_desc *tgt;
+                int stripe;
                 obd_off start, end;
 
                 if (!lov_stripe_intersects(lsm, i, pga[i].disk_offset,
@@ -1200,12 +1257,17 @@ static int lov_brw_check(struct lov_obd *lov, struct obdo *oa,
                                            &start, &end))
                         continue;
 
-                if (lov->tgts[ost].active == 0) {
-                        CDEBUG(D_HA, "lov idx %d inactive\n", ost);
+                stripe = lov_stripe_number(lsm, pga[i].disk_offset);
+                loi = lsm->lsm_oinfo + stripe;
+                tgt = lov->tgts + loi->loi_ost_idx;
+
+                if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         RETURN(-EIO);
                 }
-                rc = obd_brw(OBD_BRW_CHECK, lov->tgts[ost].ltd_exp, oa,
+                rc = obd_brw(OBD_BRW_CHECK, tgt->ltd_exp, oa,
                              NULL, 1, &pga[i], NULL);
+                lov_tgt_decref(lov, tgt);
                 if (rc)
                         break;
         }
@@ -1365,6 +1427,7 @@ static int lov_prep_async_page(struct obd_export *exp,
                                void *data, void **res)
 {
         struct lov_obd *lov = &exp->exp_obd->u.lov;
+        struct lov_tgt_desc *tgt;
         struct lov_async_page *lap;
         int rc, stripe;
         ENTRY;
@@ -1376,16 +1439,6 @@ static int lov_prep_async_page(struct obd_export *exp,
         stripe = lov_stripe_number(lsm, offset);
         loi = &lsm->lsm_oinfo[stripe];
 
-        if (obd_uuid_empty(&lov->tgts[loi->loi_ost_idx].uuid))
-                RETURN(-EIO);
-        if (lov->tgts[loi->loi_ost_idx].active == 0)
-                RETURN(-EIO);
-        if (lov->tgts[loi->loi_ost_idx].ltd_exp == NULL) {
-                CERROR("ltd_exp == NULL, but OST idx %d doesn't appear to be "
-                       "deleted or inactive.\n", loi->loi_ost_idx);
-                RETURN(-EIO);
-        }
-
         OBD_ALLOC(lap, sizeof(*lap));
         if (lap == NULL)
                 RETURN(-ENOMEM);
@@ -1411,10 +1464,18 @@ static int lov_prep_async_page(struct obd_export *exp,
         /* so the callback doesn't need the lsm */
         lap->lap_loi_id = loi->loi_id;
 
-        rc = obd_prep_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp,
-                                 lsm, loi, page, lap->lap_sub_offset,
-                                 &lov_async_page_ops, lap,
-                                 &lap->lap_sub_cookie);
+        tgt = lov->tgts + loi->loi_ost_idx;
+
+        if (lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
+                rc = obd_prep_async_page(tgt->ltd_exp, lsm, loi, page,
+                                         lap->lap_sub_offset,
+                                         &lov_async_page_ops, lap,
+                                         &lap->lap_sub_cookie);
+                lov_tgt_decref(lov, tgt);
+        } else {
+                rc = -EIO;
+        }
+
         if (rc) {
                 OBD_FREE(lap, sizeof(*lap));
                 RETURN(rc);
@@ -1432,6 +1493,7 @@ static int lov_queue_async_io(struct obd_export *exp,
                               obd_flags brw_flags, obd_flags async_flags)
 {
         struct lov_obd *lov = &exp->exp_obd->u.lov;
+        struct lov_tgt_desc *tgt;
         struct lov_async_page *lap;
         int rc;
 
@@ -1441,12 +1503,16 @@ static int lov_queue_async_io(struct obd_export *exp,
                 RETURN(-EINVAL);
 
         lap = LAP_FROM_COOKIE(cookie);
-
         loi = &lsm->lsm_oinfo[lap->lap_stripe];
+        tgt = lov->tgts + loi->loi_ost_idx;
 
-        rc = obd_queue_async_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm,
-                                loi, lap->lap_sub_cookie, cmd, off, count,
-                                brw_flags, async_flags);
+        if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen))
+                RETURN(-EIO);
+
+        rc = obd_queue_async_io(tgt->ltd_exp, lsm, loi, lap->lap_sub_cookie,
+                                cmd, off, count, brw_flags, async_flags);
+
+        lov_tgt_decref(lov, tgt);
         RETURN(rc);
 }
 
@@ -1456,6 +1522,7 @@ static int lov_set_async_flags(struct obd_export *exp,
                                obd_flags async_flags)
 {
         struct lov_obd *lov = &exp->exp_obd->u.lov;
+        struct lov_tgt_desc *tgt;
         struct lov_async_page *lap;
         int rc;
 
@@ -1465,11 +1532,16 @@ static int lov_set_async_flags(struct obd_export *exp,
                 RETURN(-EINVAL);
 
         lap = LAP_FROM_COOKIE(cookie);
-
         loi = &lsm->lsm_oinfo[lap->lap_stripe];
+        tgt = lov->tgts + loi->loi_ost_idx;
 
-        rc = obd_set_async_flags(lov->tgts[loi->loi_ost_idx].ltd_exp,
-                                 lsm, loi, lap->lap_sub_cookie, async_flags);
+        if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen))
+                RETURN(-EIO);
+
+        rc = obd_set_async_flags(tgt->ltd_exp, lsm, loi, lap->lap_sub_cookie,
+                                 async_flags);
+
+        lov_tgt_decref(lov, tgt);
         RETURN(rc);
 }
 
@@ -1481,6 +1553,7 @@ static int lov_queue_group_io(struct obd_export *exp,
                               obd_flags brw_flags, obd_flags async_flags)
 {
         struct lov_obd *lov = &exp->exp_obd->u.lov;
+        struct lov_tgt_desc *tgt;
         struct lov_async_page *lap;
         int rc;
 
@@ -1490,12 +1563,17 @@ static int lov_queue_group_io(struct obd_export *exp,
                 RETURN(-EINVAL);
 
         lap = LAP_FROM_COOKIE(cookie);
-
         loi = &lsm->lsm_oinfo[lap->lap_stripe];
+        tgt = lov->tgts + loi->loi_ost_idx;
 
-        rc = obd_queue_group_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm, loi,
-                                oig, lap->lap_sub_cookie, cmd, off, count,
+        if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen))
+                RETURN(-EIO);
+
+        rc = obd_queue_group_io(tgt->ltd_exp, lsm, loi, oig,
+                                lap->lap_sub_cookie, cmd, off, count,
                                 brw_flags, async_flags);
+
+        lov_tgt_decref(lov, tgt);
         RETURN(rc);
 }
 
@@ -1517,13 +1595,17 @@ static int lov_trigger_group_io(struct obd_export *exp,
 
         loi = lsm->lsm_oinfo;
         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
-                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
+
+                if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
                 }
 
-                err = obd_trigger_group_io(lov->tgts[loi->loi_ost_idx].ltd_exp,
-                                           lsm, loi, oig);
+                err = obd_trigger_group_io(tgt->ltd_exp, lsm, loi, oig);
+
+                lov_tgt_decref(lov, tgt);
+
                 if (rc == 0 && err != 0)
                         rc = err;
         };
@@ -1535,8 +1617,10 @@ static int lov_teardown_async_page(struct obd_export *exp,
                                    struct lov_oinfo *loi, void *cookie)
 {
         struct lov_obd *lov = &exp->exp_obd->u.lov;
+        struct lov_tgt_desc *tgt;
         struct lov_async_page *lap;
         int rc;
+        ENTRY;
 
         LASSERT(loi == NULL);
 
@@ -1546,9 +1630,15 @@ static int lov_teardown_async_page(struct obd_export *exp,
         lap = LAP_FROM_COOKIE(cookie);
 
         loi = &lsm->lsm_oinfo[lap->lap_stripe];
+        tgt = lov->tgts + loi->loi_ost_idx;
+
+        if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen))
+                RETURN(-EIO);
+
+        rc = obd_teardown_async_page(tgt->ltd_exp, lsm, loi,
+                                     lap->lap_sub_cookie);
+        lov_tgt_decref(lov, tgt);
 
-        rc = obd_teardown_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp,
-                                     lsm, loi, lap->lap_sub_cookie);
         if (rc) {
                 CERROR("unable to teardown sub cookie %p: %d\n",
                        lap->lap_sub_cookie, rc);
@@ -1673,12 +1763,19 @@ static int lov_change_cbdata(struct obd_export *exp,
 
         lov = &exp->exp_obd->u.lov;
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
+                struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
                 struct lov_stripe_md submd;
+
+                if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                        continue;
+                }
+
                 submd.lsm_object_id = loi->loi_id;
                 submd.lsm_object_gr = lsm->lsm_object_gr;
                 submd.lsm_stripe_count = 0;
-                rc = obd_change_cbdata(lov->tgts[loi->loi_ost_idx].ltd_exp,
-                                       &submd, it, data);
+                rc = obd_change_cbdata(tgt->ltd_exp, &submd, it, data);
+                lov_tgt_decref(lov, tgt);
         }
         RETURN(rc);
 }
@@ -1734,14 +1831,27 @@ static int lov_cancel_unused(struct obd_export *exp,
 {
         struct lov_obd *lov;
         struct lov_oinfo *loi;
+        struct lov_tgt_desc *tgt;
         int rc = 0, i;
         ENTRY;
 
         lov = &exp->exp_obd->u.lov;
         if (lsm == NULL) {
-                for (i = 0; i < lov->desc.ld_tgt_count; i++) {
-                        int err = obd_cancel_unused(lov->tgts[i].ltd_exp,
-                                                    NULL, flags, opaque);
+                tgt = lov->tgts;
+                for (i = 0; i < lov->desc.ld_tgt_count; i++, tgt++) {
+                        int err;
+
+                        if (lov_tgt_ready(lov, tgt, 0)) {
+                                err = obd_cancel_unused(tgt->ltd_exp, NULL,
+                                                        flags, opaque);
+                                lov_tgt_decref(lov, tgt);
+                        } else {
+                                if (obd_uuid_empty(&tgt->uuid))
+                                        continue;
+                                CERROR("error: cancel unused "
+                                       "OST idx %d: OST inactive.\n", i);
+                                err = -EIO;
+                        }
                         if (!rc)
                                 rc = err;
                 }
@@ -1760,21 +1870,32 @@ static int lov_cancel_unused(struct obd_export *exp,
                 struct lov_stripe_md submd;
                 int err;
 
-                if (lov->tgts[loi->loi_ost_idx].active == 0)
-                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
-
                 submd.lsm_object_id = loi->loi_id;
                 submd.lsm_object_gr = lsm->lsm_object_gr;
                 submd.lsm_stripe_count = 0;
-                err = obd_cancel_unused(lov->tgts[loi->loi_ost_idx].ltd_exp,
-                                        &submd, flags, opaque);
-                if (err && lov->tgts[loi->loi_ost_idx].active) {
-                        CERROR("error: cancel unused objid "LPX64" subobj "LPX64
-                               " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
-                               loi->loi_id, loi->loi_ost_idx, err);
-                        if (!rc)
-                                rc = err;
+
+                tgt = lov->tgts + loi->loi_ost_idx;
+                if (lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
+                        err = obd_cancel_unused(tgt->ltd_exp, &submd, flags,
+                                                opaque);
+                        lov_tgt_decref(lov, tgt);
+                        if (err && lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
+                                lov_tgt_decref(lov, tgt);
+                                CERROR("error: cancel unused objid "LPX64" "
+                                       "subobj "LPX64" OST idx %d: rc = %d\n",
+                                       lsm->lsm_object_id, loi->loi_id,
+                                       loi->loi_ost_idx, err);
+                        }
+                } else {
+                        err = -EIO;
+                        CERROR("error: cancel unused objid "LPX64" "
+                               "subobj "LPX64" on OST idx %d: OST inactive.\n",
+                               lsm->lsm_object_id,
+                               loi->loi_id, loi->loi_ost_idx);
                 }
+
+                if (!rc)
+                        rc = err;
         }
         RETURN(rc);
 }
@@ -1792,6 +1913,7 @@ static int lov_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                       unsigned long max_age)
 {
         struct lov_obd *lov = &obd->u.lov;
+        struct lov_tgt_desc *tgt = lov->tgts;
         struct obd_statfs lov_sfs;
         int set = 0;
         int rc = 0;
@@ -1800,18 +1922,22 @@ static int lov_statfs(struct obd_device *obd, struct obd_statfs *osfs,
 
 
         /* We only get block data from the OBD */
-        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+        for (i = 0; i < lov->desc.ld_tgt_count; i++, tgt++) {
                 int err;
-                if (!lov->tgts[i].active) {
+
+                if (!lov_tgt_active(lov, tgt, 0)) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
                         continue;
                 }
 
-                err = obd_statfs(class_exp2obd(lov->tgts[i].ltd_exp), &lov_sfs,
-                                 max_age);
+                err = obd_statfs(class_exp2obd(tgt->ltd_exp), &lov_sfs,max_age);
+                lov_tgt_decref(lov, tgt);
                 if (err) {
-                        if (lov->tgts[i].active && !rc)
-                                rc = err;
+                        if (lov_tgt_active(lov, tgt, 0)) {
+                                lov_tgt_decref(lov, tgt);
+                                if (!rc)
+                                        rc = err;
+                        }
                         continue;
                 }
 
@@ -1843,6 +1969,14 @@ static int lov_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                                          lov->desc.ld_default_stripe_count :
                                          lov->desc.ld_active_tgt_count;
 
+                if ((lov->desc.ld_default_stripe_count == 0) ||
+                    (lov->desc.ld_default_stripe_count >
+                     lov->desc.ld_active_tgt_count)) {
+                        expected_stripes = lov->desc.ld_active_tgt_count;
+                } else {
+                        expected_stripes = lov->desc.ld_default_stripe_count;
+                }
+
                 if (osfs->os_files != LOV_U64_MAX)
                         do_div(osfs->os_files, expected_stripes);
                 if (osfs->os_ffree != LOV_U64_MAX)
@@ -1858,6 +1992,7 @@ static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
 {
         struct obd_device *obddev = class_exp2obd(exp);
         struct lov_obd *lov = &obddev->u.lov;
+        struct lov_tgt_desc *tgt;
         int i, rc, count = lov->desc.ld_tgt_count;
         struct obd_uuid *uuidp;
         ENTRY;
@@ -1865,7 +2000,6 @@ static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         switch (cmd) {
         case OBD_IOC_LOV_GET_CONFIG: {
                 struct obd_ioctl_data *data = karg;
-                struct lov_tgt_desc *tgtdesc;
                 struct lov_desc *desc;
                 char *buf = NULL;
                 __u32 *genp;
@@ -1897,12 +2031,14 @@ static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
 
                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
                 genp = (__u32 *)data->ioc_inlbuf3;
-                tgtdesc = lov->tgts;
                 /* the uuid will be empty for deleted OSTs */
-                for (i = 0; i < count; i++, uuidp++, genp++, tgtdesc++) {
-                        obd_str2uuid(uuidp, (char *)tgtdesc->uuid.uuid);
-                        *genp = tgtdesc->ltd_gen;
+                lov_tgts_lock(lov);
+                tgt = lov->tgts;
+                for (i = 0; i < count; i++, uuidp++, genp++, tgt++) {
+                        obd_str2uuid(uuidp, (char *)tgt->uuid.uuid);
+                        *genp = tgt->ltd_gen;
                 }
+                lov_tgts_unlock(lov);
 
                 rc = copy_to_user((void *)uarg, buf, len);
                 if (rc)
@@ -1924,17 +2060,23 @@ static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 if (count == 0)
                         RETURN(-ENOTTY);
                 rc = 0;
-                for (i = 0; i < count; i++) {
+                for (i = 0, tgt = lov->tgts; i < count; i++, tgt++) {
                         int err;
 
-                        /* OST was deleted */
-                        if (obd_uuid_empty(&lov->tgts[i].uuid))
+                        if (!lov_tgt_active(lov, tgt, 0)) {
+                                CERROR("error: iocontrol failed for OSC %s on "
+                                       "OST idx %d: OST inactive.\n",
+                                       tgt->uuid.uuid, i);
+                                if (!rc)
+                                        rc = -EIO;
                                 continue;
+                        }
 
-                        err = obd_iocontrol(cmd, lov->tgts[i].ltd_exp,
-                                            len, karg, uarg);
+                        err = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
+                        lov_tgt_decref(lov, tgt);
                         if (err) {
-                                if (lov->tgts[i].active) {
+                                if (lov_tgt_ready(lov, tgt, 0)) {
+                                        lov_tgt_decref(lov, tgt);
                                         CERROR("error: iocontrol OSC %s on OST "
                                                "idx %d cmd %x: err = %d\n",
                                                lov->tgts[i].uuid.uuid, i,
@@ -1958,6 +2100,7 @@ static int lov_get_info(struct obd_export *exp, __u32 keylen,
 {
         struct obd_device *obddev = class_exp2obd(exp);
         struct lov_obd *lov = &obddev->u.lov;
+        struct lov_tgt_desc *tgt;
         int i;
         ENTRY;
 
@@ -1985,6 +2128,7 @@ static int lov_get_info(struct obd_export *exp, __u32 keylen,
                 /* XXX - it's assumed all the locks for deleted OSTs have
                  * been cancelled. Also, the export for deleted OSTs will
                  * be NULL and won't match the lock's export. */
+                lov_tgts_lock(lov);
                 for (i = 0, loi = data->lsm->lsm_oinfo;
                      i < data->lsm->lsm_stripe_count;
                      i++, loi++) {
@@ -1992,10 +2136,12 @@ static int lov_get_info(struct obd_export *exp, __u32 keylen,
                                         data->lock->l_conn_export &&
                             loi->loi_id == res_id->name[0] &&
                             loi->loi_gr == res_id->name[2]) {
+                                lov_tgts_unlock(lov);
                                 *stripe = i;
                                 RETURN(0);
                         }
                 }
+                lov_tgts_unlock(lov);
                 LDLM_ERROR(data->lock, "lock on inode without such object");
                 dump_lsm(D_ERROR, data->lsm);
                 portals_debug_dumpstack(NULL);
@@ -2019,11 +2165,13 @@ static int lov_get_info(struct obd_export *exp, __u32 keylen,
                 obd_id *ids = val;
                 int rc = 0;
 
-                for (i = 0; i < lov->desc.ld_tgt_count; i++) {
-                        if (!lov->tgts[i].active)
+                tgt = lov->tgts;
+                for (i = 0; i < lov->desc.ld_tgt_count; i++, tgt++) {
+                        if (!lov_tgt_active(lov, tgt, 0))
                                 continue;
-                        rc = obd_get_info(lov->tgts[i].ltd_exp,
-                                          keylen, key, &size, &(ids[i]));
+                        rc = obd_get_info(tgt->ltd_exp, keylen, key, &size,
+                                          ids + i);
+                        lov_tgt_decref(lov, tgt);
                         if (rc != 0)
                                 RETURN(rc);
                 }
@@ -2043,6 +2191,7 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen,
 {
         struct obd_device *obddev = class_exp2obd(exp);
         struct lov_obd *lov = &obddev->u.lov;
+        struct lov_tgt_desc *tgt;
         int i, rc = 0, err;
         ENTRY;
 
@@ -2051,14 +2200,15 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen,
 
         if (KEY_IS("async")) {
                 struct lov_desc *desc = &lov->desc;
-                struct lov_tgt_desc *tgts = lov->tgts;
 
                 if (vallen != sizeof(int))
                         RETURN(-EINVAL);
                 lov->async = *((int*) val);
 
-                for (i = 0; i < desc->ld_tgt_count; i++, tgts++) {
-                        struct obd_uuid *tgt_uuid = &tgts->uuid;
+                lov_tgts_lock(lov);
+                tgt = lov->tgts;
+                for (i = 0; i < desc->ld_tgt_count; i++, tgt++) {
+                        struct obd_uuid *tgt_uuid = &tgt->uuid;
                         struct obd_device *tgt_obd;
 
                         tgt_obd = class_find_client_obd(tgt_uuid,
@@ -2081,6 +2231,7 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen,
                                         rc = err;
                         }
                 }
+                lov_tgts_unlock(lov);
                 RETURN(rc);
         }
 
@@ -2095,9 +2246,9 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen,
                 struct obd_export *exp;
                 int rc = 0, err, i;
 
-                spin_lock(&lov->lov_lock);
-                for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count;
-                     i++, tgt++) {
+                lov_tgts_lock(lov);
+                tgt = lov->tgts;
+                for (i = 0; i < lov->desc.ld_tgt_count; i++, tgt++) {
                         exp = tgt->ltd_exp;
                         /* during setup time the connections to osc might
                          * haven't been established.
@@ -2122,7 +2273,7 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen,
                         if (!rc)
                                 rc = err;
                 }
-                spin_unlock(&lov->lov_lock);
+                lov_tgts_unlock(lov);
 
                 RETURN(rc);
         } else if (KEY_IS("auditlog")) {
@@ -2213,12 +2364,13 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen,
                 struct lov_tgt_desc *tgt;
                 int rc = 0, i;
 
-                for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count;
-                     i++, tgt++) {
-                        if (!tgt->ltd_exp)
+                tgt = lov->tgts;
+                for (i = 0; i < lov->desc.ld_tgt_count; i++, tgt++) {
+                        if (!lov_tgt_active(lov, tgt, 0))
                                 continue;
                         rc = obd_set_info(tgt->ltd_exp,
                                           keylen, key, vallen, val);
+                        lov_tgt_decref(lov, tgt);
                         if (rc)
                                 RETURN(rc);
                 }
@@ -2243,17 +2395,24 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen,
                 RETURN(-EINVAL);
         }
 
-        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
-                if (val && !obd_uuid_equals(val, &lov->tgts[i].uuid))
-                        continue;
+        tgt = lov->tgts;
+        for (i = 0; i < lov->desc.ld_tgt_count; i++, tgt++) {
+                int active = 0;
 
-                if (!val && !lov->tgts[i].active)
+                if (val) {
+                        if (!obd_uuid_equals(val, &tgt->uuid))
+                                continue;
+                } else if (!lov_tgt_active(lov, tgt, 0)) {
                         continue;
+                } else {
+                        active = 1;
+                }
 
-                err = obd_set_info(lov->tgts[i].ltd_exp,
-                                  keylen, key, vallen, val);
+                err = obd_set_info(tgt->ltd_exp, keylen, key, vallen, val);
                 if (!rc)
                         rc = err;
+                if (active)
+                        lov_tgt_decref(lov, tgt);
         }
         RETURN(rc);
 #undef KEY_IS
index 23e871c..bdf037b 100644 (file)
@@ -300,6 +300,7 @@ int lov_alloc_memmd(struct lov_stripe_md **lsmp, int stripe_count, int pattern)
         (*lsmp)->lsm_xfersize = PTLRPC_MAX_BRW_SIZE * stripe_count;
         (*lsmp)->lsm_pattern = pattern;
         (*lsmp)->lsm_oinfo[0].loi_ost_idx = ~0;
+        (*lsmp)->lsm_oinfo[0].loi_ost_gen = ~0;
 
         for (i = 0, loi = (*lsmp)->lsm_oinfo; i < stripe_count; i++, loi++)
                 loi_init(loi);
@@ -339,7 +340,7 @@ int lov_unpackmd_v0(struct lov_obd *lov, struct lov_stripe_md *lsm,
                         le64_to_cpu(lmm->lmm_objects[ost_offset].l_object_id);
                 /* loi->loi_gr = 0; implicit */
                 loi->loi_ost_idx = ost_offset;
-                /* loi->loi_ost_gen = 0; implicit */
+                loi->loi_ost_gen = 1;
                 loi++;
         }
 
@@ -503,6 +504,7 @@ int lov_setstripe(struct obd_export *exp, struct lov_stripe_md **lsmp,
                 RETURN(rc);
 
         (*lsmp)->lsm_oinfo[0].loi_ost_idx = lum.lmm_stripe_offset;
+        /* XXX - what about loi_ost_gen ? */
         (*lsmp)->lsm_stripe_size = lum.lmm_stripe_size;
         (*lsmp)->lsm_xfersize = lum.lmm_stripe_size * stripe_count;
 
@@ -519,10 +521,20 @@ int lov_setea(struct obd_export *exp, struct lov_stripe_md **lsmp,
         obd_id last_id = 0;
 
         for (i = 0; i < lump->lmm_stripe_count; i++) {
+                struct lov_tgt_desc *tgt;
                 __u32 len = sizeof(last_id);
-                oexp = lov->tgts[lump->lmm_objects[i].l_ost_idx].ltd_exp;
+
+                tgt = lov->tgts + lump->lmm_objects[i].l_ost_idx;
+                if (!lov_tgt_active(lov, tgt, lump->lmm_objects[i].l_ost_gen)) {
+                        CERROR("Object on ost idx %d: osc inactive.\n",
+                               lump->lmm_objects[i].l_ost_idx);
+                        continue;
+                }
+
+                oexp = tgt->ltd_exp;
                 rc = obd_get_info(oexp, strlen("last_id"), "last_id",
                                   &len, &last_id);
+                lov_tgt_decref(lov, tgt);
                 if (rc)
                         RETURN(rc);
                 if (lump->lmm_objects[i].l_object_id > last_id) {
@@ -541,6 +553,8 @@ int lov_setea(struct obd_export *exp, struct lov_stripe_md **lsmp,
         for (i = 0; i < lump->lmm_stripe_count; i++) {
                 (*lsmp)->lsm_oinfo[i].loi_ost_idx =
                         lump->lmm_objects[i].l_ost_idx;
+                (*lsmp)->lsm_oinfo[i].loi_ost_gen =
+                        lump->lmm_objects[i].l_ost_gen;
                 (*lsmp)->lsm_oinfo[i].loi_id = lump->lmm_objects[i].l_object_id;
                 (*lsmp)->lsm_oinfo[i].loi_gr = lump->lmm_objects[i].l_object_gr;
         }
index 64cfa27..c9377cb 100644 (file)
@@ -114,28 +114,39 @@ int qos_prep_create(struct lov_obd *lov, struct lov_request_set *set, int newea)
                lsm->lsm_stripe_count, lsm->lsm_object_id, ost_idx);
 
         for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
+                struct lov_tgt_desc *tgt = lov->tgts + ost_idx;
                 struct lov_request *req;
                 
                 ++ost_start_idx;
-                if (lov->tgts[ost_idx].active == 0) {
+                if (!lov_tgt_active(lov, tgt, 0)) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx);
                         continue;
                 }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL)
+                if (req == NULL) {
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out, rc = -ENOMEM);
+                }
                 
                 req->rq_buflen = sizeof(*req->rq_md);
                 OBD_ALLOC(req->rq_md, req->rq_buflen);
-                if (req->rq_md == NULL)
+                if (req->rq_md == NULL) {
+                        OBD_FREE(req, sizeof(*req));
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out, rc = -ENOMEM);
+                }
                 
                 req->rq_oa = obdo_alloc();
-                if (req->rq_oa == NULL)
+                if (req->rq_oa == NULL) {
+                        OBD_FREE(req->rq_md, sizeof(*req->rq_md));
+                        OBD_FREE(req, sizeof(*req));
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out, rc = -ENOMEM);
+                }
                 
                 req->rq_idx = ost_idx;
+                req->rq_gen = tgt->ltd_gen;
                 req->rq_stripe = i;
                 /* create data objects with "parent" OA */
                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
index dcf5810..f99307c 100644 (file)
@@ -47,13 +47,21 @@ static void lov_init_set(struct lov_request_set *set)
 
 static void lov_finish_set(struct lov_request_set *set)
 {
+        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
         struct list_head *pos, *n;
         ENTRY;
 
         LASSERT(set);
         list_for_each_safe(pos, n, &set->set_list) {
-                struct lov_request *req = list_entry(pos, struct lov_request,
-                                                     rq_link);
+                struct lov_request *req;
+                struct lov_tgt_desc *tgt;
+
+                req = list_entry(pos, struct lov_request, rq_link);
+                LASSERT(req->rq_idx >= 0);
+
+                tgt = lov->tgts + req->rq_idx;
+                lov_tgt_decref(lov, tgt);
+
                 list_del_init(&req->rq_link);
 
                 if (req->rq_oa)
@@ -94,8 +102,14 @@ int lov_update_common_set(struct lov_request_set *set,
         lov_update_set(set, req, rc);
 
         /* grace error on inactive ost */
-        if (rc && !lov->tgts[req->rq_idx].active)
-                rc = 0;
+        if (rc) {
+                struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
+
+                if (lov_tgt_active(lov, tgt, req->rq_gen))
+                        lov_tgt_decref(lov, tgt);
+                else
+                        rc = 0;
+        }
 
         /* FIXME in raid1 regime, should return 0 */
         RETURN(rc);
@@ -160,14 +174,20 @@ int lov_update_enqueue_set(struct lov_request_set *set,
         } else {
                 struct obd_export *exp = set->set_exp;
                 struct lov_obd *lov = &exp->exp_obd->u.lov;
+                struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
 
                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
-                if (lov->tgts[req->rq_idx].active) {
+                if (lov_tgt_ready(lov, tgt, req->rq_gen)) {
+                        lov_tgt_decref(lov, tgt);
                         CERROR("error: enqueue objid "LPX64" subobj "
                                 LPX64" on OST idx %d: rc = %d\n",
                                 set->set_md->lsm_object_id, loi->loi_id,
                                 loi->loi_ost_idx, rc);
                 } else {
+                        CERROR("error: enqueue objid "LPX64" subobj "
+                                LPX64" on OST idx %d: rc = %d, NOT ACTIVE !\n",
+                                set->set_md->lsm_object_id, loi->loi_id,
+                                loi->loi_ost_idx, rc);
                         rc = ELDLM_OK;
                 }
         }
@@ -181,6 +201,7 @@ static int enqueue_done(struct lov_request_set *set, __u32 mode, int flags)
         struct lov_request *req;
         struct lustre_handle *lov_lockhp = NULL;
         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
+        struct lov_tgt_desc *tgt;
         int rc = 0;
         ENTRY;
 
@@ -206,12 +227,14 @@ static int enqueue_done(struct lov_request_set *set, __u32 mode, int flags)
                 if (lov_lockhp->cookie == 0)
                         continue;
 
-                rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
-                                mode, lov_lockhp);
-                if (rc && lov->tgts[req->rq_idx].active)
+                tgt = lov->tgts + req->rq_idx;
+                rc = obd_cancel(tgt->ltd_exp, req->rq_md, mode, lov_lockhp);
+                if (rc && lov_tgt_ready(lov, tgt, req->rq_gen)) {
+                        lov_tgt_decref(lov, tgt);
                         CERROR("cancelling obdjid "LPX64" on OST "
                                "idx %d error: rc = %d\n",
                                req->rq_md->lsm_object_id, req->rq_idx, rc);
+                }
         }
         lov_llh_put(set->set_lockh);
         RETURN(rc);
@@ -261,6 +284,7 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
 
         loi = lsm->lsm_oinfo;
         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_tgt_desc *tgt;
                 struct lov_request *req;
                 obd_off start, end;
 
@@ -268,25 +292,32 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
                                            policy->l_extent.end, &start, &end))
                         continue;
 
-                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                tgt = lov->tgts + loi->loi_ost_idx;
+                if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
                 }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL)
+                if (req == NULL) {
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
 
                 req->rq_buflen = sizeof(*req->rq_md) +
                         sizeof(struct lov_oinfo);
                 OBD_ALLOC(req->rq_md, req->rq_buflen);
-                if (req->rq_md == NULL)
+                if (req->rq_md == NULL) {
+                        OBD_FREE(req, sizeof(*req));
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
 
                 req->rq_extent.start = start;
                 req->rq_extent.end = end;
 
                 req->rq_idx = loi->loi_ost_idx;
+                req->rq_gen = loi->loi_ost_gen;
                 req->rq_stripe = i;
 
                 /* XXX LOV STACKING: submd should be from the subobj */
@@ -362,6 +393,7 @@ int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
 
         loi = lsm->lsm_oinfo;
         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_tgt_desc *tgt;
                 struct lov_request *req;
                 obd_off start, end;
 
@@ -370,24 +402,31 @@ int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
                         continue;
 
                 /* FIXME raid1 should grace this error */
-                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                tgt = lov->tgts + loi->loi_ost_idx;
+                if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         GOTO(out_set, rc = -EIO);
                 }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL)
+                if (req == NULL) {
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
 
                 req->rq_buflen = sizeof(*req->rq_md);
                 OBD_ALLOC(req->rq_md, req->rq_buflen);
-                if (req->rq_md == NULL)
+                if (req->rq_md == NULL) {
+                        OBD_FREE(req, sizeof(*req));
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
 
                 req->rq_extent.start = start;
                 req->rq_extent.end = end;
 
                 req->rq_idx = loi->loi_ost_idx;
+                req->rq_gen = loi->loi_ost_gen;
                 req->rq_stripe = i;
 
                 /* XXX LOV STACKING: submd should be from the subobj */
@@ -427,6 +466,7 @@ int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
                         __u32 mode, struct lustre_handle *lockh,
                         struct lov_request_set **reqset)
 {
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
         struct lov_request_set *set;
         int i, rc = 0;
         struct lov_oinfo *loi;
@@ -448,6 +488,7 @@ int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
 
         loi = lsm->lsm_oinfo;
         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_tgt_desc *tgt;
                 struct lov_request *req;
                 struct lustre_handle *lov_lockhp;
 
@@ -458,16 +499,29 @@ int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
                         continue;
                 }
 
+                tgt = lov->tgts + loi->loi_ost_idx;
+                if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
+                        CERROR("lov idx %d subobj "LPX64" osc inactive?\n",
+                               loi->loi_ost_idx, loi->loi_id);
+                        continue;
+                }
+
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL)
+                if (req == NULL) {
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
 
                 req->rq_buflen = sizeof(*req->rq_md);
                 OBD_ALLOC(req->rq_md, req->rq_buflen);
-                if (req->rq_md == NULL)
+                if (req->rq_md == NULL) {
+                        OBD_FREE(req, sizeof(*req));
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
 
                 req->rq_idx = loi->loi_ost_idx;
+                req->rq_gen = loi->loi_ost_gen;
                 req->rq_stripe = i;
 
                 /* XXX LOV STACKING: submd should be from the subobj */
@@ -536,15 +590,24 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
         EXIT;
 cleanup:
         list_for_each (pos, &set->set_list) {
-                struct obd_export *sub_exp;
+                struct lov_tgt_desc *tgt;
                 int err = 0;
                 req = list_entry(pos, struct lov_request, rq_link);
 
                 if (!req->rq_complete || req->rq_rc)
                         continue;
 
-                sub_exp = lov->tgts[req->rq_idx].ltd_exp,
-                err = obd_destroy(sub_exp, req->rq_oa, NULL, oti);
+                tgt = lov->tgts + req->rq_idx;
+                if (!lov_tgt_ready(lov, tgt, req->rq_gen)) {
+                        CERROR("Failed to uncreate objid "LPX64" subobj "
+                               LPX64" on OST idx %d: osc inactive.\n",
+                               set->set_oa->o_id, req->rq_oa->o_id,
+                               req->rq_idx);
+                        continue;
+                }
+
+                err = obd_destroy(tgt->ltd_exp, req->rq_oa, NULL, oti);
+                lov_tgt_decref(lov, tgt);
                 if (err)
                         CERROR("Failed to uncreate objid "LPX64" subobj "
                                LPX64" on OST idx %d: rc = %d\n",
@@ -592,12 +655,15 @@ int lov_update_create_set(struct lov_request_set *set,
         struct obd_trans_info *oti = set->set_oti;
         struct lov_stripe_md *lsm = set->set_md;
         struct lov_oinfo *loi;
+        struct lov_tgt_desc *tgt;
         ENTRY;
 
         req->rq_stripe = set->set_success;
         loi = &lsm->lsm_oinfo[req->rq_stripe];
+        tgt = lov->tgts + req->rq_idx;
 
-        if (rc && lov->tgts[req->rq_idx].active) {
+        if (rc && lov_tgt_ready(lov, tgt, req->rq_gen)) {
+                lov_tgt_decref(lov, tgt);
                 CERROR("error creating objid "LPX64" sub-object"
                        " on OST idx %d/%d: rc = %d\n",
                        set->set_oa->o_id, req->rq_idx,
@@ -617,8 +683,11 @@ int lov_update_create_set(struct lov_request_set *set,
         loi->loi_id = req->rq_oa->o_id;
         loi->loi_gr = req->rq_oa->o_gr;
         loi->loi_ost_idx = req->rq_idx;
-        CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n",
-               lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
+        loi->loi_ost_gen = req->rq_gen;
+        CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at "
+               "idx %d gen %d\n",
+               lsm->lsm_object_id, loi->loi_id, loi->loi_id,
+               req->rq_idx, req->rq_gen);
         loi_init(loi);
 
         if (set->set_cookies)
@@ -822,32 +891,44 @@ int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
         shift = 0;
         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
                 struct lov_request *req;
+                struct lov_tgt_desc *tgt;
 
                 if (info[i].count == 0)
                         continue;
 
-                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                tgt = lov->tgts + loi->loi_ost_idx;
+                if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         GOTO(out, rc = -EIO);
                 }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL)
+                if (req == NULL) {
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out, rc = -ENOMEM);
+                }
 
                 req->rq_oa = obdo_alloc();
-                if (req->rq_oa == NULL)
+                if (req->rq_oa == NULL) {
+                        OBD_FREE(req, sizeof(*req));
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out, rc = -ENOMEM);
+                }
 
                 if (src_oa)
                         memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
                 req->rq_oa->o_id = loi->loi_id;
                 req->rq_buflen = sizeof(*req->rq_md);
                 OBD_ALLOC(req->rq_md, req->rq_buflen);
-                if (req->rq_md == NULL)
+                if (req->rq_md == NULL) {
+                        obdo_free(req->rq_oa);
+                        OBD_FREE(req, sizeof(*req));
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out, rc = -ENOMEM);
+                }
 
                 req->rq_idx = loi->loi_ost_idx;
+                req->rq_gen = loi->loi_ost_gen;
                 req->rq_stripe = i;
 
                 /* XXX LOV STACKING */
@@ -930,23 +1011,30 @@ int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa,
 
         loi = lsm->lsm_oinfo;
         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
                 struct lov_request *req;
 
-                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
                 }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL)
+                if (req == NULL) {
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
 
                 req->rq_stripe = i;
                 req->rq_idx = loi->loi_ost_idx;
+                req->rq_gen = loi->loi_ost_gen;
 
                 req->rq_oa = obdo_alloc();
-                if (req->rq_oa == NULL)
+                if (req->rq_oa == NULL) {
+                        OBD_FREE(req, sizeof(*req));
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
                 req->rq_oa->o_id = loi->loi_id;
                 lov_set_add_req(req, set);
@@ -1002,23 +1090,31 @@ int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa,
 
         loi = lsm->lsm_oinfo;
         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
                 struct lov_request *req;
 
-                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
                 }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL)
+                if (req == NULL) {
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
 
                 req->rq_stripe = i;
                 req->rq_idx = loi->loi_ost_idx;
+                req->rq_gen = loi->loi_ost_gen;
 
                 req->rq_oa = obdo_alloc();
-                if (req->rq_oa == NULL)
+                if (req->rq_oa == NULL) {
+                        OBD_FREE(req, sizeof(*req));
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
+
                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
                 req->rq_oa->o_id = loi->loi_id;
 
@@ -1082,22 +1178,31 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa,
 
         loi = lsm->lsm_oinfo;
         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
                 struct lov_request *req;
 
-                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
                 }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL)
+                if (req == NULL) {
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
+
                 req->rq_stripe = i;
                 req->rq_idx = loi->loi_ost_idx;
+                req->rq_gen = loi->loi_ost_gen;
 
                 req->rq_oa = obdo_alloc();
-                if (req->rq_oa == NULL)
+                if (req->rq_oa == NULL) {
+                        OBD_FREE(req, sizeof(*req));
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
+
                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
                 req->rq_oa->o_id = loi->loi_id;
                 LASSERT(!(req->rq_oa->o_valid & OBD_MD_FLGROUP) || req->rq_oa->o_gr>0);
@@ -1128,8 +1233,15 @@ int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req,
         ENTRY;
 
         lov_update_set(set, req, rc);
-        if (rc && !lov->tgts[req->rq_idx].active)
-                rc = 0;
+        if (rc) {
+                struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
+
+                if (lov_tgt_active(lov, tgt, req->rq_gen))
+                        lov_tgt_decref(lov, tgt);
+                else
+                        rc = 0;
+        }
+
         /* FIXME in raid1 regime, should return 0 */
         RETURN(rc);
 }
@@ -1176,26 +1288,35 @@ int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
 
         loi = lsm->lsm_oinfo;
         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
                 struct lov_request *req;
                 obd_off rs, re;
 
-                if (lov->tgts[loi->loi_ost_idx].active == 0) {
-                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
                         continue;
-                }
 
-                if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
+                if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
+                }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL)
+                if (req == NULL) {
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
+
                 req->rq_stripe = i;
                 req->rq_idx = loi->loi_ost_idx;
+                req->rq_gen = loi->loi_ost_gen;
 
                 req->rq_oa = obdo_alloc();
-                if (req->rq_oa == NULL)
+                if (req->rq_oa == NULL) {
+                        OBD_FREE(req, sizeof(*req));
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
+
                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
                 req->rq_oa->o_id = loi->loi_id;
                 req->rq_oa->o_gr = loi->loi_gr;
@@ -1256,26 +1377,34 @@ int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa,
 
         loi = lsm->lsm_oinfo;
         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
                 struct lov_request *req;
                 obd_off rs, re;
 
-                if (lov->tgts[loi->loi_ost_idx].active == 0) {
-                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
                         continue;
-                }
 
-                if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
+                if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
+                }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL)
+                if (req == NULL) {
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
+
                 req->rq_stripe = i;
                 req->rq_idx = loi->loi_ost_idx;
+                req->rq_gen = loi->loi_ost_gen;
 
                 req->rq_oa = obdo_alloc();
-                if (req->rq_oa == NULL)
+                if (req->rq_oa == NULL) {
+                        OBD_FREE(req, sizeof(*req));
+                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
+                }
                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
                 req->rq_oa->o_id = loi->loi_id;
                 req->rq_extent.start = rs;
index ee4883d..1f711af 100644 (file)
@@ -146,9 +146,9 @@ static int lov_tgt_seq_show(struct seq_file *p, void *v)
         struct lov_tgt_desc *tgt = v;
         struct obd_device *dev = p->private;
         struct lov_obd *lov = &dev->u.lov;
-        int idx = tgt - &(lov->tgts[0]);
+        int idx = tgt - lov->tgts;
         return seq_printf(p, "%d: %s %sACTIVE\n", idx, tgt->uuid.uuid,
-                          tgt->active ? "" : "IN");
+                          (tgt->ltd_flags & LTD_ACTIVE) ? "" : "IN");
 }
 
 struct seq_operations lov_tgt_sops = {
index 4ae0f67..b96b59f 100644 (file)
@@ -208,6 +208,7 @@ static int echo_create_object(struct obd_device *obd, int on_target,
 
                         lsm->lsm_oinfo[i].loi_ost_idx =
                                 (idx + i) % ec->ec_nstripes;
+                        lsm->lsm_oinfo[i].loi_ost_gen = 1;
                 }
         } else {
                 OBD_ALLOC(eco, sizeof(*eco));
index b3e7da1..79b0a31 100644 (file)
@@ -3254,6 +3254,7 @@ static int osc_import_event(struct obd_device *obd,
 {
         struct client_obd *cli;
         int rc = 0;
+        ENTRY;
 
         LASSERT(imp->imp_obd == obd);
 
index ea68251..df51aa1 100644 (file)
@@ -18,7 +18,7 @@ MOUNT1=${MOUNT1:-$MOUNT}
 MOUNT2=${MOUNT2:-${MOUNT}2}
 DIR=${DIR:-$MOUNT}
 DIR2=${DIR2:-$MOUNT2}
-PTLDEBUG=${PTLDEBUG:-0x3f0400}
+PTLDEBUG=${PTLDEBUG:--1}
 SUBSYSTEM=${SUBSYSTEM:- 0xffb7e3ff}
 PDSH=${PDSH:-no_dsh}
 
index b4ff31c..9ea94a1 100755 (executable)
@@ -226,9 +226,9 @@ class DaemonHandler:
                    os.kill(pid, 15)
                else:
                    log("was unable to find pid of " + self.command)
-                #time.sleep(1) # let daemon die
             except OSError, e:
                 log("unable to kill", self.command, e)
+            time.sleep(5) # let daemon die
             if self.running():
                 log("unable to kill", self.command)
 
@@ -650,19 +650,18 @@ class LCTLInterface:
 
     # create an lov
     def lov_setup(self, name, uuid, desc_uuid, stripe_cnt,
-                  stripe_sz, stripe_off, pattern, devlist = None):
+                  stripe_sz, stripe_off, pattern):
         cmds = """
   attach lov %s %s
-  lov_setup %s %d %d %d %s %s
-  quit""" % (name, uuid, desc_uuid, stripe_cnt, stripe_sz, stripe_off,
-             pattern, devlist)
+  lov_setup %s %d %d %d %s
+  quit""" % (name, uuid, desc_uuid, stripe_cnt, stripe_sz, stripe_off, pattern)
         self.run(cmds)
 
-    # add an OBD to a LOV
-    def lov_add_obd(self, name, uuid, obd_uuid, index, gen):
+    # add an OSC to a LOV
+    def lov_add_osc(self, name, ost_uuid, index, gen):
         cmds = """
   lov_modify_tgts add %s %s %s %s
-  quit""" % (name, obd_uuid, index, gen)
+  quit""" % (name, ost_uuid, index, gen)
         self.run(cmds)
 
     # create an lmv
@@ -673,14 +672,14 @@ class LCTLInterface:
   quit""" % (name, uuid, desc_uuid, devlist)
         self.run(cmds)
 
-    # delete an OBD from a LOV
-    def lov_del_obd(self, name, uuid, obd_uuid, index, gen):
+    # delete an OSC from a LOV
+    def lov_del_osc(self, name, ost_uuid, index, gen):
         cmds = """
   lov_modify_tgts del %s %s %s %s
-  quit""" % (name, obd_uuid, index, gen)
+  quit""" % (name, ost_uuid, index, gen)
         self.run(cmds)
 
-    # deactivate an OBD
+    # deactivate an OSC
     def deactivate(self, name):
         cmds = """
   device $%s
@@ -1639,8 +1638,7 @@ class LOV(Module):
                   self.stripe_off, self.pattern, self.devlist,
                   self.mds_name)
         lctl.lov_setup(self.name, self.uuid, self.desc_uuid,  self.stripe_cnt,
-                       self.stripe_sz, self.stripe_off, self.pattern,
-                      string.join(self.obdlist))
+                       self.stripe_sz, self.stripe_off, self.pattern)
         for (osc, index, gen, active) in self.osclist:
             target_uuid = osc.target_uuid
             try:
@@ -1651,7 +1649,7 @@ class LOV(Module):
             except CommandError, e:
                 print "Error preparing OSC %s\n" % osc.uuid
                 raise e
-            lctl.lov_add_obd(self.name, self.uuid, target_uuid, index, gen)
+            lctl.lov_add_osc(self.name, target_uuid, index, gen)
 
     def cleanup(self):
         for (osc, index, gen, active) in self.osclist:
@@ -1678,7 +1676,7 @@ class LMV(Module):
         Module.__init__(self, 'LMV', db)
         if name_override != None:
             self.name = "lmv_%s" % name_override
-           
+
         self.devlist = self.db.get_lmv_tgts('lmv_tgt')
        if self.devlist == None:
            self.devlist = self.db.get_refs('mds')
@@ -1698,7 +1696,7 @@ class LMV(Module):
                 panic('mdc not found:', mds_uuid)
 
     def prepare(self):
-        if is_prepared(self.name):
+        if config.record and is_prepared(self.name):
             return
            
        self.info();
@@ -2039,7 +2037,7 @@ class CONFDEV(Module):
             client.prepare()
             lctl.mount_option(self.target.getName(), client.get_name(), "", "")
             lctl.end_record()
-
+            process_updates(self.db, self.name, self.target.getName(), client) 
             config.cleanup = 1
             lctl.clear_log(self.name, self.target.getName() + '-clean')
             lctl.record(self.name, self.target.getName() + '-clean')
@@ -2197,8 +2195,14 @@ class MDSDEV(Module):
        # add CONFDEV modules
        if self.confobd != None:
             self.confobd.add_module(manager)
-           
+
     def write_conf(self):
+        if config.write_conf:
+            if not self.active:
+                debug(self.uuid, "not active")
+            else:
+                self.confobd.write_conf()
+            return
         if is_prepared(self.name):
             return
         if not self.active:
@@ -2470,34 +2474,33 @@ class Client(Module):
         self.backup_targets = []
        self.module = module
         self.db = tgtdb
-
-        self.tgt_dev_uuid = get_active_target(tgtdb)
-        if not self.tgt_dev_uuid:
-            panic("No target device found for target(1):", self.target_name)
-
-        self._server = None
-        self._connected = 0
-
+        self.uuid = uuid
         self.module = module
         self.module_name = string.upper(module)
+        self.fs_name = fs_name
         if not self_name:
             self.name = '%s_%s_%s_%s' % (self.module_name, socket.gethostname(),
                                          self.target_name, fs_name)
         else:
             self.name = self_name
-        self.uuid = uuid
-        self.lookup_server(self.tgt_dev_uuid)
-        self.lookup_backup_targets()
-        self.fs_name = fs_name
         if not self.module_dir:
             self.module_dir = module
 
+        self.tgt_dev_uuid = get_active_target(tgtdb)
+        if not self.tgt_dev_uuid:
+            panic("No target device found for target(1):", self.target_name)
+
+        self._server = None
+        self._connected = 0
+        self.lookup_server(tgtdb, self.tgt_dev_uuid)
+        self.lookup_backup_targets()
+
     def add_module(self, manager):
         manager.add_lustre_module(self.module_dir, self.module)
 
-    def lookup_server(self, srv_uuid):
+    def lookup_server(self, db, srv_uuid):
         """ Lookup a server's network information """
-        self._server_nets = get_ost_net(self.db, srv_uuid)
+        self._server_nets = get_ost_net(db, srv_uuid)
         if len(self._server_nets) == 0:
             panic ("Unable to find a server for:", srv_uuid)
            
@@ -3017,7 +3020,7 @@ def get_ost_net(self, osd_uuid):
     node = self.lookup(node_uuid)
     if not node:
         panic("unable to find node for osd_uuid:", osd_uuid,
-              " node_ref:", node_uuid_)
+              " node_ref:", node_uuid)
     for net_uuid in node.get_networks():
         db = node.lookup(net_uuid)
         srv_list.append(Network(db))
@@ -3076,8 +3079,8 @@ def getServices(self):
 #
 # OSC is no longer in the xml, so we have to fake it.
 # this is getting ugly and begging for another refactoring
-def get_osc(ost_db, uuid, fs_name):
-    osc = OSC(ost_db, uuid, fs_name)
+def get_osc(db, ost_uuid, fs_name):
+    osc = OSC(db, ost_uuid, fs_name)
     return osc
 
 def get_mdc(db, fs_name, mds_uuid):
@@ -3254,47 +3257,73 @@ def for_each_profile(db, prof_list, operation):
         services = getServices(prof_db)
         operation(services)
 
-def magic_get_osc(db, rec, lov):
-    if lov:
-        lov_uuid = lov.get_uuid()
-        lov_name = lov.osc.fs_name
-    else:
-        lov_uuid = rec.getAttribute('lov_uuidref')
+def get_fs_name(db, rec, tag, uuid):
         # FIXME: better way to find the mountpoint?
         filesystems = db.root_node.getElementsByTagName('filesystem')
         fsuuid = None
         for fs in filesystems:
-            ref = fs.getElementsByTagName('obd_ref')
-            if ref[0].getAttribute('uuidref') == lov_uuid:
+            ref = fs.getElementsByTagName(tag)
+            if ref[0].getAttribute('uuidref') == uuid:
                 fsuuid = fs.getAttribute('uuid')
                 break
 
         if not fsuuid:
-            panic("malformed xml: lov uuid '" + lov_uuid + "' referenced in 'add' record is not used by any filesystems.")
+            panic("malformed xml: uuid '" + uuid + "' referenced in '" + \
+                  rec.nodeName + "' record is not used by any filesystems.")
 
         mtpts = db.root_node.getElementsByTagName('mountpoint')
-        lov_name = None
+        fs_name = None
         for fs in mtpts:
             ref = fs.getElementsByTagName('filesystem_ref')
             if ref[0].getAttribute('uuidref') == fsuuid:
-                lov_name = fs.getAttribute('name')
+                fs_name = fs.getAttribute('name')
                 break
 
-        if not lov_name:
-            panic("malformed xml: 'add' record references lov uuid '" + lov_uuid + "', which references filesystem uuid '" + fsuuid + "', which does not reference a mountpoint.")
+        if not fs_name:
+            panic("malformed xml: '" + rec.nodeName + \
+                  "' record references uuid '" + uuid + \
+                  "', which references filesystem uuid '" + fsuuid + \
+                  "', which does not reference a mountpoint.")
+
+        return fs_name
+
+def magic_get_osc(db, rec, lov):
+    if lov:
+        lov_uuid = lov.get_uuid()
+       fs_name = lov.osc.fs_name
+        lov_name = lov.osc.name
+    else:
+        lov_uuid = rec.getAttribute('lov_uuidref')
+        fs_name = get_fs_name(db, rec, 'obd_ref', lov_uuid)
+        lov_name = "lov_" + fs_name
 
     print "lov_uuid: " + lov_uuid + "; lov_name: " + lov_name
 
     ost_uuid = rec.getAttribute('ost_uuidref')
-    obd = db.lookup(ost_uuid)
 
-    if not obd:
-        panic("malformed xml: 'add' record references ost uuid '" + ost_uuid + "' which cannot be found.")
+    if rec.nodeName == 'lov_delete':
+        #
+        # Use the update as a subtree in case a new OST is created with the
+        # same name as the one that we deleted or other info about the OSS
+        # has changed since the delete.
+        # XXX - Not sure if this is the way this is supposed to be done.
+        #
+        info = rec.parentNode.getElementsByTagName('info')
+        if not info:
+            print "delete record missing info !"
+        tgtdb = Lustre.LustreDB_XML(info[0], info[0])
+    else:
+        tgtdb = db
 
-    osc = get_osc(obd, lov_uuid, lov_name)
+    obd = tgtdb.lookup(ost_uuid)
+    if not obd:
+        panic("malformed xml: '" + rec.nodeName + \
+              "' record references ost uuid '" + ost_uuid + \
+              "' which cannot be found.")
+    osc = get_osc(obd, lov_uuid, fs_name)
     if not osc:
         panic('osc not found:', obd_uuid)
-    return osc
+    return lov_name, lov_uuid, osc
 
 # write logs for update records.  sadly, logs of all types -- and updates in
 # particular -- are something of an afterthought.  lconf needs rewritten with
@@ -3304,33 +3333,28 @@ def process_update_record(db, update, lov):
         if rec.nodeType != rec.ELEMENT_NODE:
             continue
 
-        log("found "+rec.nodeName+" record in update version " +
+        if rec.nodeName == 'info':
+            continue
+
+        log("found " + rec.nodeName + " record in update version " +
             str(update.getAttribute('version')))
 
+        if rec.nodeName != 'lov_add' and rec.nodeName != 'lov_delete' and \
+           rec.nodeName != 'lov_deactivate':
+                panic("unrecognized update record type '" + rec.nodeName + "'.")
+
         lov_uuid = rec.getAttribute('lov_uuidref')
         ost_uuid = rec.getAttribute('ost_uuidref')
         index = rec.getAttribute('index')
         gen = rec.getAttribute('generation')
 
         if not lov_uuid or not ost_uuid or not index or not gen:
-            panic("malformed xml: 'update' record requires lov_uuid, ost_uuid, index, and generation.")
+            panic("malformed xml: '" + rec.nodeName + "' record requires lov_uuid, ost_uuid, index, and generation.")
 
-        if not lov:
-            tmplov = db.lookup(lov_uuid)
-            if not tmplov:
-                panic("malformed xml: 'delete' record contains lov UUID '" + lov_uuid + "', which cannot be located.")
-            lov_name = tmplov.getName()
-        else:
-            lov_name = lov.osc.name
+        lov_name, lov_uuid, osc = magic_get_osc(db, rec, lov)
 
         # ------------------------------------------------------------- add
-        if rec.nodeName == 'add':
-            if config.cleanup:
-                lctl.lov_del_obd(lov_name, lov_uuid, ost_uuid, index, gen)
-                continue
-
-            osc = magic_get_osc(db, rec, lov)
-
+        if rec.nodeName == 'lov_add':
             try:
                 # Only ignore connect failures with --force, which
                 # isn't implemented here yet.
@@ -3339,15 +3363,10 @@ def process_update_record(db, update, lov):
                 print "Error preparing OSC %s\n" % osc.uuid
                 raise e
 
-            lctl.lov_add_obd(lov_name, lov_uuid, ost_uuid, index, gen)
+            lctl.lov_add_osc(lov_name, ost_uuid, index, gen)
 
         # ------------------------------------------------------ deactivate
-        elif rec.nodeName == 'deactivate':
-            if config.cleanup:
-                continue
-
-            osc = magic_get_osc(db, rec, lov)
-
+        elif rec.nodeName == 'lov_deactivate':
             try:
                 osc.deactivate()
             except CommandError, e:
@@ -3355,11 +3374,8 @@ def process_update_record(db, update, lov):
                 raise e
 
         # ---------------------------------------------------------- delete
-        elif rec.nodeName == 'delete':
-            if config.cleanup:
-                continue
-
-            osc = magic_get_osc(db, rec, lov)
+        elif rec.nodeName == 'lov_delete':
+            lctl.lov_del_osc(lov_name, ost_uuid, index, gen)
 
             try:
                 config.cleanup = 1
@@ -3369,9 +3385,12 @@ def process_update_record(db, update, lov):
                 print "Error cleaning up OSC %s\n" % osc.uuid
                 raise e
 
-            lctl.lov_del_obd(lov_name, lov_uuid, ost_uuid, index, gen)
-
 def process_updates(db, log_device, log_name, lov = None):
+    if not config.write_conf and not config.record:
+        return
+    if config.cleanup:
+        return
+
     updates = db.root_node.getElementsByTagName('update')
     for u in updates:
         if not u.childNodes:
@@ -3389,13 +3408,12 @@ def process_updates(db, log_device, log_name, lov = None):
         lctl.end_record()
 
 def doWriteconf(services):
-    #if config.nosetup:
-    #    return
     for s in services:
         if s[1].get_class() == 'mdsdev' or s[1].get_class() == 'osd':
             n = newService(s[1])
             n.write_conf()
-            n.cleanup()
+            if not config.nosetup:
+               n.cleanup()
 
 def doSetup(services):
     if config.nosetup:
@@ -3428,7 +3446,7 @@ def doLoadModules(services):
 def doUnloadModules(services):
     if config.nomod:
         return
-        
+
     # adding all needed modules from all services
     for s in services:
         n = newService(s[1])
@@ -3949,12 +3967,11 @@ def main():
 
     doHost(lustreDB, node_list)
 
-    if not config.record:
-        return
-
-    lctl.end_record()
+    if config.record:
+        lctl.end_record()
+        process_updates(lustreDB, config.record_device, config.record_log)
 
-    process_updates(lustreDB, config.record_device, config.record_log)
+    return
 
 if __name__ == "__main__":
     try:
index 9f4893c..220276d 100755 (executable)
@@ -139,7 +139,6 @@ Object creation command summary:
   --filesystem filesystem name
  
 --delete ost
-  --node node_name
   --ost ost_name
   --migrate
   
@@ -469,8 +468,9 @@ class GenConfig:
         osd.setAttribute('osdtype', osdtype)
         osd.appendChild(self.ref("target", ost_uuid))
         osd.appendChild(self.ref("node", node_uuid))
-       osd.appendChild(self.dev(devname)) 
 
+        if devname:
+            osd.appendChild(self.dev(devname)) 
        if fstype:
             self.addElement(osd, "fstype", fstype)
         if backfstype:
@@ -662,19 +662,23 @@ class GenConfig:
         new.setAttribute("version", version)
         return new
 
-    def add(self, lov, ost, index, gen):
-        new = self.doc.createElement("add")
+    def info(self):
+        new = self.doc.createElement("info")
+        return new
+
+    def lov_add(self, lov, ost, index, gen):
+        new = self.doc.createElement("lov_add")
         new.setAttribute("lov_uuidref", lov)
         new.setAttribute("ost_uuidref", ost)
         new.setAttribute("index", index)
         new.setAttribute("generation", gen)
         return new
 
-    def delete(self, lov, ost, index, gen, options):
+    def lov_delete(self, lov, ost, index, gen, options):
         if options.delete:
-            new = self.doc.createElement("delete")
+            new = self.doc.createElement("lov_delete")
         else:
-            new = self.doc.createElement("deactivate")
+            new = self.doc.createElement("lov_deactivate")
         new.setAttribute("lov_uuidref", lov)
         new.setAttribute("ost_uuidref", ost)
         new.setAttribute("index", index)
@@ -711,19 +715,6 @@ def addUpdate(gen, lustre, node):
     update = findLastUpdate(lustre)
     if not update:
         return
-    #add_record = update.getElementsByTagName('add')
-    #if not add_record:
-    #    add_record = gen.add()
-    #    update.appendChild(add_record)
-    #else:
-    #    add_record = add_record[0]
-    #add_record.appendChild(node)
-    update.appendChild(node)
-
-def delUpdate(gen, lustre, node):
-    update = findLastUpdate(lustre)
-    if not update:
-        return
     update.appendChild(node)
 
 def findByName(lustre, name, tag = ""):
@@ -780,23 +771,22 @@ def get_net_uuid(lustre, node_name):
 
 def lov_mod_obd(gen, lustre, lov, tgt, osc_uuid, options):
     tgt.setAttribute('uuidref', osc_uuid)
-    if options.migrate:
-        gener = int(tgt.getAttribute('generation'))
-    else:
-        gener = int(tgt.getAttribute('generation')) + 1
+    gener = int(tgt.getAttribute('generation'))
+    if not options.migrate:
+        gener = str(int(gener) + 1)
     tgt.setAttribute('generation', str(gener))
     tgt.setAttribute('active', '1')
     lov_index = int(tgt.getAttribute('index'))
-    addUpdate(gen, lustre, gen.add(getUUID(lov), osc_uuid, str(lov_index),
-              str(gener)))
+    add_rec = gen.lov_add(getUUID(lov), osc_uuid, str(lov_index), str(gener))
+    addUpdate(gen, lustre, add_rec)
     return
 
-def lov_add_obd(gen, lustre, lov, osc_uuid, options):
+def lov_add_osc(gen, lustre, lov, osc_uuid, options):
     lov_name = getName(lov)
     lov_uuid = getUUID(lov)
     if options.index:
         lov_index = get_option_int(options, 'index')
-        for tgt in lustre.getElementsByTagName('lov_tgt'):
+        for tgt in lov.getElementsByTagName('lov_tgt'):
             if str(lov_index) == tgt.getAttribute('index'):
                 uuidref = tgt.getAttribute('uuidref')
                 if uuidref != '':
@@ -806,7 +796,7 @@ def lov_add_obd(gen, lustre, lov, osc_uuid, options):
                 return
     else:
          lov_index = 0
-         for tgt in lustre.getElementsByTagName('lov_tgt'):
+         for tgt in lov.getElementsByTagName('lov_tgt'):
              uuidref = tgt.getAttribute('uuidref')
              tmp = int(tgt.getAttribute('index'))
              own_lov_uuid = tgt.getAttribute('lov_uuid')
@@ -818,13 +808,14 @@ def lov_add_obd(gen, lustre, lov, osc_uuid, options):
             lov_index = lov_index + 1
 
     lov.appendChild(gen.lov_tgt(osc_uuid, lov_uuid, str(lov_index), '1'))
-    addUpdate(gen, lustre, gen.add(getUUID(lov), lov_uuid, str(lov_index), '1'))
+    addrec = gen.lov_add(lov_uuid, osc_uuid, str(lov_index), '1')
+    addUpdate(gen, lustre, addrec)
 
-def lov_del_obd(gen, lustre, lov, osc_uuid, options):
+def lov_del_osc(gen, lustre, lov, osc_uuid, options):
     lov_name = getName(lov)
     if options.index:
         lov_index = get_option_int(options, 'index')
-        for tgt in lustre.getElementsByTagName('lov_tgt'):
+        for tgt in lov.getElementsByTagName('lov_tgt'):
             index = tgt.getAttribute('index')
             if index == lov_index:
                 uuidref = tgt.getAttribute('uuidref')
@@ -834,37 +825,36 @@ def lov_del_obd(gen, lustre, lov, osc_uuid, options):
                 if options.delete:
                     tgt.setAttribute('uuidref', '')
 
-                # bump the generation just in case...
-                if options.migrate:
-                    gen = int(tgt.getAttribute('generation'))
-                else:
-                    gen = int(tgt.getAttribute('generation')) + 1
+                gener = tgt.getAttribute('generation')
+                if not options.migrate:
+                    # bump the generation just in case...
+                    gener = str(int(gener) + 1)
 
                 tgt.setAttribute('active', '0')
-                tgt.setAttribute('generation', str(gen))
-                return
+                tgt.setAttribute('generation', gener)
+                return None
         raise OptionError("%s --index %d not in use by %s." %
                           (lov_name, lov_index, osc_uuid))
 
-    for tgt in lustre.getElementsByTagName('lov_tgt'):
+    for tgt in lov.getElementsByTagName('lov_tgt'):
         uuidref = tgt.getAttribute('uuidref')
         if uuidref == osc_uuid:
-            genera = int(tgt.getAttribute('generation'))
-            delete_rec = gen.delete(getUUID(lov),
-                                    osc_uuid,tgt.getAttribute('index'),
-                                    str(genera), options)
-            delUpdate(gen, lustre, delete_rec)
+            index = tgt.getAttribute('index')
+            gener = tgt.getAttribute('generation')
+            delete_rec = gen.lov_delete(getUUID(lov), osc_uuid, index, gener,
+                                        options)
+            addUpdate(gen, lustre, delete_rec)
 
             if options.delete:
                 tgt.setAttribute('uuidref', '')
             if not options.migrate:
-                genera = genera + 1
+                gener = str(int(gener) + 1)
             tgt.setAttribute('active', '0')
-            tgt.setAttribute('generation', str(genera))
+            tgt.setAttribute('generation', gener)
 
 def lmv_add_obd(gen, lmv, mdc_uuid):
     lmv.appendChild(gen.lmv_tgt(mdc_uuid))
-                            
+
 def ref_exists(profile, uuid):
     elist = profile.childNodes
     for e in elist:
@@ -1190,7 +1180,7 @@ def add_ost(gen, lustre, options):
         lov = findByName(lustre, lovname, "lov")
         if not lov:
             error('add_ost:', '"'+lovname+'"', "lov element not found.")
-        lov_add_obd(gen, lustre, lov, ost_uuid, options)
+        lov_add_osc(gen, lustre, lov, ost_uuid, options)
 
     if options.failover:
         ost.setAttribute('failover', "1")
@@ -1225,28 +1215,62 @@ def del_ost(gen, lustre, options):
         lov = findByName(lustre, lovname, "lov")
         if not lov:
             error('del_ost:', '"'+lovname+'"', "lov element not found.")
-        lov_del_obd(gen, lustre, lov, ost_uuid, options)
+        lov_del_osc(gen, lustre, lov, ost_uuid, options)
         # if the user specified a speficic LOV don't delete the OST itself
         return
 
     # remove OSD references from all LOVs
-    for n in lustre.getElementsByTagName('lov'):
-        lov_del_obd(gen, lustre, n, ost_uuid, options)
-    if not options.migrate:
-        return
-    # delete the OSDs
-    for osd in lustre.getElementsByTagName('osd'):
-        if ref_exists(osd, ost_uuid):
-            osd_uuid = osd.getAttribute('uuid')
-            # delete all profile references to this OSD
-            for profile in lustre.getElementsByTagName('profile'):
-                for osd_ref in profile.getElementsByTagName('osd_ref'):
-                    if osd_uuid == osd_ref.getAttribute('uuidref'):
-                        profile.removeChild(osd_ref)
-            lustre.removeChild(osd)
-
-    # delete the OST
+    for lov in lustre.getElementsByTagName('lov'):
+        lov_del_osc(gen, lustre, lov, ost_uuid, options)
+
+    info = gen.info()
+
+    # move the OST description to the update record
     lustre.removeChild(ost)
+    info.appendChild(ost)
+
+    active_ref = ost.getElementsByTagName('active_ref')
+    if not active_ref:
+        error('ost has no osd ref:', ostname)
+
+    # move the OSD description to the update record
+    osd_uuid = active_ref[0].getAttribute('uuidref')
+    osd = lookup(lustre, osd_uuid)
+    lustre.removeChild(osd)
+    info.appendChild(osd)
+
+    # make a copy of the OSS description in the update record
+    # XXX - should check to make sure one doesn't already exist.
+    node_ref = osd.getElementsByTagName('node_ref')
+    if not node_ref:
+        error('osd has no node ref:', ostname)
+    node_uuid = node_ref[0].getAttribute('uuidref')
+    node = lookup(lustre, node_uuid)
+    if not node:
+        error('unable to locate node for node ref:', node_uuid)
+
+    node_rec = node.cloneNode(1)
+    info.appendChild(node_rec)
+
+    prof_ref = node.getElementsByTagName('profile_ref')
+    if not prof_ref:
+        error('node has no profile ref:', node)
+    profile_uuid = prof_ref[0].getAttribute('uuidref')
+
+    # make a copy of the OSS's profile in the update record
+    # XXX - should check to make sure one doesn't already exist.
+    profile = lookup(lustre, profile_uuid)
+    profile_rec = profile.cloneNode(1)
+    info.appendChild(profile_rec)
+
+    # delete all references to this OSD in the OSS's current profile
+    for osd_ref in profile.getElementsByTagName('osd_ref'):
+        if osd_uuid == osd_ref.getAttribute('uuidref'):
+            profile.removeChild(osd_ref)
+
+    # XXX - We should cleanup the node and profile elements if they
+    #       no longer serve a purpose.
+    addUpdate(gen, lustre, info)
 
 def add_cmobd(gen, lustre, options):
     node_name = get_option(options, 'node')
@@ -1614,7 +1638,7 @@ def add_mtpt(gen, lustre, options):
             if not ost_uuid:
                 error('add_mtpt:', '"'+ost_name+'"', "ost element not found.")
             lov = findByName(lustre, lov_name, "lov")
-            lov_add_obd(gen, lustre, lov, ost_uuid, options)
+            lov_add_osc(gen, lustre, lov, ost_uuid, options)
 
     if fs_name == '':
        fs_name = new_name("FS_fsname")
index dfd2ddb..7d1b9a8 100644 (file)
@@ -342,20 +342,20 @@ int jt_lcfg_lov_setup(int argc, char **argv)
         struct lov_desc desc;
         int rc;
         char *end;
-                                                                                                                                                                                                     
+
         /* argv: lov_setup <LOV uuid> <stripe count> <stripe size>
-         *                 <stripe offset> <pattern> [ <max tgt index> ]
+         *                 <stripe offset> <pattern>
          */
-        if (argc <= 6)
+        if (argc != 6)
                 return CMD_HELP;
-                                                                                                                                                                                                     
+
         if (strlen(argv[1]) > sizeof(desc.ld_uuid) - 1) {
                 fprintf(stderr,
                         "error: %s: LOV uuid '%s' longer than "LPSZ" chars\n",
                         jt_cmdname(argv[0]), argv[1], sizeof(desc.ld_uuid) - 1);
                 return -EINVAL;
         }
-                                                                                                                                                                                                     
+
         memset(&desc, 0, sizeof(desc));
         obd_str2uuid(&desc.ld_uuid, argv[1]);
         desc.ld_default_stripe_count = strtoul(argv[2], &end, 0);
@@ -364,7 +364,7 @@ int jt_lcfg_lov_setup(int argc, char **argv)
                         jt_cmdname(argv[0]), argv[2]);
                 return CMD_HELP;
         }
-                                                                                                                                                                                                     
+
         desc.ld_default_stripe_size = strtoull(argv[3], &end, 0);
         if (*end) {
                 fprintf(stderr, "error: %s: bad default stripe size '%s'\n",
@@ -383,31 +383,21 @@ int jt_lcfg_lov_setup(int argc, char **argv)
                         jt_cmdname(argv[0]), desc.ld_default_stripe_size);
                 return -EINVAL;
         }
+
         desc.ld_default_stripe_offset = strtoull(argv[4], &end, 0);
         if (*end) {
                 fprintf(stderr, "error: %s: bad default stripe offset '%s'\n",
                         jt_cmdname(argv[0]), argv[4]);
                 return CMD_HELP;
         }
+
         desc.ld_pattern = strtoul(argv[5], &end, 0);
         if (*end) {
                 fprintf(stderr, "error: %s: bad stripe pattern '%s'\n",
                         jt_cmdname(argv[0]), argv[5]);
                 return CMD_HELP;
         }
-                                                                                                                                                                                                     
-        if (argc > 7) {
-                desc.ld_tgt_count = argc - 6;
-                if (desc.ld_default_stripe_count > desc.ld_tgt_count) {
-                        fprintf(stderr,
-                                "error: %s: default stripe count %u > "
-                                "OST count %u\n", jt_cmdname(argv[0]),
-                                desc.ld_default_stripe_count,
-                                desc.ld_tgt_count);
-                        return -EINVAL;
-                }
-        }
-        
+
         lustre_cfg_bufs_reset(&bufs, lcfg_devname);
         lustre_cfg_bufs_set(&bufs, 1, &desc, sizeof(desc));